数据处理步骤¶
使用步骤¶
Step
旨在用于 Pipeline
的范围内,它将协调定义的各种步骤,但也可以单独使用。
假设我们已经定义了一个 Step
,如下所示
from typing import TYPE_CHECKING
from distilabel.steps import Step, StepInput
if TYPE_CHECKING:
from distilabel.typing import StepColumns, StepOutput
class MyStep(Step):
@property
def inputs(self) -> "StepColumns":
return ["input_field"]
@property
def outputs(self) -> "StepColumns":
return ["output_field"]
def process(self, inputs: StepInput) -> "StepOutput":
for input in inputs:
input["output_field"] = input["input_field"]
yield inputs
然后我们可以如下使用它
step = MyStep(name="my-step")
step.load()
next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]
注意
当作为独立步骤使用时,始终需要执行 Step.load()
。在 pipeline 中,这将在 pipeline 执行期间自动完成。
参数¶
-
input_mappings
是一个字典,它将输入字典中的键映射到步骤期望的键。例如,如果input_mappings={"instruction": "prompt"}
,则意味着输入键prompt
将用作当前步骤的键instruction
。 -
output_mappings
是一个字典,可用于将步骤的输出映射到其他名称。例如,如果output_mappings={"conversation": "prompt"}
,则意味着输出键conversation
将重命名为prompt
以用于下一步。 -
input_batch_size
(默认设置为 50)对于每个步骤都是独立的,并将确定一次处理多少个输入字典。
运行时参数¶
Step
也可以具有 RuntimeParameter
,这些参数只能在 pipeline 初始化后调用 Pipeline.run
时使用。
from distilabel.mixins.runtime_parameters import RuntimeParameter
class Step(...):
input_batch_size: RuntimeParameter[PositiveInt] = Field(
default=DEFAULT_INPUT_BATCH_SIZE,
description="The number of rows that will contain the batches processed by the"
" step.",
)
步骤类型¶
在 distilabel
中有两种特殊类型的 Step
-
GeneratorStep
:是一个仅生成数据的步骤,它不需要来自先前步骤的任何输入数据,通常是Pipeline
中的第一个节点。更多信息:组件 -> 步骤 - GeneratorStep。 -
GlobalStep
:是一个具有标准接口的步骤,即接收输入并生成输出,但它一次处理所有数据,并且通常是Pipeline
中的最后一步。GlobalStep
需要先前的步骤完成才能开始。更多信息:组件 - 步骤 - GlobalStep。 -
Task
,本质上与默认的Step
相同,但它依赖于LLM
作为属性,并且process
方法将负责调用该 LLM。更多信息:组件 - Task。
定义自定义步骤¶
我们可以通过创建 Step
的新子类并定义以下内容来定义自定义步骤
-
inputs
:是一个属性,返回一个字符串列表,其中包含所需输入字段的名称,或者是一个字典,其中键是列的名称,值是布尔值,指示该列是否是必需的。 -
outputs
:是一个属性,返回一个字符串列表,其中包含输出字段的名称,或者是一个字典,其中键是列的名称,值是布尔值,指示该列是否是必需的。 -
process
:是一个接收输入数据并返回输出数据的方法,它应该是一个生成器,意味着它应该yield
输出数据。
注意
process
方法的默认签名是 process(self, *inputs: StepInput) -> StepOutput
。应该尊重参数 inputs
,不能提供更多参数,并且应该尊重类型提示和返回类型提示,因为它应该能够默认接收任意数量的输入,即一次可以有多个 Step
连接到当前步骤。
警告
为了使自定义 Step
子类能够与 distilabel
以及默认在 Pipeline
中的每个 Step
上执行的验证和序列化正常工作,StepInput
和 StepOutput
的类型提示应使用,而不是用双引号括起来或在 typing.TYPE_CHECKING
下导入,否则,验证和/或序列化将失败。
我们可以从 Step
类继承,并按如下方式定义 inputs
、outputs
和 process
方法
from typing import TYPE_CHECKING
from distilabel.steps import Step, StepInput
if TYPE_CHECKING:
from distilabel.typing import StepColumns, StepOutput
class CustomStep(Step):
@property
def inputs(self) -> "StepColumns":
...
@property
def outputs(self) -> "StepColumns":
...
def process(self, *inputs: StepInput) -> "StepOutput":
for upstream_step_inputs in inputs:
...
yield item
# When overridden (ideally under the `typing_extensions.override` decorator)
# @typing_extensions.override
# def process(self, inputs: StepInput) -> StepOutput:
# for input in inputs:
# ...
# yield inputs
@step
装饰器将处理样板代码,并允许以更直接的方式定义 inputs
、outputs
和 process
方法。一个缺点是它不允许您访问 self
属性(如果有),也不允许设置这些属性,因此如果您需要访问或设置任何属性,则应使用第一种方法来定义自定义 Step
子类。