跳到内容

GlobalStep

GlobalStepStep 的子类,用于定义需要先完成之前的步骤才能运行的步骤,因为它会等待接收到所有输入批次后才运行。此步骤在需要运行一个步骤,该步骤需要在运行之前处理所有输入数据时非常有用。或者,它也可以用作独立步骤。

定义自定义 GlobalStep

我们可以通过创建一个新的 GlobalStep 子类并定义以下内容来定义自定义步骤

  • inputs:是一个属性,返回一个字符串列表,其中包含所需输入字段的名称,或一个字典,其中键是列的名称,值是布尔值,指示列是否为必需。

  • outputs:是一个属性,返回一个字符串列表,其中包含输出字段的名称,或一个字典,其中键是列的名称,值是布尔值,指示列是否为必需。

  • process:是一个方法,接收输入数据并返回输出数据,它应该是一个生成器,意味着它应该 yield 输出数据。

注意

process 方法的默认签名是 process(self, *inputs: StepInput) -> StepOutput。参数 inputs 应被遵守,不能提供更多参数,并且类型提示和返回类型提示也应被遵守,因为它应该能够默认接收任意数量的输入,即,一次可以有多个 Step 连接到当前步骤。

警告

为了使自定义 GlobalStep 子类能够与 distilabel 以及默认对 Pipeline 中的每个 Step 执行的验证和序列化正常工作,StepInputStepOutput 的类型提示应该被使用,并且不应该用双引号括起来,也不应该在 typing.TYPE_CHECKING 下导入,否则,验证和/或序列化将失败。

我们可以从 GlobalStep 类继承,并按如下方式定义 inputsoutputsprocess 方法

from typing import TYPE_CHECKING
from distilabel.steps import GlobalStep, StepInput

if TYPE_CHECKING:
    from distilabel.typing import StepColumns, StepOutput

class CustomStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        ...

    @property
    def outputs(self) -> "StepColumns":
        ...

    def process(self, *inputs: StepInput) -> StepOutput:
        for upstream_step_inputs in inputs:
            for item in input:
                ...
            yield item

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs

@step 装饰器将处理样板代码,并允许以更直接的方式定义 inputsoutputsprocess 方法。一个缺点是它不允许您访问 self 属性(如果有),也不允许设置这些属性,因此,如果您需要访问或设置任何属性,您应该使用第一种方法来定义自定义 GlobalStep 子类。

from typing import TYPE_CHECKING
from distilabel.steps import StepInput, step

if TYPE_CHECKING:
    from distilabel.typing import StepOutput

@step(inputs=[...], outputs=[...], step_type="global")
def CustomStep(inputs: StepInput) -> "StepOutput":
    for input in inputs:
        ...
    yield inputs

step = CustomStep(name="my-step")