GlobalStep¶
GlobalStep
是 Step
的子类,用于定义需要先完成之前的步骤才能运行的步骤,因为它会等待接收到所有输入批次后才运行。此步骤在需要运行一个步骤,该步骤需要在运行之前处理所有输入数据时非常有用。或者,它也可以用作独立步骤。
定义自定义 GlobalStep¶
我们可以通过创建一个新的 GlobalStep
子类并定义以下内容来定义自定义步骤
-
inputs
:是一个属性,返回一个字符串列表,其中包含所需输入字段的名称,或一个字典,其中键是列的名称,值是布尔值,指示列是否为必需。 -
outputs
:是一个属性,返回一个字符串列表,其中包含输出字段的名称,或一个字典,其中键是列的名称,值是布尔值,指示列是否为必需。 -
process
:是一个方法,接收输入数据并返回输出数据,它应该是一个生成器,意味着它应该yield
输出数据。
注意
process
方法的默认签名是 process(self, *inputs: StepInput) -> StepOutput
。参数 inputs
应被遵守,不能提供更多参数,并且类型提示和返回类型提示也应被遵守,因为它应该能够默认接收任意数量的输入,即,一次可以有多个 Step
连接到当前步骤。
警告
为了使自定义 GlobalStep
子类能够与 distilabel
以及默认对 Pipeline
中的每个 Step
执行的验证和序列化正常工作,StepInput
和 StepOutput
的类型提示应该被使用,并且不应该用双引号括起来,也不应该在 typing.TYPE_CHECKING
下导入,否则,验证和/或序列化将失败。
我们可以从 GlobalStep
类继承,并按如下方式定义 inputs
、outputs
和 process
方法
from typing import TYPE_CHECKING
from distilabel.steps import GlobalStep, StepInput
if TYPE_CHECKING:
from distilabel.typing import StepColumns, StepOutput
class CustomStep(Step):
@property
def inputs(self) -> "StepColumns":
...
@property
def outputs(self) -> "StepColumns":
...
def process(self, *inputs: StepInput) -> StepOutput:
for upstream_step_inputs in inputs:
for item in input:
...
yield item
# When overridden (ideally under the `typing_extensions.override` decorator)
# @typing_extensions.override
# def process(self, inputs: StepInput) -> StepOutput:
# for input in inputs:
# ...
# yield inputs
@step
装饰器将处理样板代码,并允许以更直接的方式定义 inputs
、outputs
和 process
方法。一个缺点是它不允许您访问 self
属性(如果有),也不允许设置这些属性,因此,如果您需要访问或设置任何属性,您应该使用第一种方法来定义自定义 GlobalStep
子类。
from typing import TYPE_CHECKING
from distilabel.steps import StepInput, step
if TYPE_CHECKING:
from distilabel.typing import StepOutput
@step(inputs=[...], outputs=[...], step_type="global")
def CustomStep(inputs: StepInput) -> "StepOutput":
for input in inputs:
...
yield inputs
step = CustomStep(name="my-step")