
This section contains the built-in steps for common column operations applied to batches.

expand

ExpandColumns

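Bases: Step

Expand columns that contain lists into multiple rows.

ExpandColumns is a Step that takes a list of columns and expands them into multiple rows. The new rows have the same data as the original row, except for the expanded column, which contains a single item from the original list.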
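For the single-column case, the behavior amounts to copying the row once per list item. The plain-Python sketch below illustrates this; `expand_row` is a hypothetical helper, not part of distilabel, and it assumes the column already holds a decoded Python list.

```python
from typing import Any, Dict, List


def expand_row(row: Dict[str, Any], column: str) -> List[Dict[str, Any]]:
    """Hypothetical helper (not distilabel API): one output row per list item."""
    # Every other key is copied unchanged; only `column` is replaced by one item.
    return [{**row, column: item} for item in row[column]]


rows = expand_row(
    {"instruction": "instruction 1", "generation": ["generation 1", "generation 2"]},
    "generation",
)
print(rows)
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
```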

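Attributes

Name Type Description
columns Union[Dict[str, str], List[str]]

A dictionary that maps each column to be expanded to its new column name, or a list of columns to be expanded. If a list is provided, each new column name is the same as the original column name.

encoded Union[bool, List[str]]

Whether the columns are JSON-encoded lists. If set to True, the columns are decoded before expanding. Alternatively, a list can be provided to specify which columns are encoded; in that case, the listed column names must be a subset of the columns selected for expansion.

split_statistics bool

Whether the statistics in the distilabel_metadata column should be split across the new rows. When expanding columns that hold lists of strings parsed from an LLM's output, the token counts in the statistics_{step_name} entry of distilabel_metadata should be split, so that they are not multiplied when the data is aggregated afterwards. For example, if a task generates a list of N instructions and each instruction should end up in its own row, the statistics should be divided by N. In that case, set this value to True.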
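The split itself is plain integer arithmetic. The sketch below mirrors the token division performed by `_split_metadata` in the source code further down, for a single `statistics_{step_name}` entry; `split_token_stats` is a hypothetical name. Because floor division is used, any remainder is dropped, so totals that are not divisible by N lose tokens, which is what the TODO in the source refers to.

```python
from typing import Dict, List, Union


def split_token_stats(
    stats: Dict[str, Union[int, List[int]]], n: int
) -> Dict[str, List[int]]:
    """Hypothetical helper: divide token counts by `n` with floor division."""
    result: Dict[str, List[int]] = {}
    for key in ("input_tokens", "output_tokens"):
        value = stats[key]
        # Scalars are wrapped in a one-element list, as in the source.
        values = value if isinstance(value, list) else [value]
        result[key] = [v // n for v in values]
    return result


print(split_token_stats({"input_tokens": [12], "output_tokens": [12]}, 2))
# {'input_tokens': [6], 'output_tokens': [6]}
print(split_token_stats({"input_tokens": 13, "output_tokens": [7]}, 2))
# {'input_tokens': [6], 'output_tokens': [3]}
```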

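Input columns
  • dynamic (determined by columns attribute): The columns to be expanded into multiple rows.
Output columns
  • dynamic (determined by columns attribute): The expanded columns.
Categories
  • columns

Examples

Expand the selected columns into multiple rows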

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": ["generation 1", "generation 2"]}
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]

Expand the selected columns which are JSON encoded into multiple rows

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
    encoded=True,  # It can also be a list of columns that are encoded, e.g. ["generation"]
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": '["generation 1", "generation 2"]'}
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]

Expand the selected columns and split the statistics in the distilabel_metadata column

from distilabel.steps import ExpandColumns

expand_columns = ExpandColumns(
    columns=["generation"],
    split_statistics=True,
)
expand_columns.load()

result = next(
    expand_columns.process(
        [
            {
                "instruction": "instruction 1",
                "generation": ["generation 1", "generation 2"],
                "distilabel_metadata": {
                    "statistics_generation": {
                        "input_tokens": [12],
                        "output_tokens": [12],
                    },
                },
            }
        ],
    )
)
# >>> result
# [{'instruction': 'instruction 1', 'generation': 'generation 1', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}, {'instruction': 'instruction 1', 'generation': 'generation 2', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}]
Source code in src/distilabel/steps/columns/expand.py
class ExpandColumns(Step):
    """Expand columns that contain lists into multiple rows.

    `ExpandColumns` is a `Step` that takes a list of columns and expands them into multiple
    rows. The new rows will have the same data as the original row, except for the expanded
    column, which will contain a single item from the original list.

    Attributes:
        columns: A dictionary that maps the column to be expanded to the new column name
            or a list of columns to be expanded. If a list is provided, the new column name
            will be the same as the column name.
        encoded: A bool to inform whether the columns are JSON encoded lists. If this value is
            set to True, the columns will be decoded before expanding. Alternatively, to specify
            columns that can be encoded, a list can be provided. In this case, the column names
            informed must be a subset of the columns selected for expansion.
        split_statistics: A bool to inform whether the statistics in the `distilabel_metadata`
            column should be split into multiple rows.
            If we want to expand some columns containing a list of strings that come from
            having parsed the output of an LLM, the tokens in the `statistics_{step_name}`
            of the `distilabel_metadata` column should be split to avoid multiplying
            them if we aggregate the data afterwards. For example, with a task that is supposed
            to generate a list of N instructions, and we want each of those N instructions in
            different rows, we should split the statistics by N.
            In such a case, set this value to True.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to be expanded into
            multiple rows.

    Output columns:
        - dynamic (determined by `columns` attribute):  The expanded columns.

    Categories:
        - columns

    Examples:
        Expand the selected columns into multiple rows:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": ["generation 1", "generation 2"]}
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
        ```

        Expand the selected columns which are JSON encoded into multiple rows:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
            encoded=True,  # It can also be a list of columns that are encoded, e.g. ["generation"]
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": '["generation 1", "generation 2"]'}
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1'}, {'instruction': 'instruction 1', 'generation': 'generation 2'}]
        ```

        Expand the selected columns and split the statistics in the `distilabel_metadata` column:

        ```python
        from distilabel.steps import ExpandColumns

        expand_columns = ExpandColumns(
            columns=["generation"],
            split_statistics=True,
        )
        expand_columns.load()

        result = next(
            expand_columns.process(
                [
                    {
                        "instruction": "instruction 1",
                        "generation": ["generation 1", "generation 2"],
                        "distilabel_metadata": {
                            "statistics_generation": {
                                "input_tokens": [12],
                                "output_tokens": [12],
                            },
                        },
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction 1', 'generation': 'generation 1', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}, {'instruction': 'instruction 1', 'generation': 'generation 2', 'distilabel_metadata': {'statistics_generation': {'input_tokens': [6], 'output_tokens': [6]}}}]
        ```
    """

    columns: Union[Dict[str, str], List[str]]
    encoded: Union[bool, List[str]] = False
    split_statistics: bool = False

    @field_validator("columns")
    @classmethod
    def always_dict(cls, value: Union[Dict[str, str], List[str]]) -> Dict[str, str]:
        """Ensure that the columns are always a dictionary.

        Args:
            value: The columns to be expanded.

        Returns:
            The columns to be expanded as a dictionary.
        """
        if isinstance(value, list):
            return {col: col for col in value}

        return value

    @model_validator(mode="after")
    def is_subset(self) -> Self:
        """Ensure the "encoded" column names are a subset of the "columns" selected.

        Returns:
            The "encoded" attribute updated to work internally.
        """
        if isinstance(self.encoded, list):
            if not set(self.encoded).issubset(set(self.columns.keys())):
                raise ValueError(
                    "The 'encoded' columns must be a subset of the 'columns' selected for expansion."
                )
        if isinstance(self.encoded, bool):
            self.encoded = list(self.columns.keys()) if self.encoded else []
        return self

    @property
    def inputs(self) -> "StepColumns":
        """The columns to be expanded."""
        return list(self.columns.keys())

    @property
    def outputs(self) -> "StepColumns":
        """The expanded columns."""
        return [
            new_column if new_column else expand_column
            for expand_column, new_column in self.columns.items()
        ]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Expand the columns in the input data.

        Args:
            inputs: The input data.

        Yields:
            The expanded rows.
        """
        if self.encoded:
            for input in inputs:
                for column in self.encoded:
                    input[column] = json.loads(input[column])

        yield [row for input in inputs for row in self._expand_columns(input)]

    def _expand_columns(self, input: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Expand the columns in the input data.

        Args:
            input: The input data.

        Returns:
            The expanded rows.
        """
        metadata_visited = False
        expanded_rows = []
        # Update the columns here to avoid doing the validation on the `inputs`, as the
        # `distilabel_metadata` is not defined on Pipeline creation on the DAG.
        columns = dict(self.columns)  # copy, so `self.columns` is not mutated across calls
        if self.split_statistics:
            columns["distilabel_metadata"] = "distilabel_metadata"

        for expand_column, new_column in columns.items():  # type: ignore
            data = input.get(expand_column)
            input, metadata_visited = self._split_metadata(
                input, len(data), metadata_visited
            )

            rows = []
            for item, expanded in zip_longest(*[data, expanded_rows], fillvalue=input):
                rows.append({**expanded, new_column: item})
            expanded_rows = rows
        return expanded_rows

    def _split_metadata(
        self, input: Dict[str, Any], n: int, metadata_visited: bool = False
    ) -> Tuple[Dict[str, Any], bool]:
        """Helper method to split the statistics in the `distilabel_metadata` column.

        Args:
            input: The input data.
            n: Number of splits to apply to the tokens (if we have 12 tokens and want to split
                them 3 times, n==3).
            metadata_visited: Bool to prevent from updating the data more than once.

        Returns:
            A tuple with the input (its `distilabel_metadata` updated) and the updated `metadata_visited` flag.
        """
        # - If we want to split the statistics, we need to ensure that the metadata is present.
        # - Metadata can only be visited once per row to avoid successive splitting.
        # TODO: For an odd number of tokens, this will miss 1, we have to fix it.
        if (
            self.split_statistics
            and (metadata := input.get("distilabel_metadata", {}))
            and not metadata_visited
        ):
            for k, v in metadata.items():
                if (
                    not v
                ):  # In case it wasn't found in the metadata for some error, skip it
                    continue
                if k.startswith("statistics_") and (
                    "input_tokens" in v and "output_tokens" in v
                ):
                    # For num_generations>1 we assume all the tokens should be divided by n
                    # TODO: The tokens should always come as a list, but there can
                    # be differences
                    if isinstance(v["input_tokens"], list):
                        input_tokens = [value // n for value in v["input_tokens"]]
                    else:
                        input_tokens = [v["input_tokens"] // n]
                    if isinstance(v["output_tokens"], list):
                        output_tokens = [value // n for value in v["output_tokens"]]
                    else:
                        output_tokens = [v["output_tokens"] // n]

                    input["distilabel_metadata"][k] = {
                        "input_tokens": input_tokens,
                        "output_tokens": output_tokens,
                    }
                metadata_visited = True
            # Once we have updated the metadata, Create a list out of it to let the
            # following section to expand it as any other column.
            if isinstance(input["distilabel_metadata"], dict):
                input["distilabel_metadata"] = [input["distilabel_metadata"]] * n
        return input, metadata_visited
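When several columns are listed, the loop in `_expand_columns` pairs their items by position through `zip_longest` rather than forming every combination. The standalone sketch below reproduces that loop without the metadata handling; `expand_columns` here is a hypothetical function, and it assumes all expanded lists have the same length.

```python
from itertools import zip_longest
from typing import Any, Dict, List


def expand_columns(row: Dict[str, Any], columns: Dict[str, str]) -> List[Dict[str, Any]]:
    # Follows the loop in `_expand_columns`, without the metadata handling.
    # Assumes every expanded list has the same length.
    expanded_rows: List[Dict[str, Any]] = []
    for expand_column, new_column in columns.items():
        rows = []
        # First column: `expanded_rows` is empty, so each item is paired with
        # the original row (the fill value). Later columns pair by position.
        for item, expanded in zip_longest(row[expand_column], expanded_rows, fillvalue=row):
            rows.append({**expanded, new_column: item})
        expanded_rows = rows
    return expanded_rows


row = {"id": 0, "a": [1, 2], "b": ["x", "y"]}
print(expand_columns(row, {"a": "a", "b": "b"}))
# [{'id': 0, 'a': 1, 'b': 'x'}, {'id': 0, 'a': 2, 'b': 'y'}]
```

Two two-item lists therefore give two rows, not four. Note also that when a column is renamed, the sketch (like the loop it follows) keeps the original list column next to the new one, because each row starts as a copy of the input.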
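inputs property

The columns to be expanded.

outputs property

The expanded columns.

always_dict(value) classmethod

Ensure that the columns are always a dictionary.

Parameters

Name Type Description Default
value Union[Dict[str, str], List[str]]

The columns to be expanded.

required

Returns

Type Description
Dict[str, str]

The columns to be expanded, as a dictionary.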

is_subset()

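Ensure that the "encoded" column names are a subset of the selected "columns".

Returns

Type Description
Self

The model, with the "encoded" attribute normalized to a list of column names for internal use.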
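Taken together, the two validators normalize the configuration before any data is processed. The pure-Python sketch below restates their logic outside pydantic; `normalize_config` is a hypothetical helper, not part of distilabel.

```python
from typing import Dict, List, Tuple, Union


def normalize_config(
    columns: Union[Dict[str, str], List[str]],
    encoded: Union[bool, List[str]] = False,
) -> Tuple[Dict[str, str], List[str]]:
    # Like `always_dict`: a list of names maps each column to itself.
    if isinstance(columns, list):
        columns = {col: col for col in columns}
    # Like `is_subset`: an explicit list may only name columns being expanded.
    if isinstance(encoded, list) and not set(encoded).issubset(columns.keys()):
        raise ValueError(
            "The 'encoded' columns must be a subset of the 'columns' selected for expansion."
        )
    # A bool becomes "every expanded column" or "no column".
    if isinstance(encoded, bool):
        encoded = list(columns.keys()) if encoded else []
    return columns, encoded


print(normalize_config(["generation"], encoded=True))
# ({'generation': 'generation'}, ['generation'])
```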

process(inputs)

Expand the columns in the input data.

Parameters

Name Type Description Default
inputs StepInput

The input data.

required

Yields

Type Description
StepOutput

The expanded rows.
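When `encoded` is set, `process` runs `json.loads` over each encoded column before expanding and writes the decoded value back into the same row dictionary, as in the class source above. The sketch below isolates that step; `decode_columns` is a hypothetical helper, and it shows that the caller's rows are modified in place.

```python
import json
from typing import Any, Dict, List


def decode_columns(rows: List[Dict[str, Any]], encoded: List[str]) -> List[Dict[str, Any]]:
    # Replace each JSON string with the decoded Python object, in place.
    for row in rows:
        for column in encoded:
            row[column] = json.loads(row[column])
    return rows


batch = [{"instruction": "instruction 1", "generation": '["generation 1", "generation 2"]'}]
decoded = decode_columns(batch, ["generation"])
print(batch[0]["generation"])  # ['generation 1', 'generation 2']
```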


keep

KeepColumns

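Bases: Step

Keeps the selected columns in the dataset.

KeepColumns is a Step that implements a process method which keeps only the columns specified in the columns attribute. The columns attribute also overrides the default values of the inputs and outputs properties.

Note

The order in which the columns are provided matters, because the output is sorted using that order. This is useful before pushing either a datasets.Dataset via the PushToHub step or a distilabel.Distiset via the output of Pipeline.run.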
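The ordering follows from how each output row is built: the dictionary is created by iterating over `columns`, so its keys come out in that order regardless of the key order of the input row. A minimal sketch with a hypothetical `keep_columns` function (not the distilabel class):

```python
from typing import Any, Dict, List


def keep_columns(rows: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    # Each output dict is built by iterating over `columns`, so its key order
    # follows `columns` rather than the key order of the input row.
    return [{col: row[col] for col in columns} for row in rows]


rows = [{"model_name": "my_model", "generation": "white", "instruction": "What's the brightest color?"}]
kept = keep_columns(rows, ["instruction", "generation"])
print(list(kept[0]))  # ['instruction', 'generation']
```

A column listed in `columns` but absent from a row raises `KeyError`, since the lookup is `row[col]`.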

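Attributes

Name Type Description
columns List[str]

List of strings with the names of the columns to keep.

Input columns
  • dynamic (determined by columns attribute): The columns to keep.
Output columns
  • dynamic (determined by columns attribute): The columns that were kept.
Categories
  • columns

Examples

Select the columns to keep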

from distilabel.steps import KeepColumns

keep_columns = KeepColumns(
    columns=["instruction", "generation"],
)
keep_columns.load()

result = next(
    keep_columns.process(
        [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
    )
)
# >>> result
# [{'instruction': "What's the brightest color?", 'generation': 'white'}]
Source code in src/distilabel/steps/columns/keep.py
class KeepColumns(Step):
    """Keeps selected columns in the dataset.

    `KeepColumns` is a `Step` that implements the `process` method that keeps only the columns
    specified in the `columns` attribute. Also `KeepColumns` provides an attribute `columns` to
    specify the columns to keep which will override the default value for the properties `inputs`
    and `outputs`.

    Note:
        The order in which the columns are provided is important, as the output will be sorted
        using the provided order, which is useful before pushing either a `dataset.Dataset` via
        the `PushToHub` step or a `distilabel.Distiset` via the `Pipeline.run` output variable.

    Attributes:
        columns: List of strings with the names of the columns to keep.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to keep.

    Output columns:
        - dynamic (determined by `columns` attribute): The columns that were kept.

    Categories:
        - columns

    Examples:
        Select the columns to keep:

        ```python
        from distilabel.steps import KeepColumns

        keep_columns = KeepColumns(
            columns=["instruction", "generation"],
        )
        keep_columns.load()

        result = next(
            keep_columns.process(
                [{"instruction": "What's the brightest color?", "generation": "white", "model_name": "my_model"}],
            )
        )
        # >>> result
        # [{'instruction': "What's the brightest color?", 'generation': 'white'}]
        ```
    """

    columns: List[str]

    @property
    def inputs(self) -> "StepColumns":
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        """The outputs for the task are the column names in `columns`."""
        return self.columns

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method keeps only the columns specified in the `columns` attribute.

        Args:
            *inputs: A list of dictionaries with the input data.

        Yields:
            A list of dictionaries with the output data.
        """
        for input in inputs:
            outputs = []
            for item in input:
                outputs.append({col: item[col] for col in self.columns})
            yield outputs
inputs property

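The inputs for the task are the column names in columns.

outputs property

The outputs for the task are the column names in columns.

process(*inputs)

The process method keeps only the columns specified in the columns attribute.

Parameters

Name Type Description Default
*inputs StepInput

A list of dictionaries with the input data.

()

Yields

Type Description
StepOutput

A list of dictionaries with the output data.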


merge

MergeColumns

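Bases: Step

Merges columns from a row.

MergeColumns is a Step that implements a process method which calls the merge_columns function to combine columns in a StepInput. MergeColumns provides two attributes, columns and output_column, to specify the columns to merge and the resulting output column.

This step can be useful if you have a Task that generates instructions, for example, and you want more examples of them. In that case, you could use another Task to multiply your instructions synthetically, which would leave them split across two different columns. With MergeColumns you can merge those into a single column of your dataset for further processing.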
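The merge concatenates values in the order given by `columns`, wraps non-list values in a one-item list, and drops the source columns from the resulting row. The sketch below restates that behavior in plain Python, including the `merged_column` fallback that `MergeColumns.outputs` uses when `output_column` is not set; `merge_row` and the example keys are hypothetical.

```python
from typing import Any, Dict, List, Optional


def merge_row(
    row: Dict[str, Any], columns: List[str], output_column: Optional[str] = None
) -> Dict[str, Any]:
    # Same fallback name as `MergeColumns.outputs`.
    new_column = output_column if output_column else "merged_column"
    result = row.copy()  # the input row is left untouched
    combined: List[Any] = []
    for key in columns:
        value = result.pop(key)  # source columns are removed from the copy
        combined += value if isinstance(value, list) else [value]
    result[new_column] = combined
    return result


row = {"id": 7, "instruction": "Write a haiku.", "extra": ["Write a limerick.", "Write a sonnet."]}
merged = merge_row(row, ["instruction", "extra"])
print(merged)
# {'id': 7, 'merged_column': ['Write a haiku.', 'Write a limerick.', 'Write a sonnet.']}
```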

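Attributes

Name Type Description
columns List[str]

List of strings with the names of the columns to merge.

output_column Optional[str]

Name of the output column. If not provided, merged_column is used.

Input columns
  • dynamic (determined by columns attribute): The columns to merge.
Output columns
  • dynamic (determined by columns and output_column attributes): The columns that were merged.
Categories
  • columns

Examples

Combine columns in rows of a dataset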

from distilabel.steps import MergeColumns

combiner = MergeColumns(
    columns=["queries", "multiple_queries"],
    output_column="queries",
)
combiner.load()

result = next(
    combiner.process(
        [
            {
                "queries": "How are you?",
                "multiple_queries": ["What's up?", "Everything ok?"]
            }
        ],
    )
)
# >>> result
# [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
Source code in src/distilabel/steps/columns/merge.py
class MergeColumns(Step):
    """Merge columns from a row.

    `MergeColumns` is a `Step` that implements the `process` method that calls the `merge_columns`
    function to handle and combine columns in a `StepInput`. `MergeColumns` provides two attributes
    `columns` and `output_column` to specify the columns to merge and the resulting output column.

    This step can be useful if you have a `Task` that generates instructions for example, and you
    want to have more examples of those. In such a case, you could for example use another `Task`
    to multiply your instructions synthetically, which would yield two separate columns.
    Using `MergeColumns` you can merge them and use them as a single column in your dataset for
    further processing.

    Attributes:
        columns: List of strings with the names of the columns to merge.
        output_column: str name of the output column

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to merge.

    Output columns:
        - dynamic (determined by `columns` and `output_column` attributes): The columns
            that were merged.

    Categories:
        - columns

    Examples:
        Combine columns in rows of a dataset:

        ```python
        from distilabel.steps import MergeColumns

        combiner = MergeColumns(
            columns=["queries", "multiple_queries"],
            output_column="queries",
        )
        combiner.load()

        result = next(
            combiner.process(
                [
                    {
                        "queries": "How are you?",
                        "multiple_queries": ["What's up?", "Everything ok?"]
                    }
                ],
            )
        )
        # >>> result
        # [{'queries': ['How are you?', "What's up?", 'Everything ok?']}]
        ```
    """

    columns: List[str]
    output_column: Optional[str] = None

    @property
    def inputs(self) -> "StepColumns":
        return self.columns

    @property
    def outputs(self) -> "StepColumns":
        return [self.output_column] if self.output_column else ["merged_column"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":
        combined = []
        for input in inputs:
            combined.append(
                merge_columns(
                    input,
                    columns=self.columns,
                    new_column=self.outputs[0],
                )
            )
        yield combined

group

GroupColumns

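Bases: Step

Combines columns from a list of StepInput.

GroupColumns is a Step that implements a process method which calls the group_columns function to combine a list of StepInput. GroupColumns also provides two attributes, columns and output_columns, to specify the columns to group and the output columns; these override the default values of the inputs and outputs properties, respectively.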
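Rows are combined by position: the first row of every input is grouped with the first row of every other input, and so on. The sketch below reproduces that index-wise grouping while ignoring `distilabel_metadata`; `group_by_index` is a hypothetical helper. Because Python's `zip` stops at the shortest input, rows beyond the length of the shortest batch are dropped.

```python
from typing import Any, Dict, List


def group_by_index(*batches: List[Dict[str, Any]], columns: List[str]) -> List[Dict[str, Any]]:
    # Default naming used by GroupColumns when `output_columns` is not set.
    names = {col: f"grouped_{col}" for col in columns}
    result = []
    # `zip` yields one tuple per index and stops at the shortest batch.
    for dicts_at_index in zip(*batches):
        combined: Dict[str, Any] = {}
        for d in dicts_at_index:
            for key, value in d.items():
                if key in names:
                    combined.setdefault(names[key], []).append(value)
                else:
                    combined[key] = value  # later batches overwrite earlier ones
        result.append(combined)
    return result


batch_a = [{"id": 0, "generation": "A0"}, {"id": 1, "generation": "A1"}]
batch_b = [{"id": 0, "generation": "B0"}]
print(group_by_index(batch_a, batch_b, columns=["generation"]))
# [{'id': 0, 'grouped_generation': ['A0', 'B0']}]
```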

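Attributes

Name Type Description
columns List[str]

List of strings with the names of the columns to group.

output_columns Optional[List[str]]

Optional list of strings with the names of the output columns.

Input columns
  • dynamic (determined by columns attribute): The columns to group.
Output columns
  • dynamic (determined by columns and output_columns attributes): The columns that were grouped.
Categories
  • columns

Examples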

Group columns of a dataset:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
```

Specify the name of the output columns:

```python
from distilabel.steps import GroupColumns

group_columns = GroupColumns(
    name="group_columns",
    columns=["generation", "model_name"],
    output_columns=["generations", "generation_models"]
)
group_columns.load()

result = next(
    group_columns.process(
        [{"generation": "AI generated text"}, {"model_name": "my_model"}],
        [{"generation": "Other generated text", "model_name": "my_model"}]
    )
)
# >>> result
# [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
```
Source code in src/distilabel/steps/columns/group.py
class GroupColumns(Step):
    """Combines columns from a list of `StepInput`.

    `GroupColumns` is a `Step` that implements the `process` method that calls the `group_columns`
    function to handle and combine a list of `StepInput`. Also `GroupColumns` provides two attributes
    `columns` and `output_columns` to specify the columns to group and the output columns
    which will override the default value for the properties `inputs` and `outputs`, respectively.

    Attributes:
        columns: List of strings with the names of the columns to group.
        output_columns: Optional list of strings with the names of the output columns.

    Input columns:
        - dynamic (determined by `columns` attribute): The columns to group.

    Output columns:
        - dynamic (determined by `columns` and `output_columns` attributes): The columns
            that were grouped.

    Categories:
        - columns

    Examples:

        Group columns of a dataset:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'grouped_generation': ['AI generated text', 'Other generated text'], 'grouped_model_name': ['my_model']}]
        ```

        Specify the name of the output columns:

        ```python
        from distilabel.steps import GroupColumns

        group_columns = GroupColumns(
            name="group_columns",
            columns=["generation", "model_name"],
            output_columns=["generations", "generation_models"]
        )
        group_columns.load()

        result = next(
            group_columns.process(
                [{"generation": "AI generated text"}, {"model_name": "my_model"}],
                [{"generation": "Other generated text", "model_name": "my_model"}]
            )
        )
        # >>> result
        # [{'generations': ['AI generated text', 'Other generated text'], 'generation_models': ['my_model']}]
        ```
    """

    columns: List[str]
    output_columns: Optional[List[str]] = None

    @property
    def inputs(self) -> List[str]:
        """The inputs for the task are the column names in `columns`."""
        return self.columns

    @property
    def outputs(self) -> List[str]:
        """The outputs for the task are the column names in `output_columns` or
        `grouped_{column}` for each column in `columns`."""
        return (
            self.output_columns
            if self.output_columns is not None
            else [f"grouped_{column}" for column in self.columns]
        )

    @override
    def process(self, *inputs: StepInput) -> "StepOutput":
        """The `process` method calls the `group_columns` function to handle and combine a list of `StepInput`.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with the combined `StepInput` using the `group_columns` function.
        """
        yield group_columns(
            *inputs,
            group_columns=self.inputs,
            output_group_columns=self.outputs,
        )
inputs property

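The inputs for the task are the column names in columns.

outputs property

The outputs for the task are the column names in output_columns, or grouped_{column} for each column in columns.

process(*inputs)

The process method calls the group_columns function to combine a list of StepInput.

Parameters

Name Type Description Default
*inputs StepInput

A list of StepInput to be combined.

()

Yields

Type Description
StepOutput

A StepOutput with the StepInput combined by the group_columns function.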


utils

merge_distilabel_metadata(*output_dicts)

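Merge the DISTILABEL_METADATA_KEY from multiple output dictionaries. DISTILABEL_METADATA_KEY can be either a dictionary containing metadata keys or a list containing dictionaries of metadata keys.

Parameters

Name Type Description Default
*output_dicts Dict[str, Any]

Variable number of dictionaries or lists containing distilabel metadata.

()

Returns

Type Description
Union[Dict[str, Any], List[Dict[str, Any]]]

A merged dictionary or list containing all the distilabel metadata.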
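For the dictionary case, values are collected per key across the inputs, and a key that appears only once keeps its original value instead of being wrapped in a list (the list case simply concatenates the lists). The sketch below covers only the dictionary case; `merge_metadata_dicts` and the metadata keys in the example are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def merge_metadata_dicts(*metadatas: Dict[str, Any]) -> Dict[str, Any]:
    # Collect every value seen for each key, in input order.
    merged: Dict[str, List[Any]] = defaultdict(list)
    for metadata in metadatas:
        for key, value in metadata.items():
            merged[key].append(value)
    # A key seen once keeps its value; a key seen several times becomes a list.
    return {key: values[0] if len(values) == 1 else values for key, values in merged.items()}


a = {"statistics_task": {"input_tokens": [10], "output_tokens": [5]}, "raw_output_task": "A"}
b = {"raw_output_task": "B"}
print(merge_metadata_dicts(a, b))
# {'statistics_task': {'input_tokens': [10], 'output_tokens': [5]}, 'raw_output_task': ['A', 'B']}
```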

Source code in src/distilabel/steps/columns/utils.py
def merge_distilabel_metadata(
    *output_dicts: Dict[str, Any],
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
    """
    Merge the `DISTILABEL_METADATA_KEY` from multiple output dictionaries. `DISTILABEL_METADATA_KEY`
    can be either a dictionary containing metadata keys or a list containing dictionaries
    of metadata keys.

    Args:
        *output_dicts: Variable number of dictionaries or lists containing distilabel metadata.

    Returns:
        A merged dictionary or list containing all the distilabel metadata.
    """
    merged_metadata = defaultdict(list)

    for output_dict in output_dicts:
        metadata = output_dict.get(DISTILABEL_METADATA_KEY, {})
        # If `distilabel_metadata_key` is a `list` then it contains dictionaries with
        # the metadata per `num_generations` created when `group_generations==True`
        if isinstance(metadata, list):
            if not isinstance(merged_metadata, list):
                merged_metadata = []
            merged_metadata.extend(metadata)
        else:
            for key, value in metadata.items():
                merged_metadata[key].append(value)

    if isinstance(merged_metadata, list):
        return merged_metadata

    final_metadata = {}
    for key, value_list in merged_metadata.items():
        if len(value_list) == 1:
            final_metadata[key] = value_list[0]
        else:
            final_metadata[key] = value_list

    return final_metadata

group_columns(*inputs, group_columns, output_group_columns=None)

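Groups multiple lists of dictionaries into a single list of dictionaries on the specified group_columns. The grouped columns are renamed to output_group_columns if provided, otherwise to grouped_{column}.

Parameters

Name Type Description Default
inputs StepInput

Lists of dictionaries to combine.

()
group_columns List[str]

List of keys to merge on.

required
output_group_columns Optional[List[str]]

List of keys to rename the merge keys to. Defaults to None.

None

Returns

Type Description
StepInput

A list of dictionaries where the values of the group_columns are combined into a list and renamed to output_group_columns.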
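Before grouping, the function validates and resolves the output names. The sketch below isolates that step; `resolve_output_names` is a hypothetical helper that returns the same mapping the source builds as `group_columns_dict`.

```python
from typing import Dict, List, Optional


def resolve_output_names(
    group_columns: List[str], output_group_columns: Optional[List[str]] = None
) -> Dict[str, str]:
    # Explicit names must match the grouped columns one to one.
    if output_group_columns is not None and len(output_group_columns) != len(group_columns):
        raise ValueError(
            "The length of `output_group_columns` must be the same as the length of `group_columns`."
        )
    # Default naming: `grouped_{column}`.
    if output_group_columns is None:
        output_group_columns = [f"grouped_{key}" for key in group_columns]
    return dict(zip(group_columns, output_group_columns))


print(resolve_output_names(["generation", "model_name"]))
# {'generation': 'grouped_generation', 'model_name': 'grouped_model_name'}
```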

Source code in src/distilabel/steps/columns/utils.py
def group_columns(
    *inputs: "StepInput",
    group_columns: List[str],
    output_group_columns: Optional[List[str]] = None,
) -> "StepInput":
    """Groups multiple lists of dictionaries into a single list of dictionaries on the
    specified `group_columns`. The grouped columns are renamed to `output_group_columns`
    if provided, otherwise to `grouped_{column}`.

    Args:
        inputs: list of dictionaries to combine.
        group_columns: list of keys to merge on.
        output_group_columns: list of keys to rename the merge keys to. Defaults to `None`.

    Returns:
        A list of dictionaries where the values of the `group_columns` are combined into a
        list and renamed to `output_group_columns`.
    """
    if output_group_columns is not None and len(output_group_columns) != len(
        group_columns
    ):
        raise ValueError(
            "The length of `output_group_columns` must be the same as the length of `group_columns`."
        )
    if output_group_columns is None:
        output_group_columns = [f"grouped_{key}" for key in group_columns]
    group_columns_dict = dict(zip(group_columns, output_group_columns))

    result = []
    # Use zip to iterate over lists based on their index
    for dicts_at_index in zip(*inputs):
        combined_dict = {}
        metadata_dicts = []
        # Iterate over dicts at the same index
        for d in dicts_at_index:
            # Extract metadata for merging
            if DISTILABEL_METADATA_KEY in d:
                metadata_dicts.append(
                    {DISTILABEL_METADATA_KEY: d[DISTILABEL_METADATA_KEY]}
                )
            # Iterate over key-value pairs in each dict
            for key, value in d.items():
                if key == DISTILABEL_METADATA_KEY:
                    continue
                # If the key is one of the `group_columns`, append the value to its list
                if key in group_columns_dict.keys():
                    combined_dict.setdefault(group_columns_dict[key], []).append(value)
                # Otherwise keep the key-value pair as is (a later dict overwrites it)
                else:
                    combined_dict[key] = value

        if metadata_dicts:
            combined_dict[DISTILABEL_METADATA_KEY] = merge_distilabel_metadata(
                *metadata_dicts
            )

        result.append(combined_dict)
    return result

merge_columns(row, columns, new_column='combined_key')

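Merge columns in a dictionary into a single column on the specified new_column.

Parameters

Name Type Description Default
row Dict[str, Any]

Dictionary corresponding to a row in a dataset.

required
columns List[str]

List of keys to merge.

required
new_column str

Name of the new key created.

'combined_key'

Returns

Type Description
Dict[str, Any]

Dictionary with the new merged key.

Source code in src/distilabel/steps/columns/utils.py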
def merge_columns(
    row: Dict[str, Any], columns: List[str], new_column: str = "combined_key"
) -> Dict[str, Any]:
    """Merge columns in a dictionary into a single column on the specified `new_column`.

    Args:
        row: Dictionary corresponding to a row in a dataset.
        columns: List of keys to merge.
        new_column: Name of the new key created.

    Returns:
        Dictionary with the new merged key.
    """
    result = row.copy()  # preserve the original dictionary
    combined = []
    for key in columns:
        to_combine = result.pop(key)
        if not isinstance(to_combine, list):
            to_combine = [to_combine]
        combined += to_combine
    result[new_column] = combined
    return result