Hugging Face

This section contains the existing steps that integrate with Hugging Face, so that generated datasets can easily be pushed to Hugging Face.

LoadDataFromDisk

Bases: LoadDataFromHub

Load a dataset that was previously saved to disk.

If you previously saved your dataset using the save_to_disk method, or Distiset.save_to_disk, you can load it again with this class to build a new pipeline.
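
For instance, a minimal sketch of that workflow (the path and the toy data are placeholders):

from datasets import Dataset
from distilabel.steps import LoadDataFromDisk

# Save a plain `datasets.Dataset` to disk; a Distiset returned by a previous
# `pipeline.run(...)` could be saved the same way with `Distiset.save_to_disk`.
dataset = Dataset.from_dict({"instruction": ["instruction"], "generation": ["generation"]})
dataset.save_to_disk("path/to/dataset")

# Later, in a new pipeline, load it back with this step.
loader = LoadDataFromDisk(dataset_path="path/to/dataset")
loader.load()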

Attributes
  • dataset_path (RuntimeParameter[Union[str, Path]]): The path to the dataset or distiset.
  • split (Optional[RuntimeParameter[str]]): The split of the dataset to load (typically train, test or validation).
  • config (Optional[RuntimeParameter[str]]): The configuration of the dataset to load. Defaults to default; if the dataset has multiple configurations, this must be supplied or an error is raised.

Runtime Parameters
  • batch_size: The batch size to use when processing the data.
  • dataset_path: The path to the dataset or distiset.
  • is_distiset: Whether the dataset to load is a Distiset or not. Defaults to False.
  • split: The split of the dataset to load. Defaults to 'train'.
  • config: The configuration of the dataset to load. Defaults to default; if the dataset has multiple configurations, this must be supplied or an error is raised.
  • num_examples: The number of examples to load from the dataset. By default all examples are loaded.
  • storage_options: Key/value pairs to be passed on to the file-system backend, if any. Defaults to None.
Output Columns
  • dynamic (all): The columns that will be generated by this step, based on the dataset loaded from the Hugging Face Hub.
Categories
  • load

Examples

Load data from a Hugging Face Dataset

from distilabel.steps import LoadDataFromDisk

loader = LoadDataFromDisk(dataset_path="path/to/dataset")
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'type': 'function', 'function':...', False)

Load data from a distilabel Distiset

from distilabel.steps import LoadDataFromDisk

# Specify the configuration to load.
loader = LoadDataFromDisk(
    dataset_path="path/to/dataset",
    is_distiset=True,
    config="leaf_step_1"
)
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'a': 1}, {'a': 2}, {'a': 3}], True)

Load data from a Hugging Face Dataset or Distiset in your cloud provider

from distilabel.steps import LoadDataFromDisk

loader = LoadDataFromDisk(
    dataset_path="gcs://path/to/dataset",
    storage_options={"project": "experiments-0001"}
)
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'type': 'function', 'function':...', False)
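
Combine the runtime parameters listed above

A sketch (the path is a placeholder) that limits how much data is loaded and how it is batched:

from distilabel.steps import LoadDataFromDisk

loader = LoadDataFromDisk(
    dataset_path="path/to/dataset",
    num_examples=10,  # only the first 10 examples are loaded
    batch_size=5,     # `process` yields batches of 5 rows
)
loader.load()

result = next(loader.process())
# The boolean in the result indicates whether this batch is the last one.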
Source code in src/distilabel/steps/generators/huggingface.py
class LoadDataFromDisk(LoadDataFromHub):
    """Load a dataset that was previously saved to disk.

    If you previously saved your dataset using the `save_to_disk` method, or
    `Distiset.save_to_disk` you can load it again to build a new pipeline using this class.

    Attributes:
        dataset_path: The path to the dataset or distiset.
        split: The split of the dataset to load (typically will be `train`, `test` or `validation`).
        config: The configuration of the dataset to load. Defaults to `default`, if there are
            multiple configurations in the dataset this must be supplied or an error is raised.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.
        - `dataset_path`: The path to the dataset or distiset.
        - `is_distiset`: Whether the dataset to load is a `Distiset` or not. Defaults to False.
        - `split`: The split of the dataset to load. Defaults to 'train'.
        - `config`: The configuration of the dataset to load. Defaults to `default`, if there are
            multiple configurations in the dataset this must be supplied or an error is raised.
        - `num_examples`: The number of examples to load from the dataset.
            By default will load all examples.
        - `storage_options`: Key/value pairs to be passed on to the file-system backend, if any.
            Defaults to `None`.

    Output columns:
        - dynamic (`all`): The columns that will be generated by this step, based on the
            datasets loaded from the Hugging Face Hub.

    Categories:
        - load

    Examples:
        Load data from a Hugging Face Dataset:

        ```python
        from distilabel.steps import LoadDataFromDisk

        loader = LoadDataFromDisk(dataset_path="path/to/dataset")
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'type': 'function', 'function':...', False)
        ```

        Load data from a distilabel Distiset:

        ```python
        from distilabel.steps import LoadDataFromDisk

        # Specify the configuration to load.
        loader = LoadDataFromDisk(
            dataset_path="path/to/dataset",
            is_distiset=True,
            config="leaf_step_1"
        )
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'a': 1}, {'a': 2}, {'a': 3}], True)
        ```

        Load data from a Hugging Face Dataset or Distiset in your cloud provider:

        ```python
        from distilabel.steps import LoadDataFromDisk

        loader = LoadDataFromDisk(
            dataset_path="gcs://path/to/dataset",
            storage_options={"project": "experiments-0001"}
        )
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'type': 'function', 'function':...', False)
        ```
    """

    dataset_path: RuntimeParameter[Union[str, Path]] = Field(
        default=None,
        description="Path to the dataset or distiset.",
    )
    config: Optional[RuntimeParameter[str]] = Field(
        default="default",
        description=(
            "The configuration of the dataset to load. Will default to 'default'",
            " which corresponds to a distiset with a single configuration.",
        ),
    )
    is_distiset: Optional[RuntimeParameter[bool]] = Field(
        default=False,
        description="Whether the dataset to load is a `Distiset` or not. Defaults to False.",
    )
    keep_in_memory: Optional[RuntimeParameter[bool]] = Field(
        default=None,
        description="Whether to copy the dataset in-memory, see `datasets.Dataset.load_from_disk` "
        " for more information. Defaults to `None`.",
    )
    split: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The split of the dataset to load. By default will load the whole Dataset/Distiset.",
    )
    repo_id: ExcludedField[Union[str, None]] = None

    def load(self) -> None:
        """Load the dataset from the file/s in disk."""
        super(GeneratorStep, self).load()
        if self.is_distiset:
            ds = Distiset.load_from_disk(
                self.dataset_path,
                keep_in_memory=self.keep_in_memory,
                storage_options=self.storage_options,
            )
            if self.config not in ds.keys():
                raise DistilabelUserError(
                    f"Configuration '{self.config}' not found in the Distiset, available ones"
                    f" are: {list(ds.keys())}. Please try changing the `config` parameter to one "
                    "of the available configurations.\n\n",
                    page="sections/how_to_guides/advanced/distiset/#using-the-distiset-dataset-object",
                )
            ds = ds[self.config]

        else:
            ds = load_from_disk(
                self.dataset_path,
                keep_in_memory=self.keep_in_memory,
                storage_options=self.storage_options,
            )

        if self.split:
            ds = ds[self.split]

        self._dataset = ds

        if self.num_examples:
            self._dataset = self._dataset.select(range(self.num_examples))
        else:
            self.num_examples = len(self._dataset)

    @property
    def outputs(self) -> List[str]:
        """The columns that will be generated by this step, based on the datasets from a file
        in disk.

        Returns:
            The columns that will be generated by this step.
        """
        # We assume these are Dataset/IterableDataset, not their ...Dict counterparts
        if self._dataset is None:
            self.load()

        return self._dataset.column_names

outputs property

The columns that will be generated by this step, based on the dataset loaded from a file on disk.

Returns
  • List[str]: The columns that will be generated by this step.
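
As a small illustration (the path and the column names are hypothetical), the property simply exposes the column names of the loaded dataset:

loader = LoadDataFromDisk(dataset_path="path/to/dataset")
loader.load()
print(loader.outputs)
# >>> ['instruction', 'generation']  # whatever columns the saved dataset contains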

load()

Load the dataset from the file/s on disk.

Source code in src/distilabel/steps/generators/huggingface.py
def load(self) -> None:
    """Load the dataset from the file/s in disk."""
    super(GeneratorStep, self).load()
    if self.is_distiset:
        ds = Distiset.load_from_disk(
            self.dataset_path,
            keep_in_memory=self.keep_in_memory,
            storage_options=self.storage_options,
        )
        if self.config not in ds.keys():
            raise DistilabelUserError(
                f"Configuration '{self.config}' not found in the Distiset, available ones"
                f" are: {list(ds.keys())}. Please try changing the `config` parameter to one "
                "of the available configurations.\n\n",
                page="sections/how_to_guides/advanced/distiset/#using-the-distiset-dataset-object",
            )
        ds = ds[self.config]

    else:
        ds = load_from_disk(
            self.dataset_path,
            keep_in_memory=self.keep_in_memory,
            storage_options=self.storage_options,
        )

    if self.split:
        ds = ds[self.split]

    self._dataset = ds

    if self.num_examples:
        self._dataset = self._dataset.select(range(self.num_examples))
    else:
        self.num_examples = len(self._dataset)

LoadDataFromFileSystem

Bases: LoadDataFromHub

Loads a dataset from a file in your filesystem.

A GeneratorStep that creates a dataset from a file in the filesystem using the Hugging Face datasets library. Take a look at Hugging Face Datasets for more information on the supported file types.

Attributes
  • data_files (RuntimeParameter[Union[str, Path]]): The path to the file, or the directory containing the files that make up the dataset.
  • split (RuntimeParameter[str]): The split of the dataset to load (typically train, test or validation).

Runtime Parameters
  • batch_size: The batch size to use when processing the data.
  • data_files: The path to the file, or the directory containing the files that make up the dataset.
  • split: The split of the dataset to load. Defaults to 'train'.
  • streaming: Whether to load the dataset in streaming mode or not. Defaults to False.
  • num_examples: The number of examples to load from the dataset. By default all examples are loaded.
  • storage_options: Key/value pairs to be passed on to the file-system backend, if any. Defaults to None.
  • filetype: The expected filetype. If not provided, it will be inferred from the file extension. For more than one file, it will be inferred from the first file.
Output Columns
  • dynamic (all): The columns that will be generated by this step, based on the dataset loaded from the Hugging Face Hub.
Categories
  • load

Examples

Load data from a Hugging Face dataset in your file system

from distilabel.steps import LoadDataFromFileSystem

loader = LoadDataFromFileSystem(data_files="path/to/dataset.jsonl")
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'type': 'function', 'function':...', False)

Specify the filetype if the file extension is not as expected

from distilabel.steps import LoadDataFromFileSystem

loader = LoadDataFromFileSystem(filetype="csv", data_files="path/to/dataset.txtr")
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'type': 'function', 'function':...', False)

Load data from a file in your cloud provider

from distilabel.steps import LoadDataFromFileSystem

loader = LoadDataFromFileSystem(
    data_files="gcs://path/to/dataset",
    storage_options={"project": "experiments-0001"}
)
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'type': 'function', 'function':...', False)

Load data passing a glob pattern

from distilabel.steps import LoadDataFromFileSystem

loader = LoadDataFromFileSystem(
    data_files="path/to/dataset/*.jsonl",
    streaming=True
)
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'type': 'function', 'function':...', False)
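
Load data from a directory of files

data_files can also point to a directory containing the files that make up the dataset; a sketch (the path is a placeholder), where the filetype is inferred from the first file found:

from distilabel.steps import LoadDataFromFileSystem

loader = LoadDataFromFileSystem(data_files="path/to/dataset_dir")
loader.load()

result = next(loader.process())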
Source code in src/distilabel/steps/generators/huggingface.py
class LoadDataFromFileSystem(LoadDataFromHub):
    """Loads a dataset from a file in your filesystem.

    `GeneratorStep` that creates a dataset from a file in the filesystem, uses Hugging Face `datasets`
    library. Take a look at [Hugging Face Datasets](https://hugging-face.cn/docs/datasets/loading)
    for more information of the supported file types.

    Attributes:
        data_files: The path to the file, or directory containing the files that conform
            the dataset.
        split: The split of the dataset to load (typically will be `train`, `test` or `validation`).

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.
        - `data_files`: The path to the file, or directory containing the files that conform
            the dataset.
        - `split`: The split of the dataset to load. Defaults to 'train'.
        - `streaming`: Whether to load the dataset in streaming mode or not. Defaults to
            `False`.
        - `num_examples`: The number of examples to load from the dataset.
            By default will load all examples.
        - `storage_options`: Key/value pairs to be passed on to the file-system backend, if any.
            Defaults to `None`.
        - `filetype`: The expected filetype. If not provided, it will be inferred from the file extension.
            For more than one file, it will be inferred from the first file.

    Output columns:
        - dynamic (`all`): The columns that will be generated by this step, based on the
            datasets loaded from the Hugging Face Hub.

    Categories:
        - load

    Examples:
        Load data from a Hugging Face dataset in your file system:

        ```python
        from distilabel.steps import LoadDataFromFileSystem

        loader = LoadDataFromFileSystem(data_files="path/to/dataset.jsonl")
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'type': 'function', 'function':...', False)
        ```

        Specify a filetype if the file extension is not expected:

        ```python
        from distilabel.steps import LoadDataFromFileSystem

        loader = LoadDataFromFileSystem(filetype="csv", data_files="path/to/dataset.txtr")
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'type': 'function', 'function':...', False)
        ```

        Load data from a file in your cloud provider:

        ```python
        from distilabel.steps import LoadDataFromFileSystem

        loader = LoadDataFromFileSystem(
            data_files="gcs://path/to/dataset",
            storage_options={"project": "experiments-0001"}
        )
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'type': 'function', 'function':...', False)
        ```

        Load data passing a glob pattern:

        ```python
        from distilabel.steps import LoadDataFromFileSystem

        loader = LoadDataFromFileSystem(
            data_files="path/to/dataset/*.jsonl",
            streaming=True
        )
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'type': 'function', 'function':...', False)
        ```
    """

    data_files: RuntimeParameter[Union[str, Path]] = Field(
        default=None,
        description="The data files, or directory containing the data files, to generate the dataset from.",
    )
    filetype: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The expected filetype. If not provided, it will be inferred from the file extension.",
    )
    repo_id: ExcludedField[Union[str, None]] = None

    def load(self) -> None:
        """Load the dataset from the file/s in disk."""
        GeneratorStep.load(self)

        data_path = UPath(self.data_files, storage_options=self.storage_options)

        (data_files, self.filetype) = self._prepare_data_files(data_path)

        self._dataset = load_dataset(
            self.filetype,
            data_files=data_files,
            split=self.split,
            streaming=self.streaming,
            storage_options=self.storage_options,
        )

        if not self.streaming and self.num_examples:
            self._dataset = self._dataset.select(range(self.num_examples))
        if not self.num_examples:
            if self.streaming:
                # There's no better way to get the number of examples in a streaming dataset,
                # load it again for the moment.
                self.num_examples = len(
                    load_dataset(
                        self.filetype, data_files=self.data_files, split=self.split
                    )
                )
            else:
                self.num_examples = len(self._dataset)

    @staticmethod
    def _prepare_data_files(  # noqa: C901
        data_path: UPath,
    ) -> Tuple[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]], str]:
        """Prepare the loading process by setting the `data_files` attribute.

        Args:
            data_path: The path to the data files, or directory containing the data files.

        Returns:
            Tuple with the data files and the filetype.
        """

        def get_filetype(data_path: UPath) -> str:
            filetype = data_path.suffix.lstrip(".")
            if filetype == "jsonl":
                filetype = "json"
            return filetype

        if data_path.is_file() or (
            len(str(data_path.parent.glob(data_path.name))) >= 1
        ):
            filetype = get_filetype(data_path)
            data_files = str(data_path)

        elif data_path.is_dir():
            file_sequence = []
            file_map = defaultdict(list)
            for file_or_folder in data_path.iterdir():
                if file_or_folder.is_file():
                    file_sequence.append(str(file_or_folder))
                elif file_or_folder.is_dir():
                    for file in file_or_folder.iterdir():
                        file_sequence.append(str(file))
                        file_map[str(file_or_folder)].append(str(file))

            data_files = file_sequence or file_map
            # Try to obtain the filetype from any of the files, assuming all files have the same type.
            if file_sequence:
                filetype = get_filetype(UPath(file_sequence[0]))
            else:
                filetype = get_filetype(UPath(file_map[list(file_map.keys())[0]][0]))
        return data_files, filetype

    @property
    def outputs(self) -> List[str]:
        """The columns that will be generated by this step, based on the datasets from a file
        in disk.

        Returns:
            The columns that will be generated by this step.
        """
        # We assume these are Dataset/IterableDataset, not their ...Dict counterparts
        if self._dataset is None:
            self.load()

        return self._dataset.column_names

outputs property

The columns that will be generated by this step, based on the dataset loaded from a file on disk.

Returns
  • List[str]: The columns that will be generated by this step.

load()

Load the dataset from the file/s on disk.

Source code in src/distilabel/steps/generators/huggingface.py
def load(self) -> None:
    """Load the dataset from the file/s in disk."""
    GeneratorStep.load(self)

    data_path = UPath(self.data_files, storage_options=self.storage_options)

    (data_files, self.filetype) = self._prepare_data_files(data_path)

    self._dataset = load_dataset(
        self.filetype,
        data_files=data_files,
        split=self.split,
        streaming=self.streaming,
        storage_options=self.storage_options,
    )

    if not self.streaming and self.num_examples:
        self._dataset = self._dataset.select(range(self.num_examples))
    if not self.num_examples:
        if self.streaming:
            # There's no better way to get the number of examples in a streaming dataset,
            # load it again for the moment.
            self.num_examples = len(
                load_dataset(
                    self.filetype, data_files=self.data_files, split=self.split
                )
            )
        else:
            self.num_examples = len(self._dataset)

LoadDataFromHub

Bases: GeneratorStep

Loads a dataset from the Hugging Face Hub.

A GeneratorStep that loads a dataset from the Hugging Face Hub using the datasets library.

Attributes
  • repo_id (RuntimeParameter[str]): The Hugging Face Hub repository ID of the dataset to load.
  • split (RuntimeParameter[str]): The split of the dataset to load.
  • config (Optional[RuntimeParameter[str]]): The configuration of the dataset to load. This is optional and only needed if the dataset has multiple configurations.

Runtime Parameters
  • batch_size: The batch size to use when processing the data.
  • repo_id: The Hugging Face Hub repository ID of the dataset to load.
  • split: The split of the dataset to load. Defaults to 'train'.
  • config: The configuration of the dataset to load. This is optional and only needed if the dataset has multiple configurations.
  • revision: The revision of the dataset to load. Defaults to the latest revision.
  • streaming: Whether to load the dataset in streaming mode or not. Defaults to False.
  • num_examples: The number of examples to load from the dataset. By default all examples are loaded.
  • storage_options: Key/value pairs to be passed on to the file-system backend, if any. Defaults to None.
Output Columns
  • dynamic (all): The columns that will be generated by this step, based on the dataset loaded from the Hugging Face Hub.
Categories
  • load

Examples

Load data from a dataset on the Hugging Face Hub

from distilabel.steps import LoadDataFromHub

loader = LoadDataFromHub(
    repo_id="distilabel-internal-testing/instruction-dataset-mini",
    split="test",
    batch_size=2
)
loader.load()

# Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
result = next(loader.process())
# >>> result
# ([{'prompt': 'Arianna has 12...', False)
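
Combine the remaining runtime parameters

A sketch (the repository ID and configuration name are placeholders) that loads a specific configuration in streaming mode and caps the number of examples:

from distilabel.steps import LoadDataFromHub

loader = LoadDataFromHub(
    repo_id="org/some-dataset",  # placeholder repository ID
    config="default",            # only needed if the dataset has multiple configurations
    split="train",
    streaming=True,              # iterate without downloading the full dataset
    num_examples=100,            # stop after the first 100 examples
)
loader.load()

result = next(loader.process())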
Source code in src/distilabel/steps/generators/huggingface.py
class LoadDataFromHub(GeneratorStep):
    """Loads a dataset from the Hugging Face Hub.

    `GeneratorStep` that loads a dataset from the Hugging Face Hub using the `datasets`
    library.

    Attributes:
        repo_id: The Hugging Face Hub repository ID of the dataset to load.
        split: The split of the dataset to load.
        config: The configuration of the dataset to load. This is optional and only needed
            if the dataset has multiple configurations.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.
        - `repo_id`: The Hugging Face Hub repository ID of the dataset to load.
        - `split`: The split of the dataset to load. Defaults to 'train'.
        - `config`: The configuration of the dataset to load. This is optional and only
            needed if the dataset has multiple configurations.
        - `revision`: The revision of the dataset to load. Defaults to the latest revision.
        - `streaming`: Whether to load the dataset in streaming mode or not. Defaults to
            `False`.
        - `num_examples`: The number of examples to load from the dataset.
            By default will load all examples.
        - `storage_options`: Key/value pairs to be passed on to the file-system backend, if any.
            Defaults to `None`.

    Output columns:
        - dynamic (`all`): The columns that will be generated by this step, based on the
            datasets loaded from the Hugging Face Hub.

    Categories:
        - load

    Examples:
        Load data from a dataset in Hugging Face Hub:

        ```python
        from distilabel.steps import LoadDataFromHub

        loader = LoadDataFromHub(
            repo_id="distilabel-internal-testing/instruction-dataset-mini",
            split="test",
            batch_size=2
        )
        loader.load()

        # Just like we saw with LoadDataFromDicts, the `process` method will yield batches.
        result = next(loader.process())
        # >>> result
        # ([{'prompt': 'Arianna has 12...', False)
        ```
    """

    repo_id: RuntimeParameter[str] = Field(
        default=None,
        description="The Hugging Face Hub repository ID of the dataset to load.",
    )
    split: RuntimeParameter[str] = Field(
        default="train",
        description="The split of the dataset to load. Defaults to 'train'.",
    )
    config: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The configuration of the dataset to load. This is optional and only"
        " needed if the dataset has multiple configurations.",
    )
    revision: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The revision of the dataset to load. Defaults to the latest revision.",
    )
    streaming: RuntimeParameter[bool] = Field(
        default=False,
        description="Whether to load the dataset in streaming mode or not. Defaults to False.",
    )
    num_examples: Optional[RuntimeParameter[int]] = Field(
        default=None,
        description="The number of examples to load from the dataset. By default will load all examples.",
    )
    storage_options: Optional[Dict[str, Any]] = Field(
        default=None,
        description="The storage options to use when loading the dataset.",
    )

    _dataset: Union[IterableDataset, Dataset, None] = PrivateAttr(None)

    def load(self) -> None:
        """Load the dataset from the Hugging Face Hub"""
        super().load()

        if self._dataset is not None:
            # Here to simplify the functionality of distilabel.steps.generators.util.make_generator_step
            return

        self._dataset = load_dataset(
            self.repo_id,  # type: ignore
            self.config,
            split=self.split,
            revision=self.revision,
            streaming=self.streaming,
        )
        num_examples = self._get_dataset_num_examples()
        self.num_examples = (
            min(self.num_examples, num_examples) if self.num_examples else num_examples
        )

        if not self.streaming:
            self._dataset = self._dataset.select(range(self.num_examples))

    def process(self, offset: int = 0) -> "GeneratorStepOutput":
        """Yields batches from the loaded dataset from the Hugging Face Hub.

        Args:
            offset: The offset to start yielding the data from. Will be used during the caching
                process to help skipping already processed data.

        Yields:
            A tuple containing a batch of rows and a boolean indicating if the batch is
            the last one.
        """
        num_returned_rows = 0
        for batch_num, batch in enumerate(
            self._dataset.iter(batch_size=self.batch_size)  # type: ignore
        ):
            if batch_num * self.batch_size < offset:
                continue
            transformed_batch = self._transform_batch(batch)
            batch_size = len(transformed_batch)
            num_returned_rows += batch_size
            yield transformed_batch, num_returned_rows >= self.num_examples

    @property
    def outputs(self) -> List[str]:
        """The columns that will be generated by this step, based on the datasets loaded
        from the Hugging Face Hub.

        Returns:
            The columns that will be generated by this step.
        """
        return self._get_dataset_columns()

    def _transform_batch(self, batch: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Transform a batch of data from the Hugging Face Hub into a list of rows.

        Args:
            batch: The batch of data from the Hugging Face Hub.

        Returns:
            A list of rows, where each row is a dictionary of column names and values.
        """
        length = len(next(iter(batch.values())))
        rows = []
        for i in range(length):
            rows.append({col: values[i] for col, values in batch.items()})
        return rows

    def _get_dataset_num_examples(self) -> int:
        """Get the number of examples in the dataset, based on the `split` and `config`
        runtime parameters provided.

        Returns:
            The number of examples in the dataset.
        """
        default_config = self.config
        if not default_config:
            default_config = list(self._dataset_info.keys())[0]

        return self._dataset_info[default_config].splits[self.split].num_examples

    def _get_dataset_columns(self) -> List[str]:
        """Get the columns of the dataset, based on the `config` runtime parameter provided.

        Returns:
            The columns of the dataset.
        """
        return list(
            self._dataset_info[
                self.config if self.config else "default"
            ].features.keys()
        )

    @cached_property
    def _dataset_info(self) -> Dict[str, DatasetInfo]:
        """Calls the Datasets Server API from Hugging Face to obtain the dataset information.

        Returns:
            The dataset information.
        """

        try:
            return get_dataset_infos(self.repo_id)
        except Exception as e:
            warnings.warn(
                f"Failed to get dataset info from Hugging Face Hub, trying to get it loading the dataset. Error: {e}",
                UserWarning,
                stacklevel=2,
            )
            ds = load_dataset(self.repo_id, config=self.config, split=self.split)
            if self.config:
                return ds[self.config].info
            return ds.info

outputs property

The columns that will be generated by this step, based on the dataset loaded from the Hugging Face Hub.

Returns
  • List[str]: The columns that will be generated by this step.

load()

Load the dataset from the Hugging Face Hub

Source code in src/distilabel/steps/generators/huggingface.py
def load(self) -> None:
    """Load the dataset from the Hugging Face Hub"""
    super().load()

    if self._dataset is not None:
        # Here to simplify the functionality of distilabel.steps.generators.util.make_generator_step
        return

    self._dataset = load_dataset(
        self.repo_id,  # type: ignore
        self.config,
        split=self.split,
        revision=self.revision,
        streaming=self.streaming,
    )
    num_examples = self._get_dataset_num_examples()
    self.num_examples = (
        min(self.num_examples, num_examples) if self.num_examples else num_examples
    )

    if not self.streaming:
        self._dataset = self._dataset.select(range(self.num_examples))

process(offset=0)

Yields batches from the dataset loaded from the Hugging Face Hub.

Parameters
  • offset (int, default: 0): The offset to start yielding the data from. Will be used during the caching process to help skip already processed data.

Yields
  • GeneratorStepOutput: A tuple containing a batch of rows and a boolean indicating whether the batch is the last one.
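
For instance, reusing the loader from the example above (batch_size=2), a hypothetical resume from a cached run could pass the number of rows already processed as the offset:

# Batches covering the first 4 rows are skipped; the rest are yielded as usual.
for batch, is_last in loader.process(offset=4):
    ...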

Source code in src/distilabel/steps/generators/huggingface.py
def process(self, offset: int = 0) -> "GeneratorStepOutput":
    """Yields batches from the loaded dataset from the Hugging Face Hub.

    Args:
        offset: The offset to start yielding the data from. Will be used during the caching
            process to help skipping already processed data.

    Yields:
        A tuple containing a batch of rows and a boolean indicating if the batch is
        the last one.
    """
    num_returned_rows = 0
    for batch_num, batch in enumerate(
        self._dataset.iter(batch_size=self.batch_size)  # type: ignore
    ):
        if batch_num * self.batch_size < offset:
            continue
        transformed_batch = self._transform_batch(batch)
        batch_size = len(transformed_batch)
        num_returned_rows += batch_size
        yield transformed_batch, num_returned_rows >= self.num_examples

PushToHub

Bases: GlobalStep

Push data to a Hugging Face Hub dataset.

A GlobalStep that creates a datasets.Dataset with the input data and pushes it to the Hugging Face Hub.

Attributes
  • repo_id (RuntimeParameter[str]): The Hugging Face Hub repository ID where the dataset will be uploaded.
  • split (RuntimeParameter[str]): The split of the dataset that will be pushed. Defaults to "train".
  • private (RuntimeParameter[bool]): Whether the dataset to be pushed should be private or not. Defaults to False.
  • token (Optional[RuntimeParameter[str]]): The token that will be used to authenticate with the Hub. If not provided, it will be read from the HF_TOKEN environment variable; failing that, the huggingface_hub library will try to use the token from the local Hugging Face CLI configuration. Defaults to None.

Runtime Parameters
  • repo_id: The Hugging Face Hub repository ID where the dataset will be uploaded.
  • split: The split of the dataset that will be pushed.
  • private: Whether the dataset to be pushed should be private or not.
  • token: The token that will be used to authenticate with the Hub.
Input Columns
  • dynamic (all): All columns from the input will be used to create the dataset.
Categories
  • save
  • dataset
  • huggingface

Examples

Push batches of your dataset to a Hugging Face Hub repository

from distilabel.steps import PushToHub

push = PushToHub(repo_id="path_to/repo")
push.load()

result = next(
    push.process(
        [
            {
                "instruction": "instruction ",
                "generation": "generation"
            }
        ],
    )
)
# >>> result
# [{'instruction': 'instruction ', 'generation': 'generation'}]
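
Set the remaining runtime parameters

A sketch (the repository ID and token are placeholders) pushing to a private repository under a non-default split:

from distilabel.steps import PushToHub

push = PushToHub(
    repo_id="my-org/my-dataset",  # placeholder repository ID
    split="test",
    private=True,                 # create the repository as private
    token="hf_...",               # otherwise read from the HF_TOKEN environment variable
)
push.load()

result = next(push.process([{"instruction": "instruction", "generation": "generation"}]))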
Source code in src/distilabel/steps/globals/huggingface.py
class PushToHub(GlobalStep):
    """Push data to a Hugging Face Hub dataset.

    A `GlobalStep` which creates a `datasets.Dataset` with the input data and pushes
    it to the Hugging Face Hub.

    Attributes:
        repo_id: The Hugging Face Hub repository ID where the dataset will be uploaded.
        split: The split of the dataset that will be pushed. Defaults to `"train"`.
        private: Whether the dataset to be pushed should be private or not. Defaults to
            `False`.
        token: The token that will be used to authenticate in the Hub. If not provided, the
            token will be tried to be obtained from the environment variable `HF_TOKEN`.
            If not provided using one of the previous methods, then `huggingface_hub` library
            will try to use the token from the local Hugging Face CLI configuration. Defaults
            to `None`.

    Runtime parameters:
        - `repo_id`: The Hugging Face Hub repository ID where the dataset will be uploaded.
        - `split`: The split of the dataset that will be pushed.
        - `private`: Whether the dataset to be pushed should be private or not.
        - `token`: The token that will be used to authenticate in the Hub.

    Input columns:
        - dynamic (`all`): all columns from the input will be used to create the dataset.

    Categories:
        - save
        - dataset
        - huggingface

    Examples:
        Push batches of your dataset to the Hugging Face Hub repository:

        ```python
        from distilabel.steps import PushToHub

        push = PushToHub(repo_id="path_to/repo")
        push.load()

        result = next(
            push.process(
                [
                    {
                        "instruction": "instruction ",
                        "generation": "generation"
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'instruction ', 'generation': 'generation'}]
        ```
    """

    repo_id: RuntimeParameter[str] = Field(
        default=None,
        description="The Hugging Face Hub repository ID where the dataset will be uploaded.",
    )
    split: RuntimeParameter[str] = Field(
        default="train",
        description="The split of the dataset that will be pushed. Defaults to 'train'.",
    )
    private: RuntimeParameter[bool] = Field(
        default=False,
        description="Whether the dataset to be pushed should be private or not. Defaults"
        " to `False`.",
    )
    token: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The token that will be used to authenticate in the Hub. If not provided,"
        " the token will be tried to be obtained from the environment variable `HF_TOKEN`."
        " If not provided using one of the previous methods, then `huggingface_hub` library"
        " will try to use the token from the local Hugging Face CLI configuration. Defaults"
        " to `None`",
    )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Method that processes the input data, respecting the `datasets.Dataset` formatting,
        and pushes it to the Hugging Face Hub based on the `RuntimeParameter`s attributes.

        Args:
            inputs: that input data within a single object (as it's a GlobalStep) that
                will be transformed into a `datasets.Dataset`.

        Yields:
            Propagates the received inputs so that the `Distiset` can be generated if this is
            the last step of the `Pipeline`, or if this is not a leaf step and has follow up
            steps.
        """
        dataset_dict = defaultdict(list)
        for input in inputs:
            for key, value in input.items():
                dataset_dict[key].append(value)
        dataset_dict = dict(dataset_dict)
        dataset = Dataset.from_dict(dataset_dict)
        dataset.push_to_hub(
            self.repo_id,  # type: ignore
            split=self.split,
            private=self.private,
            token=self.token or os.getenv("HF_TOKEN"),
        )
        yield inputs

process(inputs)

Method that processes the input data, respecting the datasets.Dataset formatting, and pushes it to the Hugging Face Hub based on the RuntimeParameter attributes.

Parameters
  • inputs (StepInput, required): The input data within a single object (as this is a GlobalStep) that will be transformed into a datasets.Dataset.

Yields
  • StepOutput: Propagates the received inputs so that the Distiset can be generated if this is the last step of the Pipeline, or if this is not a leaf step and has follow-up steps.
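
Because the inputs are propagated, the step can be placed at the end of a Pipeline (or before follow-up steps); a minimal sketch, with placeholder names and repository IDs:

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub, PushToHub

with Pipeline(name="push-example") as pipeline:  # placeholder pipeline name
    loader = LoadDataFromHub(repo_id="org/source-dataset")  # placeholder repository ID
    push = PushToHub(repo_id="org/target-dataset")          # placeholder repository ID
    loader >> push

distiset = pipeline.run(use_cache=False)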

Source code in src/distilabel/steps/globals/huggingface.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Method that processes the input data, respecting the `datasets.Dataset` formatting,
    and pushes it to the Hugging Face Hub based on the `RuntimeParameter`s attributes.

    Args:
        inputs: that input data within a single object (as it's a GlobalStep) that
            will be transformed into a `datasets.Dataset`.

    Yields:
        Propagates the received inputs so that the `Distiset` can be generated if this is
        the last step of the `Pipeline`, or if this is not a leaf step and has follow up
        steps.
    """
    dataset_dict = defaultdict(list)
    for input in inputs:
        for key, value in input.items():
            dataset_dict[key].append(value)
    dataset_dict = dict(dataset_dict)
    dataset = Dataset.from_dict(dataset_dict)
    dataset.push_to_hub(
        self.repo_id,  # type: ignore
        split=self.split,
        private=self.private,
        token=self.token or os.getenv("HF_TOKEN"),
    )
    yield inputs