跳到内容

@step

本节包含 @step 装饰器的参考信息,用于创建新的 Step 子类,而无需手动定义类。

有关更多信息,请查看 教程 - Step 页面。

装饰器

step(inputs=None, outputs=None, step_type='normal')

step(inputs: Union[StepColumns, None] = None, outputs: Union[StepColumns, None] = None, step_type: Literal['normal'] = 'normal') -> Callable[..., Type[Step]]
step(inputs: Union[StepColumns, None] = None, outputs: Union[StepColumns, None] = None, step_type: Literal['global'] = 'global') -> Callable[..., Type[GlobalStep]]
step(inputs: None = None, outputs: Union[StepColumns, None] = None, step_type: Literal['generator'] = 'generator') -> Callable[..., Type[GeneratorStep]]

从处理函数创建一个 Step

参数

名称 类型 描述 默认值
inputs Union[StepColumns, None]

一个列表,包含 step 所需的输入列/键的名称;或者一个字典,其中键是列名,值是布尔值,指示该列是否是必需的。如果未提供,则默认值将为空列表 [],并将假定 step 不需要任何特定的列。默认为 None

None
outputs Union[StepColumns, None]

一个列表,包含输出列/键的名称;或者一个字典,其中键是列名,值是布尔值,指示是否将生成该列。如果未提供,则默认值将为空列表 [],并将假定 step 不会生成任何特定的列。默认为 None

None
step_type Literal['normal', 'global', 'generator']

要创建的 step 的类型。有效选项包括:"normal" (Step)、"global" (GlobalStep) 或 "generator" (GeneratorStep)。默认为 "normal"

'normal'

返回

类型 描述
Callable[..., Type[_Step]]

一个可调用对象,它将根据给定的处理函数生成类型。

示例

# Normal step
@step(inputs=["instruction"], outputs=["generation"])
def GenerationStep(inputs: StepInput, dummy_generation: RuntimeParameter[str]) -> StepOutput:
    """Set the "generation" key of every row to `dummy_generation` and yield the batch."""
    # `row` instead of `input` so the builtin `input` is not shadowed.
    for row in inputs:
        row["generation"] = dummy_generation
    yield inputs

# Global step
@step(inputs=["instruction"], step_type="global")
def FilteringStep(inputs: StepInput, max_length: RuntimeParameter[int] = 256) -> StepOutput:
    """Yield only the rows whose "instruction" is at most `max_length` long."""
    # `row` instead of `input` so the builtin `input` is not shadowed.
    yield [
        row
        for row in inputs
        if len(row["instruction"]) <= max_length
    ]

# Generator step
@step(outputs=["num"], step_type="generator")
def RowGenerator(num_rows: RuntimeParameter[int] = 500) -> GeneratorStepOutput:
    """Yield `(batch, last_batch)` tuples covering the numbers `0..num_rows - 1`.

    Each batch holds up to 100 rows of the form `{"num": <int>}`; `last_batch`
    is `True` only for the batch that reaches the end of the range.
    """
    batch_size = 100
    for start in range(0, num_rows, batch_size):
        # Clamp the upper bound so the final batch stops at `num_rows`.
        stop = min(start + batch_size, num_rows)
        batch = [{"num": num} for num in range(start, stop)]
        yield batch, start + batch_size >= num_rows
源代码位于 src/distilabel/steps/decorator.py
def step(
    inputs: Union["StepColumns", None] = None,
    outputs: Union["StepColumns", None] = None,
    step_type: Literal["normal", "global", "generator"] = "normal",
) -> Callable[..., Type["_Step"]]:
    """Creates a `Step` from a processing function.

    Args:
        inputs: a list containing the name of the inputs columns/keys required by the
            step, or a dictionary where the keys are the columns and the values are
            booleans indicating whether the column is required or not. If not provided
            the default will be an empty list `[]` and it will be assumed that the step
            doesn't need any specific columns. Defaults to `None`.
        outputs: a list containing the name of the outputs columns/keys or a dictionary
            where the keys are the columns and the values are booleans indicating whether
            the column will be generated or not. If not provided the default will be an
            empty list `[]` and it will be assumed that the step doesn't generate any
            specific columns. Defaults to `None`.
        step_type: the kind of step to create. Valid choices are: "normal" (`Step`),
            "global" (`GlobalStep`) or "generator" (`GeneratorStep`). Defaults to
            `"normal"`.

    Returns:
        A callable that will generate the type given the processing function.

    Raises:
        ValueError: raised by the returned callable (i.e. when the function is
            decorated) if `step_type` is not a valid choice, or if the decorated
            function of a non-generator step has more than one parameter annotated
            with `StepInput`.

    Example:

    ```python
    # Normal step
    @step(inputs=["instruction"], outputs=["generation"])
    def GenerationStep(inputs: StepInput, dummy_generation: RuntimeParameter[str]) -> StepOutput:
        for row in inputs:
            row["generation"] = dummy_generation
        yield inputs

    # Global step
    @step(inputs=["instruction"], step_type="global")
    def FilteringStep(inputs: StepInput, max_length: RuntimeParameter[int] = 256) -> StepOutput:
        yield [
            row
            for row in inputs
            if len(row["instruction"]) <= max_length
        ]

    # Generator step
    @step(outputs=["num"], step_type="generator")
    def RowGenerator(num_rows: RuntimeParameter[int] = 500) -> GeneratorStepOutput:
        data = list(range(num_rows))
        for i in range(0, len(data), 100):
            last_batch = i + 100 >= len(data)
            yield [{"num": num} for num in data[i : i + 100]], last_batch
    ```
    """

    # `None` (or any other falsy value) falls back to an empty list.
    inputs = inputs or []
    outputs = outputs or []

    def decorator(func: ProcessingFunc) -> Type["_Step"]:
        # Validated here rather than in `step` so the error message can name the
        # decorated function.
        if step_type not in _STEP_MAPPING:
            raise ValueError(
                f"Invalid step type '{step_type}'. Please, review the '{func.__name__}'"
                " function decorated with the `@step` decorator and provide a valid"
                " `step_type`. Valid choices are: 'normal', 'global' or 'generator'."
            )

        BaseClass = _STEP_MAPPING[step_type]

        signature = inspect.signature(func)

        # Only parameters annotated as runtime parameters become fields of the
        # generated model; each entry is an `(annotation, default)` tuple.
        runtime_parameters = {}
        step_input_parameter = None
        for name, param in signature.parameters.items():
            if is_parameter_annotated_with(param, _RUNTIME_PARAMETER_ANNOTATION):
                runtime_parameters[name] = (
                    param.annotation,
                    # `param.empty` is a sentinel, so compare by identity: this never
                    # invokes the `__ne__` of an arbitrary default value. Parameters
                    # without a default get `None`.
                    param.default if param.default is not param.empty else None,
                )

            # Generator steps are not checked for a `StepInput` parameter.
            if step_type != "generator" and is_parameter_annotated_with(
                param, _STEP_INPUT_ANNOTATION
            ):
                if step_input_parameter is not None:
                    raise ValueError(
                        f"Function '{func.__name__}' has more than one parameter annotated"
                        f" with `StepInput`. Please, review the '{func.__name__}' function"
                        " decorated with the `@step` decorator and provide only one"
                        " argument annotated with `StepInput`."
                    )
                step_input_parameter = param

        # NOTE(review): `create_model` is presumably pydantic's dynamic model
        # factory -- confirm against the file's imports.
        RuntimeParametersModel = create_model(  # type: ignore
            "RuntimeParametersModel",
            **runtime_parameters,  # type: ignore
        )

        def inputs_property(self) -> "StepColumns":
            return inputs

        def outputs_property(self) -> "StepColumns":
            return outputs

        def process(
            self, *args: Any, **kwargs: Any
        ) -> Union["StepOutput", "GeneratorStepOutput"]:
            # `self` is intentionally not forwarded: the decorated function is a
            # plain function, not a method.
            return func(*args, **kwargs)

        # Build the new class dynamically, named after the decorated function.
        return type(  # type: ignore
            func.__name__,
            (
                BaseClass,
                RuntimeParametersModel,
            ),
            {
                "process": process,
                "inputs": property(inputs_property),
                "outputs": property(outputs_property),
                "__module__": func.__module__,
                "__doc__": func.__doc__,
                "_built_from_decorator": True,
                # Override the `get_process_step_input` method to return the parameter
                # of the original function annotated with `StepInput`.
                "get_process_step_input": lambda self: step_input_parameter,
            },
        )

    return decorator