
Distiset

This section contains the API reference for the Distiset. For more information on how to use the CLI, see Tutorial - CLI.

Distiset

Bases: dict

Convenient wrapper around datasets.Dataset to push to the Hugging Face Hub.

It's a dictionary where the keys correspond to the different leaf_steps of the internal DAG and the values are datasets.Dataset.

Attributes

Name Type Description
_pipeline_path Optional[Path]

Optional path to the pipeline.yaml file that generated the dataset. Defaults to None.

_artifacts_path Optional[Path]

Optional path to the directory containing the artifacts generated by the pipeline steps. Defaults to None.

_log_filename_path Optional[Path]

Optional path to the pipeline.log file that was written by the pipeline. Defaults to None.

_citations Optional[List[str]]

Optional list of citations to be included in the dataset card. Defaults to None.
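
To illustrate the mapping described above, here is a minimal sketch that builds a Distiset by hand (in practice you get one back from Pipeline.run()); the subset name "text_generation" and its contents are placeholders:

from datasets import Dataset
from distilabel.distiset import Distiset

# Keys play the role of leaf step names; values are regular `datasets.Dataset` objects.
distiset = Distiset(
    {
        "text_generation": Dataset.from_dict(
            {"instruction": ["Write a haiku about synthetic data."]}
        ),
    }
)
print(distiset["text_generation"][0])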

Source code in src/distilabel/distiset.py
class Distiset(dict):
    """Convenient wrapper around `datasets.Dataset` to push to the Hugging Face Hub.

    It's a dictionary where the keys correspond to the different leaf_steps from the internal
    `DAG` and the values are `datasets.Dataset`.

    Attributes:
        _pipeline_path: Optional path to the `pipeline.yaml` file that generated the dataset.
            Defaults to `None`.
        _artifacts_path: Optional path to the directory containing the generated artifacts
            by the pipeline steps. Defaults to `None`.
        _log_filename_path: Optional path to the `pipeline.log` file that was written
            by the pipeline. Defaults to `None`.
        _citations: Optional list containing citations that will be included in the dataset
            card. Defaults to `None`.
    """

    _pipeline_path: Optional[Path] = None
    _artifacts_path: Optional[Path] = None
    _log_filename_path: Optional[Path] = None
    _citations: Optional[List[str]] = None

    def push_to_hub(
        self,
        repo_id: str,
        private: bool = False,
        token: Optional[str] = None,
        generate_card: bool = True,
        include_script: bool = False,
        **kwargs: Any,
    ) -> None:
        """Pushes the `Distiset` to the Hugging Face Hub, each dataset will be pushed as a different configuration
        corresponding to the leaf step that generated it.

        Args:
            repo_id:
                The ID of the repository to push to in the following format: `<user>/<dataset_name>` or
                `<org>/<dataset_name>`. Also accepts `<dataset_name>`, which will default to the namespace
                of the logged-in user.
            private:
                Whether the dataset repository should be set to private or not. Only affects repository creation:
                a repository that already exists will not be affected by that parameter.
            token:
                An optional authentication token for the Hugging Face Hub. If no token is passed, will default
                to the token saved locally when logging in with `huggingface-cli login`. Will raise an error
                if no token is passed and the user is not logged-in.
            generate_card:
                Whether to generate a dataset card or not. Defaults to True.
            include_script:
                Whether you want to push the pipeline script to the hugging face hub to share it.
                If set to True, the name of the script that was run to create the distiset will be
                automatically determined, and that will be the name of the file uploaded to your
                repository. Take into account, this operation only makes sense for a distiset obtained
                from calling `Pipeline.run()` method. Defaults to False.
            **kwargs:
                Additional keyword arguments to pass to the `push_to_hub` method of the `datasets.Dataset` object.

        Raises:
            ValueError: If no token is provided and couldn't be retrieved automatically.
        """
        script_filename = sys.argv[0]
        filename_py = (
            script_filename.split("/")[-1]
            if "/" in script_filename
            else script_filename
        )
        script_path = Path.cwd() / script_filename

        if token is None:
            token = get_hf_token(self.__class__.__name__, "token")

        for name, dataset in self.items():
            dataset.push_to_hub(
                repo_id=repo_id,
                config_name=name,
                private=private,
                token=token,
                **kwargs,
            )

        if self.artifacts_path:
            upload_folder(
                repo_id=repo_id,
                folder_path=self.artifacts_path,
                path_in_repo="artifacts",
                token=token,
                repo_type="dataset",
                commit_message="Include pipeline artifacts",
            )

        if include_script and script_path.exists():
            upload_file(
                path_or_fileobj=script_path,
                path_in_repo=filename_py,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
                commit_message="Include pipeline script",
            )

        if generate_card:
            self._generate_card(
                repo_id, token, include_script=include_script, filename_py=filename_py
            )

    def _get_card(
        self,
        repo_id: str,
        token: Optional[str] = None,
        include_script: bool = False,
        filename_py: Optional[str] = None,
    ) -> DistilabelDatasetCard:
        """Generates the dataset card for the `Distiset`.

        Note:
            If `repo_id` and `token` are provided, it will extract the metadata from the README.md file
            on the hub.

        Args:
            repo_id: Name of the repository to push to, or the path for the distiset if saved to disk.
            token: The token to authenticate with the Hugging Face Hub.
                We assume that if it's provided, the dataset will be in the Hugging Face Hub,
                so the README metadata will be extracted from there.
            include_script: Whether to upload the script to the hugging face repository.
            filename_py: The name of the script. If `include_script` is True, the script will
                be uploaded to the repository using this name, otherwise it won't be used.

        Returns:
            The dataset card for the `Distiset`.
        """
        sample_records = {}
        for name, dataset in self.items():
            record = (
                dataset[0] if not isinstance(dataset, dict) else dataset["train"][0]
            )
            if is_PIL_available():
                from PIL import ImageFile
            else:
                ImageFile = None

            for key, value in record.items():
                # If the value is an image, we set it to an empty string to avoid making the `README.md` huge
                if ImageFile and isinstance(value, ImageFile.ImageFile):
                    value = ""
                # If list is too big, the `README.md` generated will be huge so we truncate it
                elif isinstance(value, list):
                    length = len(value)
                    if length < 10:
                        continue
                    record[key] = value[:10]
                    record[key].append(
                        f"... (truncated - showing 10 of {length} elements)"
                    )
            sample_records[name] = record

        readme_metadata = {}
        if repo_id and token:
            readme_metadata = self._extract_readme_metadata(repo_id, token)

        metadata = {
            **readme_metadata,
            "size_categories": size_categories_parser(
                max(len(dataset) for dataset in self.values())
            ),
            "tags": ["synthetic", "distilabel", "rlaif"],
        }

        card = DistilabelDatasetCard.from_template(
            card_data=DatasetCardData(**metadata),
            repo_id=repo_id,
            sample_records=sample_records,
            include_script=include_script,
            filename_py=filename_py,
            artifacts=self._get_artifacts_metadata(),
            references=self.citations,
        )

        return card

    def _get_artifacts_metadata(self) -> Dict[str, List[Dict[str, Any]]]:
        """Gets a dictionary with the metadata of the artifacts generated by the pipeline steps.

        Returns:
            A dictionary in which the key is the name of the step and the value is a list
            of dictionaries, each of them containing the name and metadata of the step artifact.
        """
        if not self.artifacts_path:
            return {}

        def iterdir_ignore_hidden(path: Path) -> Generator[Path, None, None]:
            return (f for f in Path(path).iterdir() if not f.name.startswith("."))

        artifacts_metadata = defaultdict(list)
        for step_artifacts_dir in iterdir_ignore_hidden(self.artifacts_path):
            step_name = step_artifacts_dir.stem
            for artifact_dir in iterdir_ignore_hidden(step_artifacts_dir):
                artifact_name = artifact_dir.stem
                metadata_path = artifact_dir / "metadata.json"
                metadata = json.loads(metadata_path.read_text())
                artifacts_metadata[step_name].append(
                    {"name": artifact_name, "metadata": metadata}
                )

        return dict(artifacts_metadata)

    def _extract_readme_metadata(
        self, repo_id: str, token: Optional[str]
    ) -> Dict[str, Any]:
        """Extracts the metadata from the README.md file of the dataset repository.

        We have to download the previous README.md file in the repo, extract the metadata from it,
        and generate a dict again to be passed thorough the `DatasetCardData` object.

        Args:
            repo_id: The ID of the repository to push to, from the `push_to_hub` method.
            token: The token to authenticate with the Hugging Face Hub, from the `push_to_hub` method.

        Returns:
            The metadata extracted from the README.md file of the dataset repository as a dict.
        """
        readme_path = Path(
            hf_hub_download(repo_id, "README.md", repo_type="dataset", token=token)
        )
        # Remove the '---' from the metadata
        metadata = re.findall(r"---\n(.*?)\n---", readme_path.read_text(), re.DOTALL)[0]
        metadata = yaml.safe_load(metadata)
        return metadata

    def _generate_card(
        self,
        repo_id: str,
        token: str,
        include_script: bool = False,
        filename_py: Optional[str] = None,
    ) -> None:
        """Generates a dataset card and pushes it to the Hugging Face Hub, and
        if the `pipeline.yaml` path is available in the `Distiset`, uploads that
        to the same repository.

        Args:
            repo_id: The ID of the repository to push to, from the `push_to_hub` method.
            token: The token to authenticate with the Hugging Face Hub, from the `push_to_hub` method.
            include_script: Whether to upload the script to the hugging face repository.
            filename_py: The name of the script. If `include_script` is True, the script will
                be uploaded to the repository using this name, otherwise it won't be used.
        """
        card = self._get_card(
            repo_id=repo_id,
            token=token,
            include_script=include_script,
            filename_py=filename_py,
        )

        card.push_to_hub(
            repo_id,
            repo_type="dataset",
            token=token,
        )

        if self.pipeline_path:
            # If the pipeline.yaml is available, upload it to the Hugging Face Hub as well.
            HfApi().upload_file(
                path_or_fileobj=self.pipeline_path,
                path_in_repo=PIPELINE_CONFIG_FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
            )

        if self.log_filename_path:
            # The same we had with "pipeline.yaml" but with the log file.
            HfApi().upload_file(
                path_or_fileobj=self.log_filename_path,
                path_in_repo=PIPELINE_LOG_FILENAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=token,
            )

    def train_test_split(
        self,
        train_size: float,
        shuffle: bool = True,
        seed: Optional[int] = None,
    ) -> Self:
        """Return a `Distiset` whose values will be a `datasets.DatasetDict` with two random train and test subsets.
        Splits are created from the dataset according to `train_size` and `shuffle`.

        Args:
            train_size:
                Float between `0.0` and `1.0` representing the proportion of the dataset to include in the test split.
                It will be applied to all the datasets in the `Distiset`.
            shuffle: Whether or not to shuffle the data before splitting
            seed:
                A seed to initialize the default BitGenerator, passed to the underlying method.

        Returns:
            The `Distiset` with the train-test split applied to all the datasets.
        """
        assert 0 < train_size < 1, "train_size must be a float between 0 and 1"
        for name, dataset in self.items():
            self[name] = dataset.train_test_split(
                train_size=train_size,
                shuffle=shuffle,
                seed=seed,
            )
        return self

    def save_to_disk(
        self,
        distiset_path: PathLike,
        max_shard_size: Optional[Union[str, int]] = None,
        num_shards: Optional[int] = None,
        num_proc: Optional[int] = None,
        storage_options: Optional[dict] = None,
        save_card: bool = True,
        save_pipeline_config: bool = True,
        save_pipeline_log: bool = True,
    ) -> None:
        r"""
        Saves a `Distiset` to a dataset directory, or in a filesystem using any implementation of `fsspec.spec.AbstractFileSystem`.

        In case you want to save the `Distiset` in a remote filesystem, you can pass the `storage_options` parameter
        as you would do with `datasets`'s `Dataset.save_to_disk` method: [see example](https://hugging-face.cn/docs/datasets/filesystems#saving-serialized-datasets)

        Args:
            distiset_path: Path where you want to save the `Distiset`. It can be a local path
                (e.g. `dataset/train`) or remote URI (e.g. `s3://my-bucket/dataset/train`)
            max_shard_size: The maximum size of the dataset shards to be uploaded to the hub.
                If expressed as a string, needs to be digits followed by a unit (like `"50MB"`).
                Defaults to `None`.
            num_shards: Number of shards to write. By default the number of shards depends on
                `max_shard_size` and `num_proc`. Defaults to `None`.
            num_proc: Number of processes when downloading and generating the dataset locally.
                Multiprocessing is disabled by default. Defaults to `None`.
            storage_options: Key/value pairs to be passed on to the file-system backend, if any.
                Defaults to `None`.
            save_card: Whether to save the dataset card. Defaults to `True`.
            save_pipeline_config: Whether to save the pipeline configuration file (aka the `pipeline.yaml` file).
                Defaults to `True`.
            save_pipeline_log: Whether to save the pipeline log file (aka the `pipeline.log` file).
                Defaults to `True`.

        Examples:
            ```python
            # Save your distiset in a local folder:
            distiset.save_to_disk(distiset_path="my-distiset")
            # Save your distiset in a remote storage:
            storage_options = {
                "key": os.environ["S3_ACCESS_KEY"],
                "secret": os.environ["S3_SECRET_KEY"],
                "client_kwargs": {
                    "endpoint_url": os.environ["S3_ENDPOINT_URL"],
                    "region_name": os.environ["S3_REGION"],
                },
            }
            distiset.save_to_disk(distiset_path="my-distiset", storage_options=storage_options)
            ```
        """
        distiset_path = str(distiset_path)
        for name, dataset in self.items():
            dataset.save_to_disk(
                f"{distiset_path}/{name}",
                max_shard_size=max_shard_size,
                num_shards=num_shards,
                num_proc=num_proc,
                storage_options=storage_options,
            )

        distiset_config_folder = posixpath.join(distiset_path, DISTISET_CONFIG_FOLDER)

        fs: fsspec.AbstractFileSystem
        fs, _, _ = fsspec.get_fs_token_paths(
            distiset_config_folder, storage_options=storage_options
        )
        fs.makedirs(distiset_config_folder, exist_ok=True)

        if self.artifacts_path:
            distiset_artifacts_folder = posixpath.join(
                distiset_path, DISTISET_ARTIFACTS_FOLDER
            )
            fs.copy(str(self.artifacts_path), distiset_artifacts_folder, recursive=True)

        if save_card:
            # NOTE: Currently the card is not the same if we write to disk or push to the HF hub,
            # as we aren't generating the README copying/updating the data from the dataset repo.
            card = self._get_card(repo_id=Path(distiset_path).stem, token=None)
            new_filename = posixpath.join(distiset_config_folder, "README.md")
            if storage_options:
                # Write the card the same way as DatasetCard.save does:
                with fs.open(new_filename, "w", newline="", encoding="utf-8") as f:
                    f.write(str(card))
            else:
                card.save(new_filename)

        # Write our internal files to the distiset folder by copying them to the distiset folder.
        if save_pipeline_config and self.pipeline_path:
            new_filename = posixpath.join(
                distiset_config_folder, PIPELINE_CONFIG_FILENAME
            )
            if self.pipeline_path.exists() and (not fs.isfile(new_filename)):
                data = yaml.safe_load(self.pipeline_path.read_text())
                with fs.open(new_filename, "w", encoding="utf-8") as f:
                    yaml.dump(data, f, default_flow_style=False)

        if save_pipeline_log and self.log_filename_path:
            new_filename = posixpath.join(distiset_config_folder, PIPELINE_LOG_FILENAME)
            if self.log_filename_path.exists() and (not fs.isfile(new_filename)):
                data = self.log_filename_path.read_text()
                with fs.open(new_filename, "w", encoding="utf-8") as f:
                    f.write(data)

    @classmethod
    def load_from_disk(
        cls,
        distiset_path: PathLike,
        keep_in_memory: Optional[bool] = None,
        storage_options: Optional[Dict[str, Any]] = None,
        download_dir: Optional[PathLike] = None,
    ) -> Self:
        """Loads a dataset that was previously saved using `Distiset.save_to_disk` from a dataset
        directory, or from a filesystem using any implementation of `fsspec.spec.AbstractFileSystem`.

        Args:
            distiset_path: Path ("dataset/train") or remote URI ("s3://bucket/dataset/train").
            keep_in_memory: Whether to copy the dataset in-memory, see `datasets.Dataset.load_from_disk``
                for more information. Defaults to `None`.
            storage_options: Key/value pairs to be passed on to the file-system backend, if any.
                Defaults to `None`.
            download_dir: Optional directory to download the dataset to. Defaults to None,
                in which case it will create a temporary directory.

        Returns:
            A `Distiset` loaded from disk, it should be a `Distiset` object created using `Distiset.save_to_disk`.
        """
        original_distiset_path = str(distiset_path)

        fs: fsspec.AbstractFileSystem
        fs, _, [distiset_path] = fsspec.get_fs_token_paths(  # type: ignore
            original_distiset_path, storage_options=storage_options
        )
        dest_distiset_path = distiset_path

        assert fs.isdir(
            original_distiset_path
        ), "`distiset_path` must be a `PathLike` object pointing to a folder or a URI of a remote filesystem."

        has_config = False
        has_artifacts = False
        distiset = cls()

        if is_remote_filesystem(fs):
            src_dataset_path = distiset_path
            if download_dir:
                dest_distiset_path = download_dir
            else:
                dest_distiset_path = Dataset._build_local_temp_path(src_dataset_path)  # type: ignore
            fs.download(src_dataset_path, dest_distiset_path.as_posix(), recursive=True)  # type: ignore

        # Now we should have the distiset locally, so we can read those files
        for folder in Path(dest_distiset_path).iterdir():
            if folder.stem == DISTISET_CONFIG_FOLDER:
                has_config = True
                continue
            elif folder.stem == DISTISET_ARTIFACTS_FOLDER:
                has_artifacts = True
                continue
            distiset[folder.stem] = load_from_disk(
                str(folder),
                keep_in_memory=keep_in_memory,
            )

        # From the config folder we just need to point to the files. Once downloaded we set the path
        # to wherever they are.
        if has_config:
            distiset_config_folder = posixpath.join(
                dest_distiset_path, DISTISET_CONFIG_FOLDER
            )

            pipeline_path = posixpath.join(
                distiset_config_folder, PIPELINE_CONFIG_FILENAME
            )
            if Path(pipeline_path).exists():
                distiset.pipeline_path = Path(pipeline_path)

            log_filename_path = posixpath.join(
                distiset_config_folder, PIPELINE_LOG_FILENAME
            )
            if Path(log_filename_path).exists():
                distiset.log_filename_path = Path(log_filename_path)

        if has_artifacts:
            distiset.artifacts_path = Path(
                posixpath.join(dest_distiset_path, DISTISET_ARTIFACTS_FOLDER)
            )

        return distiset

    @property
    def pipeline_path(self) -> Union[Path, None]:
        """Returns the path to the `pipeline.yaml` file that generated the `Pipeline`."""
        return self._pipeline_path

    @pipeline_path.setter
    def pipeline_path(self, path: PathLike) -> None:
        self._pipeline_path = Path(path)

    @property
    def artifacts_path(self) -> Union[Path, None]:
        """Returns the path to the directory containing the artifacts generated by the steps
        of the pipeline."""
        return self._artifacts_path

    @artifacts_path.setter
    def artifacts_path(self, path: PathLike) -> None:
        self._artifacts_path = Path(path)

    @property
    def log_filename_path(self) -> Union[Path, None]:
        """Returns the path to the `pipeline.log` file that generated the `Pipeline`."""
        return self._log_filename_path

    @log_filename_path.setter
    def log_filename_path(self, path: PathLike) -> None:
        self._log_filename_path = Path(path)

    @property
    def citations(self) -> Union[List[str], None]:
        """Bibtex references to be included in the README."""
        return self._citations

    @citations.setter
    def citations(self, citations_: List[str]) -> None:
        self._citations = sorted(set(citations_))

    def __repr__(self):
        # Copy from `datasets.DatasetDict.__repr__`.
        repr = "\n".join([f"{k}: {v}" for k, v in self.items()])
        repr = re.sub(r"^", " " * 4, repr, count=0, flags=re.M)
        return f"Distiset({{\n{repr}\n}})"

    def transform_columns_to_image(self, columns: Union[str, list[str]]) -> Self:
        """Transforms the columns of the dataset to `PIL.Image` objects.

        Args:
            columns: Column or list of columns to transform.

        Returns:
            Transforms the columns of the dataset to `PIL.Image` objects before pushing,
            so the Hub treats them as Image objects and can be rendered in the dataset
            viewer, and cast them to be automatically transformed when downloading
            the dataset back.
        """
        from datasets import Image

        from distilabel.models.image_generation.utils import image_from_str

        columns = [columns] if isinstance(columns, str) else columns

        def cast_to_image(row: dict) -> dict:
            for column in columns:
                row[column] = image_from_str(row[column])
            return row

        for name, dataset in self.items():
            # In case train_test_split was called
            if isinstance(dataset, DatasetDict):
                for split, dataset_split in dataset.items():
                    dataset_split = dataset_split.map(cast_to_image)
                    for column in columns:
                        if column in dataset_split.column_names:
                            dataset_split = dataset_split.cast_column(
                                column, Image(decode=True)
                            )
                    self[name][split] = dataset_split
            else:
                dataset = dataset.map(cast_to_image)

                for column in columns:
                    if column in dataset.column_names:
                        dataset = dataset.cast_column(column, Image(decode=True))

                self[name] = dataset

        return self

pipeline_path property writable

Returns the path to the pipeline.yaml file that generated the Pipeline.

artifacts_path property writable

Returns the path to the directory containing the artifacts generated by the steps of the pipeline.

log_filename_path property writable

Returns the path to the pipeline.log file that generated the Pipeline.

citations property writable

BibTeX references to be included in the README.
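
As a small illustration (the BibTeX entry below is purely hypothetical), assigning to citations deduplicates and sorts the entries before they are rendered in the README:

# Hypothetical BibTeX entry; the setter drops duplicates and sorts the list.
distiset.citations = [
    "@misc{doe2024example, title={An Example Reference}, author={Doe, Jane}, year={2024}}",
    "@misc{doe2024example, title={An Example Reference}, author={Doe, Jane}, year={2024}}",
]
print(len(distiset.citations))  # 1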

push_to_hub(repo_id, private=False, token=None, generate_card=True, include_script=False, **kwargs)

Distiset 推送到 Hugging Face Hub,每个数据集将作为不同的配置推送,对应于生成它的 leaf step。

参数

名称 类型 描述 默认值
repo_id str

要推送到的仓库的 ID,格式如下:<user>/<dataset_name><org>/<dataset_name>。也接受 <dataset_name>,它将默认为已登录用户的命名空间。

必需
private bool

数据集仓库是否应设置为私有。仅影响仓库创建:已存在的仓库不受该参数影响。

False
token 可选[str]

Hugging Face Hub 的可选身份验证令牌。如果未传递令牌,则默认为使用 huggingface-cli login 登录时本地保存的令牌。如果未传递令牌且用户未登录,则会引发错误。

None
generate_card bool

是否生成数据集卡片。默认为 True。

True
include_script bool

是否要将 pipeline 脚本推送到 hugging face hub 以共享它。如果设置为 True,则将自动确定运行以创建 distiset 的脚本的名称,这将是上传到您的仓库的文件名。请注意,此操作仅对从调用 Pipeline.run() 方法获得的 distiset 有意义。默认为 False。

False
**kwargs Any

要传递给 datasets.Dataset 对象的 push_to_hub 方法的其他关键字参数。

{}

引发

类型 描述
ValueError

如果未提供令牌且无法自动检索。
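
A minimal usage sketch: the repository id below is a placeholder, and the token is resolved from the local Hugging Face login when not passed explicitly:

distiset.push_to_hub(
    "my-org/my-distiset",  # placeholder repo id
    private=True,
    generate_card=True,
    include_script=False,
)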

Source code in src/distilabel/distiset.py
def push_to_hub(
    self,
    repo_id: str,
    private: bool = False,
    token: Optional[str] = None,
    generate_card: bool = True,
    include_script: bool = False,
    **kwargs: Any,
) -> None:
    """Pushes the `Distiset` to the Hugging Face Hub, each dataset will be pushed as a different configuration
    corresponding to the leaf step that generated it.

    Args:
        repo_id:
            The ID of the repository to push to in the following format: `<user>/<dataset_name>` or
            `<org>/<dataset_name>`. Also accepts `<dataset_name>`, which will default to the namespace
            of the logged-in user.
        private:
            Whether the dataset repository should be set to private or not. Only affects repository creation:
            a repository that already exists will not be affected by that parameter.
        token:
            An optional authentication token for the Hugging Face Hub. If no token is passed, will default
            to the token saved locally when logging in with `huggingface-cli login`. Will raise an error
            if no token is passed and the user is not logged-in.
        generate_card:
            Whether to generate a dataset card or not. Defaults to True.
        include_script:
            Whether you want to push the pipeline script to the hugging face hub to share it.
            If set to True, the name of the script that was run to create the distiset will be
            automatically determined, and that will be the name of the file uploaded to your
            repository. Take into account, this operation only makes sense for a distiset obtained
            from calling `Pipeline.run()` method. Defaults to False.
        **kwargs:
            Additional keyword arguments to pass to the `push_to_hub` method of the `datasets.Dataset` object.

    Raises:
        ValueError: If no token is provided and couldn't be retrieved automatically.
    """
    script_filename = sys.argv[0]
    filename_py = (
        script_filename.split("/")[-1]
        if "/" in script_filename
        else script_filename
    )
    script_path = Path.cwd() / script_filename

    if token is None:
        token = get_hf_token(self.__class__.__name__, "token")

    for name, dataset in self.items():
        dataset.push_to_hub(
            repo_id=repo_id,
            config_name=name,
            private=private,
            token=token,
            **kwargs,
        )

    if self.artifacts_path:
        upload_folder(
            repo_id=repo_id,
            folder_path=self.artifacts_path,
            path_in_repo="artifacts",
            token=token,
            repo_type="dataset",
            commit_message="Include pipeline artifacts",
        )

    if include_script and script_path.exists():
        upload_file(
            path_or_fileobj=script_path,
            path_in_repo=filename_py,
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
            commit_message="Include pipeline script",
        )

    if generate_card:
        self._generate_card(
            repo_id, token, include_script=include_script, filename_py=filename_py
        )

train_test_split(train_size, shuffle=True, seed=None)

Returns a Distiset whose values will be a datasets.DatasetDict with two random train and test subsets. Splits are created from the dataset according to train_size and shuffle.

Parameters

Name Type Description Default
train_size float

Float between 0.0 and 1.0 representing the proportion of the dataset to include in the train split. It will be applied to all the datasets in the Distiset.

required
shuffle bool

Whether or not to shuffle the data before splitting.

True
seed Optional[int]

A seed to initialize the default BitGenerator, passed to the underlying method.

None

Returns

Type Description
Self

The Distiset with the train-test split applied to all the datasets.
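
For example, the call below (with placeholder values) turns every entry of the Distiset into a datasets.DatasetDict with train and test splits:

distiset = distiset.train_test_split(train_size=0.8, shuffle=True, seed=42)
# Each subset is now a `DatasetDict`; "default" is the config name used when
# the pipeline had a single leaf step.
print(distiset["default"]["train"].num_rows, distiset["default"]["test"].num_rows)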

Source code in src/distilabel/distiset.py
def train_test_split(
    self,
    train_size: float,
    shuffle: bool = True,
    seed: Optional[int] = None,
) -> Self:
    """Return a `Distiset` whose values will be a `datasets.DatasetDict` with two random train and test subsets.
    Splits are created from the dataset according to `train_size` and `shuffle`.

    Args:
        train_size:
            Float between `0.0` and `1.0` representing the proportion of the dataset to include in the test split.
            It will be applied to all the datasets in the `Distiset`.
        shuffle: Whether or not to shuffle the data before splitting
        seed:
            A seed to initialize the default BitGenerator, passed to the underlying method.

    Returns:
        The `Distiset` with the train-test split applied to all the datasets.
    """
    assert 0 < train_size < 1, "train_size must be a float between 0 and 1"
    for name, dataset in self.items():
        self[name] = dataset.train_test_split(
            train_size=train_size,
            shuffle=shuffle,
            seed=seed,
        )
    return self

save_to_disk(distiset_path, max_shard_size=None, num_shards=None, num_proc=None, storage_options=None, save_card=True, save_pipeline_config=True, save_pipeline_log=True)

Distiset 保存到数据集目录,或使用 fsspec.spec.AbstractFileSystem 的任何实现的 文件系统中。

如果您想将 Distiset 保存在远程文件系统中,您可以像使用 datasetsDataset.save_to_disk 方法一样传递 storage_options 参数:请参阅示例

参数

名称 类型 描述 默认值
distiset_path PathLike

您想要保存 Distiset 的路径。它可以是本地路径(例如 dataset/train)或远程 URI(例如 s3://my-bucket/dataset/train

必需
max_shard_size 可选[Union[str, int]]

要上传到 hub 的数据集 shards 的最大大小。如果表示为字符串,则需要是数字后跟单位(例如 "50MB")。默认为 None

None
num_shards 可选[int]

要写入的 shards 数量。默认情况下,shards 的数量取决于 max_shard_sizenum_proc。默认为 None

None
num_proc 可选[int]

在本地下载和生成数据集时的进程数。默认情况下禁用多处理。默认为 None

None
storage_options 可选[dict]

要传递到文件系统后端的键/值对(如果有)。默认为 None

None
save_card bool

是否保存数据集卡片。默认为 True

True
save_pipeline_config bool

是否保存 pipeline 配置文件(即 pipeline.yaml 文件)。默认为 True

True
save_pipeline_log bool

是否保存 pipeline 日志文件(即 pipeline.log 文件)。默认为 True

True

示例

# Save your distiset in a local folder:
distiset.save_to_disk(distiset_path="my-distiset")
# Save your distiset in a remote storage:
storage_options = {
    "key": os.environ["S3_ACCESS_KEY"],
    "secret": os.environ["S3_SECRET_KEY"],
    "client_kwargs": {
        "endpoint_url": os.environ["S3_ENDPOINT_URL"],
        "region_name": os.environ["S3_REGION"],
    },
}
distiset.save_to_disk(distiset_path="my-distiset", storage_options=storage_options)
Source code in src/distilabel/distiset.py
def save_to_disk(
    self,
    distiset_path: PathLike,
    max_shard_size: Optional[Union[str, int]] = None,
    num_shards: Optional[int] = None,
    num_proc: Optional[int] = None,
    storage_options: Optional[dict] = None,
    save_card: bool = True,
    save_pipeline_config: bool = True,
    save_pipeline_log: bool = True,
) -> None:
    r"""
    Saves a `Distiset` to a dataset directory, or in a filesystem using any implementation of `fsspec.spec.AbstractFileSystem`.

    In case you want to save the `Distiset` in a remote filesystem, you can pass the `storage_options` parameter
    as you would do with `datasets`'s `Dataset.save_to_disk` method: [see example](https://hugging-face.cn/docs/datasets/filesystems#saving-serialized-datasets)

    Args:
        distiset_path: Path where you want to save the `Distiset`. It can be a local path
            (e.g. `dataset/train`) or remote URI (e.g. `s3://my-bucket/dataset/train`)
        max_shard_size: The maximum size of the dataset shards to be uploaded to the hub.
            If expressed as a string, needs to be digits followed by a unit (like `"50MB"`).
            Defaults to `None`.
        num_shards: Number of shards to write. By default the number of shards depends on
            `max_shard_size` and `num_proc`. Defaults to `None`.
        num_proc: Number of processes when downloading and generating the dataset locally.
            Multiprocessing is disabled by default. Defaults to `None`.
        storage_options: Key/value pairs to be passed on to the file-system backend, if any.
            Defaults to `None`.
        save_card: Whether to save the dataset card. Defaults to `True`.
        save_pipeline_config: Whether to save the pipeline configuration file (aka the `pipeline.yaml` file).
            Defaults to `True`.
        save_pipeline_log: Whether to save the pipeline log file (aka the `pipeline.log` file).
            Defaults to `True`.

    Examples:
        ```python
        # Save your distiset in a local folder:
        distiset.save_to_disk(distiset_path="my-distiset")
        # Save your distiset in a remote storage:
        storage_options = {
            "key": os.environ["S3_ACCESS_KEY"],
            "secret": os.environ["S3_SECRET_KEY"],
            "client_kwargs": {
                "endpoint_url": os.environ["S3_ENDPOINT_URL"],
                "region_name": os.environ["S3_REGION"],
            },
        }
        distiset.save_to_disk(distiset_path="my-distiset", storage_options=storage_options)
        ```
    """
    distiset_path = str(distiset_path)
    for name, dataset in self.items():
        dataset.save_to_disk(
            f"{distiset_path}/{name}",
            max_shard_size=max_shard_size,
            num_shards=num_shards,
            num_proc=num_proc,
            storage_options=storage_options,
        )

    distiset_config_folder = posixpath.join(distiset_path, DISTISET_CONFIG_FOLDER)

    fs: fsspec.AbstractFileSystem
    fs, _, _ = fsspec.get_fs_token_paths(
        distiset_config_folder, storage_options=storage_options
    )
    fs.makedirs(distiset_config_folder, exist_ok=True)

    if self.artifacts_path:
        distiset_artifacts_folder = posixpath.join(
            distiset_path, DISTISET_ARTIFACTS_FOLDER
        )
        fs.copy(str(self.artifacts_path), distiset_artifacts_folder, recursive=True)

    if save_card:
        # NOTE: Currently the card is not the same if we write to disk or push to the HF hub,
        # as we aren't generating the README copying/updating the data from the dataset repo.
        card = self._get_card(repo_id=Path(distiset_path).stem, token=None)
        new_filename = posixpath.join(distiset_config_folder, "README.md")
        if storage_options:
            # Write the card the same way as DatasetCard.save does:
            with fs.open(new_filename, "w", newline="", encoding="utf-8") as f:
                f.write(str(card))
        else:
            card.save(new_filename)

    # Write our internal files to the distiset folder by copying them to the distiset folder.
    if save_pipeline_config and self.pipeline_path:
        new_filename = posixpath.join(
            distiset_config_folder, PIPELINE_CONFIG_FILENAME
        )
        if self.pipeline_path.exists() and (not fs.isfile(new_filename)):
            data = yaml.safe_load(self.pipeline_path.read_text())
            with fs.open(new_filename, "w", encoding="utf-8") as f:
                yaml.dump(data, f, default_flow_style=False)

    if save_pipeline_log and self.log_filename_path:
        new_filename = posixpath.join(distiset_config_folder, PIPELINE_LOG_FILENAME)
        if self.log_filename_path.exists() and (not fs.isfile(new_filename)):
            data = self.log_filename_path.read_text()
            with fs.open(new_filename, "w", encoding="utf-8") as f:
                f.write(data)

load_from_disk(distiset_path, keep_in_memory=None, storage_options=None, download_dir=None) classmethod

Loads a dataset that was previously saved using Distiset.save_to_disk from a dataset directory, or from a filesystem using any implementation of fsspec.spec.AbstractFileSystem.

Parameters

Name Type Description Default
distiset_path PathLike

Path ("dataset/train") or remote URI ("s3://bucket/dataset/train").

required
keep_in_memory Optional[bool]

Whether to copy the dataset in memory; see datasets.Dataset.load_from_disk for more information. Defaults to None.

None
storage_options Optional[Dict[str, Any]]

Key/value pairs to be passed on to the file-system backend, if any. Defaults to None.

None
download_dir Optional[PathLike]

Optional directory to download the dataset to. Defaults to None, in which case a temporary directory is created.

None

Returns

Type Description
Self

A Distiset loaded from disk; it should be a Distiset object created using Distiset.save_to_disk.
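
A short sketch covering the local and the remote case; the paths, bucket, and environment variables are placeholders mirroring the save_to_disk example above:

import os

from distilabel.distiset import Distiset

# Local directory previously written with `Distiset.save_to_disk`.
distiset = Distiset.load_from_disk("my-distiset")

# Remote filesystem, reusing the kind of `storage_options` shown for `save_to_disk`.
storage_options = {
    "key": os.environ["S3_ACCESS_KEY"],
    "secret": os.environ["S3_SECRET_KEY"],
}
distiset = Distiset.load_from_disk(
    "s3://my-bucket/my-distiset",
    storage_options=storage_options,
    download_dir="local-copy",
)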

Source code in src/distilabel/distiset.py
@classmethod
def load_from_disk(
    cls,
    distiset_path: PathLike,
    keep_in_memory: Optional[bool] = None,
    storage_options: Optional[Dict[str, Any]] = None,
    download_dir: Optional[PathLike] = None,
) -> Self:
    """Loads a dataset that was previously saved using `Distiset.save_to_disk` from a dataset
    directory, or from a filesystem using any implementation of `fsspec.spec.AbstractFileSystem`.

    Args:
        distiset_path: Path ("dataset/train") or remote URI ("s3://bucket/dataset/train").
        keep_in_memory: Whether to copy the dataset in-memory, see `datasets.Dataset.load_from_disk``
            for more information. Defaults to `None`.
        storage_options: Key/value pairs to be passed on to the file-system backend, if any.
            Defaults to `None`.
        download_dir: Optional directory to download the dataset to. Defaults to None,
            in which case it will create a temporary directory.

    Returns:
        A `Distiset` loaded from disk, it should be a `Distiset` object created using `Distiset.save_to_disk`.
    """
    original_distiset_path = str(distiset_path)

    fs: fsspec.AbstractFileSystem
    fs, _, [distiset_path] = fsspec.get_fs_token_paths(  # type: ignore
        original_distiset_path, storage_options=storage_options
    )
    dest_distiset_path = distiset_path

    assert fs.isdir(
        original_distiset_path
    ), "`distiset_path` must be a `PathLike` object pointing to a folder or a URI of a remote filesystem."

    has_config = False
    has_artifacts = False
    distiset = cls()

    if is_remote_filesystem(fs):
        src_dataset_path = distiset_path
        if download_dir:
            dest_distiset_path = download_dir
        else:
            dest_distiset_path = Dataset._build_local_temp_path(src_dataset_path)  # type: ignore
        fs.download(src_dataset_path, dest_distiset_path.as_posix(), recursive=True)  # type: ignore

    # Now we should have the distiset locally, so we can read those files
    for folder in Path(dest_distiset_path).iterdir():
        if folder.stem == DISTISET_CONFIG_FOLDER:
            has_config = True
            continue
        elif folder.stem == DISTISET_ARTIFACTS_FOLDER:
            has_artifacts = True
            continue
        distiset[folder.stem] = load_from_disk(
            str(folder),
            keep_in_memory=keep_in_memory,
        )

    # From the config folder we just need to point to the files. Once downloaded we set the path
    # to wherever they are.
    if has_config:
        distiset_config_folder = posixpath.join(
            dest_distiset_path, DISTISET_CONFIG_FOLDER
        )

        pipeline_path = posixpath.join(
            distiset_config_folder, PIPELINE_CONFIG_FILENAME
        )
        if Path(pipeline_path).exists():
            distiset.pipeline_path = Path(pipeline_path)

        log_filename_path = posixpath.join(
            distiset_config_folder, PIPELINE_LOG_FILENAME
        )
        if Path(log_filename_path).exists():
            distiset.log_filename_path = Path(log_filename_path)

    if has_artifacts:
        distiset.artifacts_path = Path(
            posixpath.join(dest_distiset_path, DISTISET_ARTIFACTS_FOLDER)
        )

    return distiset

transform_columns_to_image(columns)

Transforms the columns of the dataset to PIL.Image objects.

Parameters

Name Type Description Default
columns Union[str, list[str]]

Column or list of columns to transform.

required

Returns

Type Description
Self

Transforms the columns of the dataset to PIL.Image objects before pushing, so the Hub treats them as Image objects and they can be rendered in the dataset viewer, and casts them to be automatically converted back when downloading the dataset.
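
A hedged sketch, assuming the distiset stores images as strings that image_from_str can decode and that the column is named "image" (a placeholder):

distiset = distiset.transform_columns_to_image("image")
# After the cast, pushing the distiset renders the column as images in the dataset viewer.
distiset.push_to_hub("my-org/my-image-distiset")  # placeholder repo id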

Source code in src/distilabel/distiset.py
def transform_columns_to_image(self, columns: Union[str, list[str]]) -> Self:
    """Transforms the columns of the dataset to `PIL.Image` objects.

    Args:
        columns: Column or list of columns to transform.

    Returns:
        Transforms the columns of the dataset to `PIL.Image` objects before pushing,
        so the Hub treats them as Image objects and can be rendered in the dataset
        viewer, and cast them to be automatically transformed when downloading
        the dataset back.
    """
    from datasets import Image

    from distilabel.models.image_generation.utils import image_from_str

    columns = [columns] if isinstance(columns, str) else columns

    def cast_to_image(row: dict) -> dict:
        for column in columns:
            row[column] = image_from_str(row[column])
        return row

    for name, dataset in self.items():
        # In case train_test_split was called
        if isinstance(dataset, DatasetDict):
            for split, dataset_split in dataset.items():
                dataset_split = dataset_split.map(cast_to_image)
                for column in columns:
                    if column in dataset_split.column_names:
                        dataset_split = dataset_split.cast_column(
                            column, Image(decode=True)
                        )
                self[name][split] = dataset_split
        else:
            dataset = dataset.map(cast_to_image)

            for column in columns:
                if column in dataset.column_names:
                    dataset = dataset.cast_column(column, Image(decode=True))

            self[name] = dataset

    return self

create_distiset(data_dir, pipeline_path=None, log_filename_path=None, enable_metadata=False, dag=None)

Creates a Distiset from the buffer folder.

This function is intended to be used as a helper to create a Distiset from the folder where the cached data was written by the _WriteBuffer.

Parameters

Name Type Description Default
data_dir Path

Folder where the data buffers were written by the _WriteBuffer. It should correspond to CacheLocation.data.

required
pipeline_path Optional[Path]

Optional path to the pipeline.yaml file that generated the dataset. Internally, this will be passed to the Distiset object on creation to allow uploading the pipeline.yaml file to the repo upon Distiset.push_to_hub.

None
log_filename_path Optional[Path]

Optional path to the pipeline.log file that was generated during the pipeline run. Internally, this will be passed to the Distiset object on creation to allow uploading the pipeline.log file to the repo upon Distiset.push_to_hub.

None
enable_metadata bool

Whether to include the distilabel metadata column in the dataset. Defaults to False.

False
dag Optional[DAG]

DAG contained in a Pipeline. If provided, it will be used to extract the references/citations from it.

None

Returns

Type Description
Distiset

The dataset created from the buffer folder, where the different leaf steps correspond to different configurations of the dataset.

Examples

from pathlib import Path
distiset = create_distiset(Path.home() / ".cache/distilabel/pipelines/path-to-pipe-hashname")
Source code in src/distilabel/distiset.py
def create_distiset(  # noqa: C901
    data_dir: Path,
    pipeline_path: Optional[Path] = None,
    log_filename_path: Optional[Path] = None,
    enable_metadata: bool = False,
    dag: Optional["DAG"] = None,
) -> Distiset:
    """Creates a `Distiset` from the buffer folder.

    This function is intended to be used as a helper to create a `Distiset` from the folder
    where the cached data was written by the `_WriteBuffer`.

    Args:
        data_dir: Folder where the data buffers were written by the `_WriteBuffer`.
            It should correspond to `CacheLocation.data`.
        pipeline_path: Optional path to the pipeline.yaml file that generated the dataset.
            Internally this will be passed to the `Distiset` object on creation to allow
            uploading the `pipeline.yaml` file to the repo upon `Distiset.push_to_hub`.
        log_filename_path: Optional path to the pipeline.log file that was generated during the pipeline run.
            Internally this will be passed to the `Distiset` object on creation to allow
            uploading the `pipeline.log` file to the repo upon `Distiset.push_to_hub`.
        enable_metadata: Whether to include the distilabel metadata column in the dataset or not.
            Defaults to `False`.
        dag: DAG contained in a `Pipeline`. If informed, will be used to extract the references/
            citations from it.

    Returns:
        The dataset created from the buffer folder, where the different leaf steps will
        correspond to different configurations of the dataset.

    Examples:
        ```python
        from pathlib import Path
        distiset = create_distiset(Path.home() / ".cache/distilabel/pipelines/path-to-pipe-hashname")
        ```
    """
    from distilabel.constants import DISTILABEL_METADATA_KEY

    logger = logging.getLogger("distilabel.distiset")

    steps_outputs_dir = data_dir / STEPS_OUTPUTS_PATH

    distiset = Distiset()
    for file in steps_outputs_dir.iterdir():
        if file.is_file():
            continue

        files = [str(file) for file in list_files_in_dir(file)]
        if files:
            try:
                ds = load_dataset(
                    "parquet", name=file.stem, data_files={"train": files}
                )
                if not enable_metadata and DISTILABEL_METADATA_KEY in ds.column_names:
                    ds = ds.remove_columns(DISTILABEL_METADATA_KEY)
                distiset[file.stem] = ds
            except ArrowInvalid:
                logger.warning(f"❌ Failed to load the subset from '{file}' directory.")
                continue
        else:
            logger.warning(
                f"No output files for step '{file.stem}', can't create a dataset."
                " Did the step produce any data?"
            )

    # If there's only one dataset i.e. one config, then set the config name to `default`
    if len(distiset.keys()) == 1:
        distiset["default"] = distiset.pop(list(distiset.keys())[0])

    # If there's any artifact set the `artifacts_path` so they can be uploaded
    steps_artifacts_dir = data_dir / STEPS_ARTIFACTS_PATH
    if any(steps_artifacts_dir.rglob("*")):
        distiset.artifacts_path = steps_artifacts_dir

    # Include `pipeline.yaml` if exists
    if pipeline_path:
        distiset.pipeline_path = pipeline_path
    else:
        # If the pipeline path is not provided, try to find it in the parent directory
        # and assume that's the wanted file.
        pipeline_path = steps_outputs_dir.parent / "pipeline.yaml"
        if pipeline_path.exists():
            distiset.pipeline_path = pipeline_path

    # Include `pipeline.log` if exists
    if log_filename_path:
        distiset.log_filename_path = log_filename_path
    else:
        log_filename_path = steps_outputs_dir.parent / "pipeline.log"
        if log_filename_path.exists():
            distiset.log_filename_path = log_filename_path

    if dag:
        distiset._citations = _grab_citations(dag)

    return distiset