跳到内容

数据处理步骤

使用步骤

Step 旨在用于 Pipeline 的范围内,它将协调定义的各种步骤,但也可以单独使用。

假设我们已经定义了一个 Step,如下所示

from typing import TYPE_CHECKING
from distilabel.steps import Step, StepInput

if TYPE_CHECKING:
    from distilabel.typing import StepColumns, StepOutput

class MyStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        return ["input_field"]

    @property
    def outputs(self) -> "StepColumns":
        return ["output_field"]

    def process(self, inputs: StepInput) -> "StepOutput":
        for input in inputs:
            input["output_field"] = input["input_field"]
        yield inputs

然后我们可以如下使用它

step = MyStep(name="my-step")
step.load()

next(step.process([{"input_field": "value"}]))
# [{'input_field': 'value', 'output_field': 'value'}]

注意

当作为独立步骤使用时,始终需要执行 Step.load()。在 pipeline 中,这将在 pipeline 执行期间自动完成。

参数

  • input_mappings 是一个字典,它将输入字典中的键映射到步骤期望的键。例如,如果 input_mappings={"instruction": "prompt"},则意味着输入键 prompt 将用作当前步骤的键 instruction

  • output_mappings 是一个字典,可用于将步骤的输出映射到其他名称。例如,如果 output_mappings={"conversation": "prompt"},则意味着输出键 conversation 将重命名为 prompt 以用于下一步。

  • input_batch_size(默认设置为 50)对于每个步骤都是独立的,并将确定一次处理多少个输入字典。

运行时参数

Step 也可以具有 RuntimeParameter,这些参数只能在 pipeline 初始化后调用 Pipeline.run 时使用。

from distilabel.mixins.runtime_parameters import RuntimeParameter

class Step(...):
    input_batch_size: RuntimeParameter[PositiveInt] = Field(
        default=DEFAULT_INPUT_BATCH_SIZE,
        description="The number of rows that will contain the batches processed by the"
        " step.",
    )

步骤类型

distilabel 中有两种特殊类型的 Step

  • GeneratorStep:是一个仅生成数据的步骤,它不需要来自先前步骤的任何输入数据,通常是 Pipeline 中的第一个节点。更多信息:组件 -> 步骤 - GeneratorStep

  • GlobalStep:是一个具有标准接口的步骤,即接收输入并生成输出,但它一次处理所有数据,并且通常是 Pipeline 中的最后一步。GlobalStep 需要先前的步骤完成才能开始。更多信息:组件 - 步骤 - GlobalStep

  • Task,本质上与默认的 Step 相同,但它依赖于 LLM 作为属性,并且 process 方法将负责调用该 LLM。更多信息:组件 - Task

定义自定义步骤

我们可以通过创建 Step 的新子类并定义以下内容来定义自定义步骤

  • inputs:是一个属性,返回一个字符串列表,其中包含所需输入字段的名称,或者是一个字典,其中键是列的名称,值是布尔值,指示该列是否是必需的。

  • outputs:是一个属性,返回一个字符串列表,其中包含输出字段的名称,或者是一个字典,其中键是列的名称,值是布尔值,指示该列是否是必需的。

  • process:是一个接收输入数据并返回输出数据的方法,它应该是一个生成器,意味着它应该 yield 输出数据。

注意

process 方法的默认签名是 process(self, *inputs: StepInput) -> StepOutput。应该尊重参数 inputs,不能提供更多参数,并且应该尊重类型提示和返回类型提示,因为它应该能够默认接收任意数量的输入,即一次可以有多个 Step 连接到当前步骤。

警告

为了使自定义 Step 子类能够与 distilabel 以及默认在 Pipeline 中的每个 Step 上执行的验证和序列化正常工作,StepInputStepOutput 的类型提示应使用,而不是用双引号括起来或在 typing.TYPE_CHECKING 下导入,否则,验证和/或序列化将失败。

我们可以从 Step 类继承,并按如下方式定义 inputsoutputsprocess 方法

from typing import TYPE_CHECKING
from distilabel.steps import Step, StepInput

if TYPE_CHECKING:
    from distilabel.typing import StepColumns, StepOutput

class CustomStep(Step):
    @property
    def inputs(self) -> "StepColumns":
        ...

    @property
    def outputs(self) -> "StepColumns":
        ...

    def process(self, *inputs: StepInput) -> "StepOutput":
        for upstream_step_inputs in inputs:
            ...
            yield item

    # When overridden (ideally under the `typing_extensions.override` decorator)
    # @typing_extensions.override
    # def process(self, inputs: StepInput) -> StepOutput:
    #     for input in inputs:
    #         ...
    #     yield inputs

@step 装饰器将处理样板代码,并允许以更直接的方式定义 inputsoutputsprocess 方法。一个缺点是它不允许您访问 self 属性(如果有),也不允许设置这些属性,因此如果您需要访问或设置任何属性,则应使用第一种方法来定义自定义 Step 子类。

from typing import TYPE_CHECKING
from distilabel.steps import StepInput, step

if TYPE_CHECKING:
    from distilabel.typing import StepOutput

@step(inputs=[...], outputs=[...])
def CustomStep(inputs: StepInput) -> "StepOutput":
    for input in inputs:
        ...
    yield inputs

step = CustomStep(name="my-step")