
Extra steps

DBSCAN

Bases: GlobalStep

DBSCAN (Density-Based Spatial Clustering of Applications with Noise) finds core samples in regions of high density and expands clusters from them. This algorithm is good for data which contains clusters of similar density.

This is a `GlobalStep` that clusters the embeddings using the DBSCAN algorithm from `sklearn`. Visit the `TextClustering` step for an example of use. The trained model is saved as an artifact when creating a distiset and pushing it to the Hugging Face Hub. A minimal standalone sketch is shown after the parameter list below.

Input columns
  • projection (List[float]): Vector representation of the text to cluster, normally the output from the UMAP step.
Output columns
  • cluster_label (int): Integer representing the label of a given cluster. -1 means it wasn't clustered.
Categories
  • clustering
  • text-classification
References
  • DBSCAN demo of sklearn: https://scikit-learn.cn/stable/auto_examples/cluster/plot_dbscan.html#demo-of-dbscan-clustering-algorithm
  • sklearn dbscan: https://scikit-learn.cn/stable/modules/clustering.html#dbscan

Attributes

- eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster. This is the most important DBSCAN parameter to choose appropriately for your data set and distance function.

- min_samples: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself. If `min_samples` is set to a higher value, DBSCAN will find denser clusters, whereas if it is set to a lower value, the found clusters will be more sparse.

- metric: The metric to use when calculating distance between instances in a feature array. If metric is a string or callable, it must be one of the options allowed by `sklearn.metrics.pairwise_distances` for its metric parameter.

- n_jobs: The number of parallel jobs to run.

Runtime parameters
  • eps: The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster. This is the most important DBSCAN parameter to choose appropriately for your data set and distance function.
  • min_samples: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself. If `min_samples` is set to a higher value, DBSCAN will find denser clusters, whereas if it is set to a lower value, the found clusters will be more sparse.
  • metric: The metric to use when calculating distance between instances in a feature array. If metric is a string or callable, it must be one of the options allowed by `sklearn.metrics.pairwise_distances` for its metric parameter.
  • n_jobs: The number of parallel jobs to run.
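A minimal standalone sketch of the step, following the standalone `load()` + `process()` pattern the other steps on this page use. The toy projections are made up, and `eps`/`min_samples` are loosened so the few example points can form a cluster; it is assumed here that, outside a pipeline run, the artifact-saving call inside `process` is skipped or only logs a warning:

```python
from distilabel.steps import DBSCAN

dbscan = DBSCAN(eps=0.5, min_samples=2)  # toy values; the defaults are 0.3 / 30
dbscan.load()  # requires `sklearn` (pip install 'distilabel[text-clustering]')

result = next(
    dbscan.process(
        [
            {"projection": [0.0, 0.0]},
            {"projection": [0.1, 0.1]},
            {"projection": [5.0, 5.0]},
        ]
    )
)
# The two nearby points share a cluster label; the outlier gets -1 (noise).
# Outside a pipeline, the fitted-model artifact is assumed not to be persisted.
```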
Source code in src/distilabel/steps/clustering/dbscan.py
class DBSCAN(GlobalStep):
    r"""DBSCAN (Density-Based Spatial Clustering of Applications with Noise) finds core
    samples in regions of high density and expands clusters from them. This algorithm
    is good for data which contains clusters of similar density.

    This is a `GlobalStep` that clusters the embeddings using the DBSCAN algorithm
    from `sklearn`. Visit `TextClustering` step for an example of use.
    The trained model is saved as an artifact when creating a distiset
    and pushing it to the Hugging Face Hub.

    Input columns:
        - projection (`List[float]`): Vector representation of the text to cluster,
            normally the output from the `UMAP` step.

    Output columns:
        - cluster_label (`int`): Integer representing the label of a given cluster. -1
            means it wasn't clustered.

    Categories:
        - clustering
        - text-classification

    References:
        - [`DBSCAN demo of sklearn`](https://scikit-learn.cn/stable/auto_examples/cluster/plot_dbscan.html#demo-of-dbscan-clustering-algorithm)
        - [`sklearn dbscan`](https://scikit-learn.cn/stable/modules/clustering.html#dbscan)

    Attributes:
        - eps: The maximum distance between two samples for one to be considered as in the
            neighborhood of the other. This is not a maximum bound on the distances of
            points within a cluster. This is the most important DBSCAN parameter to
            choose appropriately for your data set and distance function.
        - min_samples: The number of samples (or total weight) in a neighborhood for a point
            to be considered as a core point. This includes the point itself. If `min_samples`
            is set to a higher value, DBSCAN will find denser clusters, whereas if it is set
            to a lower value, the found clusters will be more sparse.
        - metric: The metric to use when calculating distance between instances in a feature
            array. If metric is a string or callable, it must be one of the options allowed
            by `sklearn.metrics.pairwise_distances` for its metric parameter.
        - n_jobs: The number of parallel jobs to run.

    Runtime parameters:
        - `eps`: The maximum distance between two samples for one to be considered as in the
            neighborhood of the other. This is not a maximum bound on the distances of
            points within a cluster. This is the most important DBSCAN parameter to
            choose appropriately for your data set and distance function.
        - `min_samples`: The number of samples (or total weight) in a neighborhood for a point
            to be considered as a core point. This includes the point itself. If `min_samples`
            is set to a higher value, DBSCAN will find denser clusters, whereas if it is set
            to a lower value, the found clusters will be more sparse.
        - `metric`: The metric to use when calculating distance between instances in a feature
            array. If metric is a string or callable, it must be one of the options allowed
            by `sklearn.metrics.pairwise_distances` for its metric parameter.
        - `n_jobs`: The number of parallel jobs to run.
    """

    eps: Optional[RuntimeParameter[float]] = Field(
        default=0.3,
        description=(
            "The maximum distance between two samples for one to be considered "
            "as in the neighborhood of the other. This is not a maximum bound "
            "on the distances of points within a cluster. This is the most "
            "important DBSCAN parameter to choose appropriately for your data set "
            "and distance function."
        ),
    )
    min_samples: Optional[RuntimeParameter[int]] = Field(
        default=30,
        description=(
            "The number of samples (or total weight) in a neighborhood for a point to "
            "be considered as a core point. This includes the point itself. If "
            "`min_samples` is set to a higher value, DBSCAN will find denser clusters, "
            "whereas if it is set to a lower value, the found clusters will be more "
            "sparse."
        ),
    )
    metric: Optional[RuntimeParameter[str]] = Field(
        default="euclidean",
        description=(
            "The metric to use when calculating distance between instances in a "
            "feature array. If metric is a string or callable, it must be one of "
            "the options allowed by `sklearn.metrics.pairwise_distances` for "
            "its metric parameter."
        ),
    )
    n_jobs: Optional[RuntimeParameter[int]] = Field(
        default=8, description="The number of parallel jobs to run."
    )

    _clusterer: Optional["_DBSCAN"] = PrivateAttr(None)

    def load(self) -> None:
        super().load()
        if importlib.util.find_spec("sklearn") is None:
            raise ImportError(
                "`sklearn` package is not installed. Please install it using `pip install 'distilabel[text-clustering]'`."
            )
        from sklearn.cluster import DBSCAN as _DBSCAN

        self._clusterer = _DBSCAN(
            eps=self.eps,
            min_samples=self.min_samples,
            metric=self.metric,
            n_jobs=self.n_jobs,
        )

    def unload(self) -> None:
        self._clusterer = None

    @property
    def inputs(self) -> List[str]:
        return ["projection"]

    @property
    def outputs(self) -> List[str]:
        return ["cluster_label"]

    def _save_model(self, model: Any) -> None:
        import joblib

        def save_model(path):
            with open(str(path / "DBSCAN.joblib"), "wb") as f:
                joblib.dump(model, f)

        self.save_artifact(
            name="DBSCAN_model",
            write_function=lambda path: save_model(path),
            metadata={
                "eps": self.eps,
                "min_samples": self.min_samples,
                "metric": self.metric,
            },
        )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        projections = np.array([input["projection"] for input in inputs])

        self._logger.info("🏋️‍♀️ Start training DBSCAN...")
        fitted_clusterer = self._clusterer.fit(projections)
        cluster_labels = fitted_clusterer.labels_
        # Sets the cluster labels for each input, -1 means it wasn't clustered
        for input, cluster_label in zip(inputs, cluster_labels):
            input["cluster_label"] = cluster_label
        self._logger.info(f"DBSCAN labels assigned: {len(set(cluster_labels))}")
        self._save_model(fitted_clusterer)
        yield inputs
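Since the fitted clusterer is dumped with `joblib`, it can be loaded back for inspection. A minimal sketch, assuming the artifact has already been downloaded from the Hub (the local path below is hypothetical):

```python
import joblib

# Hypothetical local path: wherever the distiset artifacts were downloaded to.
model = joblib.load("artifacts/DBSCAN_model/DBSCAN.joblib")

# sklearn's DBSCAN has no `predict` method: inspect the training assignment,
# or call `fit_predict` again on new projections.
print(model.labels_[:10])               # cluster label per training sample
print(len(model.core_sample_indices_))  # number of core samples found
```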

TextClustering

Bases: TextClassification, GlobalTask

Task that clusters a set of texts and generates summary labels for each cluster.

This is a `GlobalTask` that inherits from `TextClassification`, meaning that all the attributes of that class are available here. Also, in this case we deal with all the inputs at once, instead of using batches. The `input_batch_size` is used here to send the examples to the LLM in batches (a subtle difference with the more common `Task` definitions). The task looks in each cluster for a given number of representative examples (the number is set by the `samples_per_cluster` attribute) and sends them to the LLM to get a label/s that represent the cluster. The labels are then assigned to each text in the cluster. The clusters and projections used in this step are assumed to be obtained from the `UMAP` + `DBSCAN` steps, but could be generated by similar steps, as long as they represent the same concepts. This step runs a pipeline like the one in this repository: https://github.com/huggingface/text-clustering

Input columns
  • text (str): The reference text we want to obtain labels for.
  • projection (List[float]): Vector representation of the text to cluster, normally the output from the UMAP step.
  • cluster_label (int): Integer representing the label of a given cluster. -1 means it wasn't clustered.
Output columns
  • summary_label (str): The label or list of labels for the text.
  • model_name (str): The name of the model used to generate the label/s.
Categories
  • clustering
  • text-classification
References
  • text-clustering repository: https://github.com/huggingface/text-clustering

Attributes

- savefig: Whether to generate and save a figure with the clustering of the texts.

- samples_per_cluster: The number of examples to use in the LLM as a sample of each cluster.

Examples

Generate labels for a set of texts using clustering:

# `load_dataset` and `make_generator_step` are needed by the example below
from datasets import load_dataset

from distilabel.models import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import UMAP, DBSCAN, TextClustering, make_generator_step

ds_name = "argilla-warehouse/personahub-fineweb-edu-4-clustering-100k"

with Pipeline(name="Text clustering dataset") as pipeline:
    batch_size = 500

    ds = load_dataset(ds_name, split="train").select(range(10000))
    loader = make_generator_step(ds, batch_size=batch_size, repo_id=ds_name)

    umap = UMAP(n_components=2, metric="cosine")
    dbscan = DBSCAN(eps=0.3, min_samples=30)

    text_clustering = TextClustering(
        llm=InferenceEndpointsLLM(
            model_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
            tokenizer_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
        ),
        n=3,  # 3 labels per example
        query_title="Examples of Personas",
        samples_per_cluster=10,
        context=(
            "Describe the main themes, topics, or categories that could describe the "
            "following types of personas. All the examples of personas must share "
            "the same set of labels."
        ),
        default_label="None",
        savefig=True,
        input_batch_size=8,
        input_mappings={"text": "persona"},
        use_default_structured_output=True,
    )

    loader >> umap >> dbscan >> text_clustering
Source code in src/distilabel/steps/clustering/text_clustering.py
class TextClustering(TextClassification, GlobalTask):
    """Task that clusters a set of texts and generates summary labels for each cluster.

    This is a `GlobalTask` that inherits from `TextClassification`, this means that all
    the attributes from that class are available here. Also, in this case we deal
    with all the inputs at once, instead of using batches. The `input_batch_size` is
    used here to send the examples to the LLM in batches (a subtle difference with the
    more common `Task` definitions).
    The task looks in each cluster for a given number of representative examples (the number
    is set by the `samples_per_cluster` attribute), and sends them to the LLM to get a label/s
    that represent the cluster. The labels are then assigned to each text in the cluster.
    The clusters and projections used in the step, are assumed to be obtained from the `UMAP`
    + `DBSCAN` steps, but could be generated for similar steps, as long as they represent the
    same concepts.
    This step runs a pipeline like the one in this repository:
    https://github.com/huggingface/text-clustering

    Input columns:
        - text (`str`): The reference text we want to obtain labels for.
        - projection (`List[float]`): Vector representation of the text to cluster,
            normally the output from the `UMAP` step.
        - cluster_label (`int`): Integer representing the label of a given cluster. -1
            means it wasn't clustered.

    Output columns:
        - summary_label (`str`): The label or list of labels for the text.
        - model_name (`str`): The name of the model used to generate the label/s.

    Categories:
        - clustering
        - text-classification

    References:
        - [`text-clustering repository`](https://github.com/huggingface/text-clustering)

    Attributes:
        - savefig: Whether to generate and save a figure with the clustering of the texts.
        - samples_per_cluster: The number of examples to use in the LLM as a sample of the cluster.

    Examples:
        Generate labels for a set of texts using clustering:

        ```python
        from distilabel.models import InferenceEndpointsLLM
        from distilabel.steps import UMAP, DBSCAN, TextClustering
        from distilabel.pipeline import Pipeline

        ds_name = "argilla-warehouse/personahub-fineweb-edu-4-clustering-100k"

        with Pipeline(name="Text clustering dataset") as pipeline:
            batch_size = 500

            ds = load_dataset(ds_name, split="train").select(range(10000))
            loader = make_generator_step(ds, batch_size=batch_size, repo_id=ds_name)

            umap = UMAP(n_components=2, metric="cosine")
            dbscan = DBSCAN(eps=0.3, min_samples=30)

            text_clustering = TextClustering(
                llm=InferenceEndpointsLLM(
                    model_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
                    tokenizer_id="meta-llama/Meta-Llama-3.1-70B-Instruct",
                ),
                n=3,  # 3 labels per example
                query_title="Examples of Personas",
                samples_per_cluster=10,
                context=(
                    "Describe the main themes, topics, or categories that could describe the "
                    "following types of personas. All the examples of personas must share "
                    "the same set of labels."
                ),
                default_label="None",
                savefig=True,
                input_batch_size=8,
                input_mappings={"text": "persona"},
                use_default_structured_output=True,
            )

            loader >> umap >> dbscan >> text_clustering
        ```
    """

    savefig: Optional[RuntimeParameter[bool]] = Field(
        default=True,
        description="Whether to generate and save a figure with the clustering of the texts.",
    )
    samples_per_cluster: int = Field(
        default=10,
        description="The number of examples to use in the LLM as a sample of the cluster.",
    )

    @property
    def inputs(self) -> List[str]:
        """The input for the task are the same as those for `TextClassification` plus
        the `projection` and `cluster_label` columns (which can be obtained from
        UMAP + DBSCAN steps).
        """
        return super().inputs + ["projection", "cluster_label"]

    @property
    def outputs(self) -> List[str]:
        """The output for the task is the `summary_label` and the `model_name`."""
        return ["summary_label", "model_name"]

    def load(self) -> None:
        super().load()
        if self.savefig and (importlib.util.find_spec("matplotlib") is None):
            raise ImportError(
                "`matplotlib` package is not installed. Please install it using `pip install matplotlib`."
            )

    def _save_figure(
        self,
        data: pd.DataFrame,
        cluster_centers: Dict[str, Tuple[float, float]],
        cluster_summaries: Dict[int, str],
    ) -> None:
        """Saves the figure starting from the dataframe, using matplotlib.

        Args:
            data: pd.DataFrame with the columns 'X', 'Y' and 'labels' representing
                the projections and the label of each text respectively.
            cluster_centers: Dictionary mapping from each label the center of a cluster,
                to help with the placement of the annotations.
            cluster_summaries: The summaries of the clusters, obtained from the LLM.
        """
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(12, 8), dpi=300)
        unique_labels = data["labels"].unique()
        # Map of colors for each label (-1 is black)
        colormap = dict(
            zip(unique_labels, plt.cm.Spectral(np.linspace(0, 1, len(unique_labels))))
        )
        colormap[-1] = np.array([0, 0, 0, 0])
        data["color"] = data["labels"].map(colormap)

        data.plot(
            kind="scatter",
            x="X",
            y="Y",
            c="color",
            s=0.75,
            alpha=0.8,
            linewidth=0.4,
            ax=ax,
            colorbar=False,
        )

        for label in cluster_summaries.keys():
            if label == -1:
                continue
            summary = str(cluster_summaries[label])  # These are obtained from the LLM
            position = cluster_centers[label]
            t = ax.text(
                position[0],
                position[1],
                summary,
                horizontalalignment="center",
                verticalalignment="center",
                fontsize=4,
            )
            t.set_bbox(
                {
                    "facecolor": "white",
                    "alpha": 0.9,
                    "linewidth": 0,
                    "boxstyle": "square,pad=0.1",
                }
            )

        ax.set_axis_off()
        # Save the plot as an artifact of the step
        self.save_artifact(
            name="Text clusters",
            write_function=lambda path: fig.savefig(path / "figure_clustering.png"),
            metadata={"type": "image", "library": "matplotlib"},
        )
        plt.close()

    def _create_figure(
        self,
        inputs: StepInput,
        label2docs: Dict[int, List[str]],
        cluster_summaries: Dict[int, str],
    ) -> None:
        """Creates a figure of the clustered texts and save it as an artifact.

        Args:
            inputs: The inputs of the step, as we will extract information from them again.
            label2docs: Map from each label to the list of documents (texts) that belong to that cluster.
            cluster_summaries: The summaries of the clusters, obtained from the LLM.
        """
        self._logger.info("🖼️ Creating figure for the clusters...")

        labels = []
        projections = []
        id2cluster = {}
        for i, input in enumerate(inputs):
            label = input["cluster_label"]
            id2cluster[i] = label
            labels.append(label)
            projections.append(input["projection"])

        projections = np.array(projections)

        # Contains the placement of the cluster centers in the figure
        cluster_centers: Dict[str, Tuple[float, float]] = {}
        for label in label2docs.keys():
            x = np.mean([projections[doc, 0] for doc in label2docs[label]])
            y = np.mean([projections[doc, 1] for doc in label2docs[label]])
            cluster_centers[label] = (x, y)

        df = pd.DataFrame(
            data={
                "X": projections[:, 0],
                "Y": projections[:, 1],
                "labels": labels,
            }
        )

        self._save_figure(
            df, cluster_centers=cluster_centers, cluster_summaries=cluster_summaries
        )

    def _prepare_input_texts(
        self,
        inputs: StepInput,
        label2docs: Dict[int, List[int]],
        unique_labels: int,
    ) -> List[Dict[str, Union[str, int]]]:
        """Prepares a batch of inputs to send to the LLM, with the examples of each cluster.

        Args:
            inputs: Inputs from the step.
            label2docs: Map from each label to the list of documents (texts) that
                belong to that cluster.
            unique_labels: The number of unique cluster labels (the noise label -1 excluded).

        Returns:
            The input texts to send to the LLM, with the examples of each cluster
            prepared to be used in the prompt, and an additional key to store the
            labels (that will be needed to find the data after the batches are
            returned from the LLM).
        """
        input_texts = []
        for label in range(unique_labels):  # The label -1 is implicitly excluded
            # Get the ids but remove possible duplicates, which could happen with bigger probability
            # the bigger the number of examples requested, and the smaller the subset of examples
            ids = set(
                np.random.choice(label2docs[label], size=self.samples_per_cluster)
            )  # Grab the number of examples
            examples = [inputs[i]["text"] for i in ids]
            input_text = {
                "text": "\n\n".join(
                    [f"Example {i}:\n{t}" for i, t in enumerate(examples, start=1)]
                ),
                "__LABEL": label,
            }
            input_texts.append(input_text)
        return input_texts

    def process(self, inputs: StepInput) -> "StepOutput":
        labels = [input["cluster_label"] for input in inputs]
        # -1 because -1 is the label for the unclassified
        unique_labels = len(set(labels)) - 1
        # This will be the output of the LLM, the set of labels for each cluster
        cluster_summaries: Dict[int, str] = {-1: self.default_label}

        # Map from label to list of documents, will use them to select examples from each cluster
        label2docs = defaultdict(list)
        for i, label in enumerate(labels):
            label2docs[label].append(i)

        input_texts = self._prepare_input_texts(inputs, label2docs, unique_labels)

        # Send the texts in batches to the LLM, and get the labels for each cluster
        for i, batched_inputs in enumerate(batched(input_texts, self.input_batch_size)):
            self._logger.info(f"📦 Processing internal batch of inputs {i}...")
            results = super().process(batched_inputs)
            for result in next(results):  # Extract the elements from the generator
                cluster_summaries[result["__LABEL"]] = result["labels"]

        # Assign the labels to each text
        for input in inputs:
            input["summary_label"] = json.dumps(
                cluster_summaries[input["cluster_label"]]
            )

        if self.savefig:
            self._create_figure(inputs, label2docs, cluster_summaries)

        yield inputs
inputs property

The inputs for the task are the same as those for `TextClassification`, plus the `projection` and `cluster_label` columns (which can be obtained from the UMAP + DBSCAN steps).

outputs property

The outputs for the task are the `summary_label` and the `model_name`.

_save_figure(data, cluster_centers, cluster_summaries)

Saves the figure starting from the dataframe, using matplotlib.

Parameters

  • data (pd.DataFrame), required: DataFrame with the columns 'X', 'Y' and 'labels', representing the projections and the label of each text respectively.
  • cluster_centers (Dict[str, Tuple[float, float]]), required: Dictionary mapping each label to the center of its cluster, to help with the placement of the annotations.
  • cluster_summaries (Dict[int, str]), required: The summaries of the clusters, obtained from the LLM.

Source code in src/distilabel/steps/clustering/text_clustering.py
def _save_figure(
    self,
    data: pd.DataFrame,
    cluster_centers: Dict[str, Tuple[float, float]],
    cluster_summaries: Dict[int, str],
) -> None:
    """Saves the figure starting from the dataframe, using matplotlib.

    Args:
        data: pd.DataFrame with the columns 'X', 'Y' and 'labels' representing
            the projections and the label of each text respectively.
        cluster_centers: Dictionary mapping from each label the center of a cluster,
            to help with the placement of the annotations.
        cluster_summaries: The summaries of the clusters, obtained from the LLM.
    """
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(12, 8), dpi=300)
    unique_labels = data["labels"].unique()
    # Map of colors for each label (-1 is black)
    colormap = dict(
        zip(unique_labels, plt.cm.Spectral(np.linspace(0, 1, len(unique_labels))))
    )
    colormap[-1] = np.array([0, 0, 0, 0])
    data["color"] = data["labels"].map(colormap)

    data.plot(
        kind="scatter",
        x="X",
        y="Y",
        c="color",
        s=0.75,
        alpha=0.8,
        linewidth=0.4,
        ax=ax,
        colorbar=False,
    )

    for label in cluster_summaries.keys():
        if label == -1:
            continue
        summary = str(cluster_summaries[label])  # These are obtained from the LLM
        position = cluster_centers[label]
        t = ax.text(
            position[0],
            position[1],
            summary,
            horizontalalignment="center",
            verticalalignment="center",
            fontsize=4,
        )
        t.set_bbox(
            {
                "facecolor": "white",
                "alpha": 0.9,
                "linewidth": 0,
                "boxstyle": "square,pad=0.1",
            }
        )

    ax.set_axis_off()
    # Save the plot as an artifact of the step
    self.save_artifact(
        name="Text clusters",
        write_function=lambda path: fig.savefig(path / "figure_clustering.png"),
        metadata={"type": "image", "library": "matplotlib"},
    )
    plt.close()
_create_figure(inputs, label2docs, cluster_summaries)

Creates a figure of the clustered texts and saves it as an artifact.

Parameters

  • inputs (StepInput), required: The inputs of the step, as we will extract information from them again.
  • label2docs (Dict[int, List[str]]), required: Map from each label to the list of documents (texts) that belong to that cluster.
  • cluster_summaries (Dict[int, str]), required: The summaries of the clusters, obtained from the LLM.

Source code in src/distilabel/steps/clustering/text_clustering.py
def _create_figure(
    self,
    inputs: StepInput,
    label2docs: Dict[int, List[str]],
    cluster_summaries: Dict[int, str],
) -> None:
    """Creates a figure of the clustered texts and save it as an artifact.

    Args:
        inputs: The inputs of the step, as we will extract information from them again.
        label2docs: Map from each label to the list of documents (texts) that belong to that cluster.
        cluster_summaries: The summaries of the clusters, obtained from the LLM.
    """
    self._logger.info("🖼️ Creating figure for the clusters...")

    labels = []
    projections = []
    id2cluster = {}
    for i, input in enumerate(inputs):
        label = input["cluster_label"]
        id2cluster[i] = label
        labels.append(label)
        projections.append(input["projection"])

    projections = np.array(projections)

    # Contains the placement of the cluster centers in the figure
    cluster_centers: Dict[str, Tuple[float, float]] = {}
    for label in label2docs.keys():
        x = np.mean([projections[doc, 0] for doc in label2docs[label]])
        y = np.mean([projections[doc, 1] for doc in label2docs[label]])
        cluster_centers[label] = (x, y)

    df = pd.DataFrame(
        data={
            "X": projections[:, 0],
            "Y": projections[:, 1],
            "labels": labels,
        }
    )

    self._save_figure(
        df, cluster_centers=cluster_centers, cluster_summaries=cluster_summaries
    )
_prepare_input_texts(inputs, label2docs, unique_labels)

Prepares a batch of inputs to send to the LLM, with the examples of each cluster.

Parameters

  • inputs (StepInput), required: Inputs from the step.
  • label2docs (Dict[int, List[int]]), required: Map from each label to the list of documents (texts) that belong to that cluster.
  • unique_labels (int), required: The number of unique cluster labels (the noise label -1 is excluded).

Returns

  • List[Dict[str, Union[str, int]]]: The input texts to send to the LLM, with the examples of each cluster prepared to be used in the prompt, and an additional key to store the labels (needed to find the data after the batches are returned from the LLM).
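For reference, a small sketch reproducing the shape of one element returned here (the texts are made up; `__LABEL` is the bookkeeping key described above):

```python
examples = ["A persona about gardening.", "A persona about botany."]
input_text = {
    "text": "\n\n".join(
        f"Example {i}:\n{t}" for i, t in enumerate(examples, start=1)
    ),
    "__LABEL": 0,  # used to map the LLM result back to cluster 0
}
print(input_text["text"])
# Example 1:
# A persona about gardening.
#
# Example 2:
# A persona about botany.
```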

Source code in src/distilabel/steps/clustering/text_clustering.py
def _prepare_input_texts(
    self,
    inputs: StepInput,
    label2docs: Dict[int, List[int]],
    unique_labels: int,
) -> List[Dict[str, Union[str, int]]]:
    """Prepares a batch of inputs to send to the LLM, with the examples of each cluster.

    Args:
        inputs: Inputs from the step.
        label2docs: Map from each label to the list of documents (texts) that
            belong to that cluster.
        unique_labels: The number of unique cluster labels (the noise label -1 excluded).

    Returns:
        The input texts to send to the LLM, with the examples of each cluster
        prepared to be used in the prompt, and an additional key to store the
        labels (that will be needed to find the data after the batches are
        returned from the LLM).
    """
    input_texts = []
    for label in range(unique_labels):  # The label -1 is implicitly excluded
        # Get the ids but remove possible duplicates, which could happen with bigger probability
        # the bigger the number of examples requested, and the smaller the subset of examples
        ids = set(
            np.random.choice(label2docs[label], size=self.samples_per_cluster)
        )  # Grab the number of examples
        examples = [inputs[i]["text"] for i in ids]
        input_text = {
            "text": "\n\n".join(
                [f"Example {i}:\n{t}" for i, t in enumerate(examples, start=1)]
            ),
            "__LABEL": label,
        }
        input_texts.append(input_text)
    return input_texts

UMAP

Bases: GlobalStep

UMAP is a general purpose manifold learning and dimension reduction algorithm.

This is a `GlobalStep` that reduces the dimensionality of the embeddings using the UMAP algorithm. Visit the `TextClustering` step for an example of use. The trained model is saved as an artifact when creating a distiset and pushing it to the Hugging Face Hub. A minimal standalone sketch is shown after the parameter list below.

Input columns
  • embedding (List[float]): The original embeddings whose dimensionality we want to reduce.
Output columns
  • projection (List[float]): Embedding reduced to the number of components specified; the size of the new embeddings is determined by `n_components`.
Categories
  • clustering
  • text-classification
References
  • UMAP repository: https://github.com/lmcinnes/umap/tree/master
  • UMAP documentation: https://umap-learn.readthedocs.io/en/latest/

Attributes

- n_components: The dimension of the space to embed into. This defaults to 2 to provide easy visualization (that's probably what you want), but can reasonably be set to any integer value in the range 2 to 100.

- metric: The metric to use to compute distances in high dimensional space. Visit UMAP's documentation for more information. Defaults to `euclidean`.

- n_jobs: The number of parallel jobs to run. Defaults to `8`.

- random_state: The random state to use for the UMAP algorithm.

Runtime parameters
  • n_components: The dimension of the space to embed into. This defaults to 2 to provide easy visualization (that's probably what you want), but can reasonably be set to any integer value in the range 2 to 100.
  • metric: The metric to use to compute distances in high dimensional space. Visit UMAP's documentation for more information. Defaults to `euclidean`.
  • n_jobs: The number of parallel jobs to run. Defaults to `8`.
  • random_state: The random state to use for the UMAP algorithm.
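A minimal standalone sketch of the step on random embeddings (toy data; as with DBSCAN, it is assumed that the artifact saving inside `process` is only effective when running inside a pipeline):

```python
import numpy as np

from distilabel.steps import UMAP

umap = UMAP(n_components=2, metric="cosine", random_state=42)
umap.load()  # requires `umap-learn` (pip install 'distilabel[text-clustering]')

rng = np.random.default_rng(42)
inputs = [{"embedding": emb.tolist()} for emb in rng.normal(size=(50, 128))]

result = next(umap.process(inputs))
# Each row now has a 2-dimensional `projection`, suitable as DBSCAN input.
```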
Citations
@misc{mcinnes2020umapuniformmanifoldapproximation,
    title={UMAP: Uniform Manifold Approximation and Projection for Dimension Reduction},
    author={Leland McInnes and John Healy and James Melville},
    year={2020},
    eprint={1802.03426},
    archivePrefix={arXiv},
    primaryClass={stat.ML},
    url={https://arxiv.org/abs/1802.03426},
}
Source code in src/distilabel/steps/clustering/umap.py
class UMAP(GlobalStep):
    r"""UMAP is a general purpose manifold learning and dimension reduction algorithm.

    This is a `GlobalStep` that reduces the dimensionality of the embeddings using the UMAP algorithm. Visit
    the `TextClustering` step for an example of use. The trained model is saved as an artifact
    when creating a distiset and pushing it to the Hugging Face Hub.

    Input columns:
        - embedding (`List[float]`): The original embeddings we want to reduce the dimension.

    Output columns:
        - projection (`List[float]`): Embedding reduced to the number of components specified,
            the size of the new embeddings will be determined by the `n_components`.

    Categories:
        - clustering
        - text-classification

    References:
        - [`UMAP repository`](https://github.com/lmcinnes/umap/tree/master)
        - [`UMAP documentation`](https://umap-learn.readthedocs.io/en/latest/)

    Attributes:
        - n_components: The dimension of the space to embed into. This defaults to 2 to
            provide easy visualization (that's probably what you want), but can
            reasonably be set to any integer value in the range 2 to 100.
        - metric: The metric to use to compute distances in high dimensional space.
            Visit UMAP's documentation for more information. Defaults to `euclidean`.
        - n_jobs: The number of parallel jobs to run. Defaults to `8`.
        - random_state: The random state to use for the UMAP algorithm.

    Runtime parameters:
        - `n_components`: The dimension of the space to embed into. This defaults to 2 to
            provide easy visualization (that's probably what you want), but can
            reasonably be set to any integer value in the range 2 to 100.
        - `metric`: The metric to use to compute distances in high dimensional space.
            Visit UMAP's documentation for more information. Defaults to `euclidean`.
        - `n_jobs`: The number of parallel jobs to run. Defaults to `8`.
        - `random_state`: The random state to use for the UMAP algorithm.

    Citations:
        ```
        @misc{mcinnes2020umapuniformmanifoldapproximation,
            title={UMAP: Uniform Manifold Approximation and Projection for Dimension Reduction},
            author={Leland McInnes and John Healy and James Melville},
            year={2020},
            eprint={1802.03426},
            archivePrefix={arXiv},
            primaryClass={stat.ML},
            url={https://arxiv.org/abs/1802.03426},
        }
        ```
    """

    n_components: Optional[RuntimeParameter[int]] = Field(
        default=2,
        description=(
            "The dimension of the space to embed into. This defaults to 2 to "
            "provide easy visualization, but can reasonably be set to any "
            "integer value in the range 2 to 100."
        ),
    )
    metric: Optional[RuntimeParameter[str]] = Field(
        default="euclidean",
        description=(
            "The metric to use to compute distances in high dimensional space. "
            "Visit UMAP's documentation for more information."
        ),
    )
    n_jobs: Optional[RuntimeParameter[int]] = Field(
        default=8, description="The number of parallel jobs to run."
    )
    random_state: Optional[RuntimeParameter[int]] = Field(
        default=None, description="The random state to use for the UMAP algorithm."
    )

    _umap: Optional["_UMAP"] = PrivateAttr(None)

    def load(self) -> None:
        super().load()
        if importlib.util.find_spec("umap") is None:
            raise ImportError(
                "`umap` package is not installed. Please install it using `pip install 'distilabel[text-clustering]'`."
            )
        from umap import UMAP as _UMAP

        self._umap = _UMAP(
            n_components=self.n_components,
            metric=self.metric,
            n_jobs=self.n_jobs,
            random_state=self.random_state,
        )

    def unload(self) -> None:
        self._umap = None

    @property
    def inputs(self) -> List[str]:
        return ["embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["projection"]

    def _save_model(self, model: Any) -> None:
        import joblib

        def save_model(path):
            with open(str(path / "UMAP.joblib"), "wb") as f:
                joblib.dump(model, f)

        self.save_artifact(
            name="UMAP_model",
            write_function=lambda path: save_model(path),
            metadata={
                "n_components": self.n_components,
                "metric": self.metric,
            },
        )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        # Shape of the embeddings is (n_samples, n_features)
        embeddings = np.array([input["embedding"] for input in inputs])

        self._logger.info("🏋️‍♀️ Start UMAP training...")
        mapper = self._umap.fit(embeddings)
        # Shape of the projection will be (n_samples, n_components)
        for input, projection in zip(inputs, mapper.embedding_):
            input["projection"] = projection

        self._save_model(mapper)
        yield inputs

CombineOutputs

Bases: Step

Combine the outputs of several upstream steps.

`CombineOutputs` is a `Step` that takes the outputs of several upstream steps and combines them to generate a new dictionary with all the keys/columns of the upstream steps' outputs.

Input columns
  • dynamic (based on the upstream `Step`s): All the columns of the upstream steps' outputs.
Output columns
  • dynamic (based on the upstream `Step`s): All the columns of the upstream steps' outputs.
Categories
  • columns

Examples

Combine dictionaries of a dataset:

```python
from distilabel.steps import CombineOutputs

combine_outputs = CombineOutputs()
combine_outputs.load()

result = next(
    combine_outputs.process(
        [{"a": 1, "b": 2}, {"a": 3, "b": 4}],
        [{"c": 5, "d": 6}, {"c": 7, "d": 8}],
    )
)
# [
#   {"a": 1, "b": 2, "c": 5, "d": 6},
#   {"a": 3, "b": 4, "c": 7, "d": 8},
# ]
```

Combine upstream steps outputs in a pipeline:

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs

with Pipeline() as pipeline:
    step_1 = ...
    step_2 = ...
    step_3 = ...
    combine = CombineOutputs()

    [step_1, step_2, step_3] >> combine
```
Source code in src/distilabel/steps/columns/combine.py
class CombineOutputs(Step):
    """Combine the outputs of several upstream steps.

    `CombineOutputs` is a `Step` that takes the outputs of several upstream steps and combines
    them to generate a new dictionary with all keys/columns of the upstream steps outputs.

    Input columns:
        - dynamic (based on the upstream `Step`s): All the columns of the upstream steps outputs.

    Output columns:
        - dynamic (based on the upstream `Step`s): All the columns of the upstream steps outputs.

    Categories:
        - columns

    Examples:

        Combine dictionaries of a dataset:

        ```python
        from distilabel.steps import CombineOutputs

        combine_outputs = CombineOutputs()
        combine_outputs.load()

        result = next(
            combine_outputs.process(
                [{"a": 1, "b": 2}, {"a": 3, "b": 4}],
                [{"c": 5, "d": 6}, {"c": 7, "d": 8}],
            )
        )
        # [
        #   {"a": 1, "b": 2, "c": 5, "d": 6},
        #   {"a": 3, "b": 4, "c": 7, "d": 8},
        # ]
        ```

        Combine upstream steps outputs in a pipeline:

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import CombineOutputs

        with Pipeline() as pipeline:
            step_1 = ...
            step_2 = ...
            step_3 = ...
            combine = CombineOutputs()

            [step_1, step_2, step_3] >> combine
        ```
    """

    def process(self, *inputs: StepInput) -> "StepOutput":
        combined_outputs = []
        for output_dicts in zip(*inputs):
            combined_dict = {}
            for output_dict in output_dicts:
                combined_dict.update(
                    {
                        k: v
                        for k, v in output_dict.items()
                        if k != DISTILABEL_METADATA_KEY
                    }
                )

            if any(
                DISTILABEL_METADATA_KEY in output_dict for output_dict in output_dicts
            ):
                combined_dict[DISTILABEL_METADATA_KEY] = merge_distilabel_metadata(
                    *output_dicts
                )
            combined_outputs.append(combined_dict)

        yield combined_outputs

DeitaFiltering

Bases: GlobalStep

Filter dataset rows using the DEITA filtering strategy.

Filter the dataset based on the DEITA score and the cosine distance between the embeddings. It's an implementation of the filtering step from the paper 'What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning'.
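A toy sketch of the rule the step implements (the scores below are made up): the DEITA score is the product of the instruction and response scores, and rows are then greedily selected by descending score, keeping only those whose nearest-neighbor distance clears the diversity threshold, until the data budget is met:

```python
rows = [
    {"deita_score": 0.7 * 0.7, "nearest_neighbor_distance": 1.90},
    {"deita_score": 0.6 * 0.6, "nearest_neighbor_distance": 0.10},
    {"deita_score": 0.5 * 0.5, "nearest_neighbor_distance": 0.95},
]
data_budget, diversity_threshold = 2, 0.9

selected = []
for row in sorted(rows, key=lambda r: r["deita_score"], reverse=True):
    if len(selected) >= data_budget:
        break
    if row["nearest_neighbor_distance"] >= diversity_threshold:
        selected.append(row)
# The middle row is skipped: it is too close to its nearest neighbor.
```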

Attributes

- data_budget (RuntimeParameter[int]): The desired size of the dataset after filtering.

- diversity_threshold (RuntimeParameter[float]): If a row has a cosine distance with respect to its nearest neighbor greater than this value, it will be included in the filtered dataset. Defaults to `0.9`.

- normalize_embeddings (RuntimeParameter[bool]): Whether to normalize the embeddings before computing the cosine distance. Defaults to `True`.

Runtime parameters
  • data_budget: The desired size of the dataset after filtering.
  • diversity_threshold: If a row has a cosine distance with respect to its nearest neighbor greater than this value, it will be included in the filtered dataset.
Input columns
  • evol_instruction_score (float): The score of the instruction generated by the ComplexityScorer step.
  • evol_response_score (float): The score of the response generated by the QualityScorer step.
  • embedding (List[float]): The embedding generated for the conversation of the instruction-response pair using the GenerateEmbeddings step.
Output columns
  • deita_score (float): The DEITA score for the instruction-response pair.
  • deita_score_computed_with (List[str]): The scores used to compute the DEITA score.
  • nearest_neighbor_distance (float): The cosine distance between the embeddings of the instruction-response pair.
Categories
  • filtering
References
  • What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning: https://arxiv.org/abs/2312.15685

Examples

Filter the dataset based on the DEITA score and the cosine distance between the embeddings:

from distilabel.steps import DeitaFiltering

deita_filtering = DeitaFiltering(data_budget=1)

deita_filtering.load()

result = next(
    deita_filtering.process(
        [
            {
                "evol_instruction_score": 0.5,
                "evol_response_score": 0.5,
                "embedding": [-8.12729941, -5.24642847, -6.34003029],
            },
            {
                "evol_instruction_score": 0.6,
                "evol_response_score": 0.6,
                "embedding": [2.99329242, 0.7800932, 0.7799726],
            },
            {
                "evol_instruction_score": 0.7,
                "evol_response_score": 0.7,
                "embedding": [10.29041806, 14.33088073, 13.00557506],
            },
        ],
    )
)
# >>> result
# [{'evol_instruction_score': 0.5, 'evol_response_score': 0.5, 'embedding': [-8.12729941, -5.24642847, -6.34003029], 'deita_score': 0.25, 'deita_score_computed_with': ['evol_instruction_score', 'evol_response_score'], 'nearest_neighbor_distance': 1.9042812683723933}]
Citations
@misc{liu2024makesgooddataalignment,
    title={What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning},
    author={Wei Liu and Weihao Zeng and Keqing He and Yong Jiang and Junxian He},
    year={2024},
    eprint={2312.15685},
    archivePrefix={arXiv},
    primaryClass={cs.CL},
    url={https://arxiv.org/abs/2312.15685},
}
Source code in src/distilabel/steps/deita.py
class DeitaFiltering(GlobalStep):
    """Filter dataset rows using DEITA filtering strategy.

    Filter the dataset based on the DEITA score and the cosine distance between the embeddings.
    It's an implementation of the filtering step from the paper 'What Makes Good Data
    for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning'.

    Attributes:
        data_budget: The desired size of the dataset after filtering.
        diversity_threshold: If a row has a cosine distance with respect to it's nearest
            neighbor greater than this value, it will be included in the filtered dataset.
            Defaults to `0.9`.
        normalize_embeddings: Whether to normalize the embeddings before computing the cosine
            distance. Defaults to `True`.

    Runtime parameters:
        - `data_budget`: The desired size of the dataset after filtering.
        - `diversity_threshold`: If a row has a cosine distance with respect to it's nearest
            neighbor greater than this value, it will be included in the filtered dataset.

    Input columns:
        - evol_instruction_score (`float`): The score of the instruction generated by
            `ComplexityScorer` step.
        - evol_response_score (`float`): The score of the response generated by
            `QualityScorer` step.
        - embedding (`List[float]`): The embedding generated for the conversation of the
            instruction-response pair using `GenerateEmbeddings` step.

    Output columns:
        - deita_score (`float`): The DEITA score for the instruction-response pair.
        - deita_score_computed_with (`List[str]`): The scores used to compute the DEITA
            score.
        - nearest_neighbor_distance (`float`): The cosine distance between the embeddings
            of the instruction-response pair.

    Categories:
        - filtering

    References:
        - [`What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning`](https://arxiv.org/abs/2312.15685)

    Examples:
        Filter the dataset based on the DEITA score and the cosine distance between the embeddings:

        ```python
        from distilabel.steps import DeitaFiltering

        deita_filtering = DeitaFiltering(data_budget=1)

        deita_filtering.load()

        result = next(
            deita_filtering.process(
                [
                    {
                        "evol_instruction_score": 0.5,
                        "evol_response_score": 0.5,
                        "embedding": [-8.12729941, -5.24642847, -6.34003029],
                    },
                    {
                        "evol_instruction_score": 0.6,
                        "evol_response_score": 0.6,
                        "embedding": [2.99329242, 0.7800932, 0.7799726],
                    },
                    {
                        "evol_instruction_score": 0.7,
                        "evol_response_score": 0.7,
                        "embedding": [10.29041806, 14.33088073, 13.00557506],
                    },
                ],
            )
        )
        # >>> result
        # [{'evol_instruction_score': 0.5, 'evol_response_score': 0.5, 'embedding': [-8.12729941, -5.24642847, -6.34003029], 'deita_score': 0.25, 'deita_score_computed_with': ['evol_instruction_score', 'evol_response_score'], 'nearest_neighbor_distance': 1.9042812683723933}]
        ```

    Citations:
        ```
        @misc{liu2024makesgooddataalignment,
            title={What Makes Good Data for Alignment? A Comprehensive Study of Automatic Data Selection in Instruction Tuning},
            author={Wei Liu and Weihao Zeng and Keqing He and Yong Jiang and Junxian He},
            year={2024},
            eprint={2312.15685},
            archivePrefix={arXiv},
            primaryClass={cs.CL},
            url={https://arxiv.org/abs/2312.15685},
        }
        ```
    """

    data_budget: RuntimeParameter[int] = Field(
        default=None, description="The desired size of the dataset after filtering."
    )
    diversity_threshold: RuntimeParameter[float] = Field(
        default=0.9,
        description="If a row has a cosine distance with respect to it's nearest neighbor"
        " greater than this value, it will be included in the filtered dataset.",
    )
    normalize_embeddings: RuntimeParameter[bool] = Field(
        default=True,
        description="Whether to normalize the embeddings before computing the cosine distance.",
    )
    distance_metric: RuntimeParameter[Literal["cosine", "manhattan"]] = Field(
        default="cosine",
        description="The distance metric to use. Currently only 'cosine' is supported.",
    )

    @property
    def inputs(self) -> List[str]:
        return ["evol_instruction_score", "evol_response_score", "embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["deita_score", "nearest_neighbor_distance", "deita_score_computed_with"]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Filter the dataset based on the DEITA score and the cosine distance between the
        embeddings.

        Args:
            inputs: The input data.

        Returns:
            The filtered dataset.
        """
        inputs = self._compute_deita_score(inputs)
        inputs = self._compute_nearest_neighbor(inputs)
        inputs.sort(key=lambda x: x["deita_score"], reverse=True)

        selected_rows = []
        for input in inputs:
            if len(selected_rows) >= self.data_budget:  # type: ignore
                break
            if input["nearest_neighbor_distance"] >= self.diversity_threshold:
                selected_rows.append(input)
        yield selected_rows

    def _compute_deita_score(self, inputs: StepInput) -> StepInput:
        """Computes the DEITA score for each instruction-response pair. The DEITA score is
        the product of the instruction score and the response score.

        Args:
            inputs: The input data.

        Returns:
            The input data with the DEITA score computed.
        """
        for input_ in inputs:
            evol_instruction_score = input_.get("evol_instruction_score")
            evol_response_score = input_.get("evol_response_score")

            if evol_instruction_score and evol_response_score:
                deita_score = evol_instruction_score * evol_response_score
                score_computed_with = ["evol_instruction_score", "evol_response_score"]
            elif evol_instruction_score:
                self._logger.warning(
                    "Response score is missing for the instruction-response pair. Using"
                    " instruction score as DEITA score."
                )
                deita_score = evol_instruction_score
                score_computed_with = ["evol_instruction_score"]
            elif evol_response_score:
                self._logger.warning(
                    "Instruction score is missing for the instruction-response pair. Using"
                    " response score as DEITA score."
                )
                deita_score = evol_response_score
                score_computed_with = ["evol_response_score"]
            else:
                self._logger.warning(
                    "Instruction and response scores are missing for the instruction-response"
                    " pair. Setting DEITA score to 0."
                )
                deita_score = 0
                score_computed_with = []

            input_.update(
                {
                    "deita_score": deita_score,
                    "deita_score_computed_with": score_computed_with,
                }
            )
        return inputs

    def _compute_nearest_neighbor(self, inputs: StepInput) -> StepInput:
        """Computes the cosine distance between the embeddings of the instruction-response
        pairs and the nearest neighbor.

        Args:
            inputs: The input data.

        Returns:
            The input data with the cosine distance computed.
        """
        embeddings = np.array([input["embedding"] for input in inputs])
        if self.normalize_embeddings:
            embeddings = self._normalize_embeddings(embeddings)
        self._logger.info("📏 Computing nearest neighbor distance...")

        if self.distance_metric == "cosine":
            self._logger.info("📏 Using cosine distance.")
            distances = self._cosine_distance(embeddings)
        else:
            self._logger.info("📏 Using manhattan distance.")
            distances = self._manhattan_distance(embeddings)

        for distance, input in zip(distances, inputs):
            input["nearest_neighbor_distance"] = distance
        return inputs

    def _normalize_embeddings(self, embeddings: np.ndarray) -> np.ndarray:
        """Normalize the embeddings.

        Args:
            embeddings: The embeddings to normalize.

        Returns:
            The normalized embeddings.
        """
        self._logger.info("⚖️ Normalizing embeddings...")
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / norms

    def _cosine_distance(self, embeddings: np.array) -> np.array:  # type: ignore
        """Computes the cosine distance between the embeddings.

        Args:
            embeddings: The embeddings.

        Returns:
            The cosine distance between the embeddings.
        """
        cosine_similarity = np.dot(embeddings, embeddings.T)
        cosine_distance = 1 - cosine_similarity
        # Ignore self-distance
        np.fill_diagonal(cosine_distance, np.inf)
        return np.min(cosine_distance, axis=1)

    def _manhattan_distance(self, embeddings: np.array) -> np.array:  # type: ignore
        """Computes the manhattan distance between the embeddings.

        Args:
            embeddings: The embeddings.

        Returns:
            The manhattan distance between the embeddings.
        """
        manhattan_distance = np.abs(embeddings[:, None] - embeddings).sum(-1)
        # Ignore self-distance
        np.fill_diagonal(manhattan_distance, np.inf)
        return np.min(manhattan_distance, axis=1)
process(inputs)

Filter the dataset based on the DEITA score and the cosine distance between the embeddings.

Parameters

  • inputs (StepInput), required: The input data.

Returns

  • StepOutput: The filtered dataset.

Source code in src/distilabel/steps/deita.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Filter the dataset based on the DEITA score and the cosine distance between the
    embeddings.

    Args:
        inputs: The input data.

    Returns:
        The filtered dataset.
    """
    inputs = self._compute_deita_score(inputs)
    inputs = self._compute_nearest_neighbor(inputs)
    inputs.sort(key=lambda x: x["deita_score"], reverse=True)

    selected_rows = []
    for input in inputs:
        if len(selected_rows) >= self.data_budget:  # type: ignore
            break
        if input["nearest_neighbor_distance"] >= self.diversity_threshold:
            selected_rows.append(input)
    yield selected_rows
_compute_deita_score(inputs)

Computes the DEITA score for each instruction-response pair. The DEITA score is the product of the instruction score and the response score.

Parameters

  • inputs (StepInput), required: The input data.

Returns

  • StepInput: The input data with the DEITA score computed.

Source code in src/distilabel/steps/deita.py
def _compute_deita_score(self, inputs: StepInput) -> StepInput:
    """Computes the DEITA score for each instruction-response pair. The DEITA score is
    the product of the instruction score and the response score.

    Args:
        inputs: The input data.

    Returns:
        The input data with the DEITA score computed.
    """
    for input_ in inputs:
        evol_instruction_score = input_.get("evol_instruction_score")
        evol_response_score = input_.get("evol_response_score")

        if evol_instruction_score and evol_response_score:
            deita_score = evol_instruction_score * evol_response_score
            score_computed_with = ["evol_instruction_score", "evol_response_score"]
        elif evol_instruction_score:
            self._logger.warning(
                "Response score is missing for the instruction-response pair. Using"
                " instruction score as DEITA score."
            )
            deita_score = evol_instruction_score
            score_computed_with = ["evol_instruction_score"]
        elif evol_response_score:
            self._logger.warning(
                "Instruction score is missing for the instruction-response pair. Using"
                " response score as DEITA score."
            )
            deita_score = evol_response_score
            score_computed_with = ["evol_response_score"]
        else:
            self._logger.warning(
                "Instruction and response scores are missing for the instruction-response"
                " pair. Setting DEITA score to 0."
            )
            deita_score = 0
            score_computed_with = []

        input_.update(
            {
                "deita_score": deita_score,
                "deita_score_computed_with": score_computed_with,
            }
        )
    return inputs
_compute_nearest_neighbor(inputs)

Computes the cosine distance between the embeddings of the instruction-response pairs and their nearest neighbor.

Parameters

  • inputs (StepInput), required: The input data.

Returns

  • StepInput: The input data with the cosine distance computed.

Source code in src/distilabel/steps/deita.py
def _compute_nearest_neighbor(self, inputs: StepInput) -> StepInput:
    """Computes the cosine distance between the embeddings of the instruction-response
    pairs and the nearest neighbor.

    Args:
        inputs: The input data.

    Returns:
        The input data with the cosine distance computed.
    """
    embeddings = np.array([input["embedding"] for input in inputs])
    if self.normalize_embeddings:
        embeddings = self._normalize_embeddings(embeddings)
    self._logger.info("📏 Computing nearest neighbor distance...")

    if self.distance_metric == "cosine":
        self._logger.info("📏 Using cosine distance.")
        distances = self._cosine_distance(embeddings)
    else:
        self._logger.info("📏 Using manhattan distance.")
        distances = self._manhattan_distance(embeddings)

    for distance, input in zip(distances, inputs):
        input["nearest_neighbor_distance"] = distance
    return inputs
_normalize_embeddings(embeddings)

Normalize the embeddings.

Parameters

  • embeddings (np.ndarray), required: The embeddings to normalize.

Returns

  • np.ndarray: The normalized embeddings.

Source code in src/distilabel/steps/deita.py
def _normalize_embeddings(self, embeddings: np.ndarray) -> np.ndarray:
    """Normalize the embeddings.

    Args:
        embeddings: The embeddings to normalize.

    Returns:
        The normalized embeddings.
    """
    self._logger.info("⚖️ Normalizing embeddings...")
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / norms
_cosine_distance(embeddings)

Computes the cosine distance between the embeddings.

Parameters

  • embeddings (np.array), required: The embeddings.

Returns

  • np.array: The cosine distance between the embeddings.

Source code in src/distilabel/steps/deita.py
def _cosine_distance(self, embeddings: np.array) -> np.array:  # type: ignore
    """Computes the cosine distance between the embeddings.

    Args:
        embeddings: The embeddings.

    Returns:
        The cosine distance between the embeddings.
    """
    cosine_similarity = np.dot(embeddings, embeddings.T)
    cosine_distance = 1 - cosine_similarity
    # Ignore self-distance
    np.fill_diagonal(cosine_distance, np.inf)
    return np.min(cosine_distance, axis=1)
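Note that `1 - E @ E.T` equals the cosine distance only when the rows of `E` are unit-normalized, which is why `normalize_embeddings` defaults to `True` in this step. A small sketch of the same computation on random data:

```python
import numpy as np

rng = np.random.default_rng(0)
embeddings = rng.normal(size=(4, 3))
embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)  # unit norm

cosine_distance = 1 - embeddings @ embeddings.T
np.fill_diagonal(cosine_distance, np.inf)  # ignore self-distance
print(cosine_distance.min(axis=1))  # distance to the nearest neighbor, per row
```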
_manhattan_distance(embeddings)

Computes the Manhattan distance between the embeddings.

Parameters

  • embeddings (np.array), required: The embeddings.

Returns

  • np.array: The Manhattan distance between the embeddings.

Source code in src/distilabel/steps/deita.py
def _manhattan_distance(self, embeddings: np.array) -> np.array:  # type: ignore
    """Computes the manhattan distance between the embeddings.

    Args:
        embeddings: The embeddings.

    Returns:
        The manhattan distance between the embeddings.
    """
    manhattan_distance = np.abs(embeddings[:, None] - embeddings).sum(-1)
    # Ignore self-distance
    np.fill_diagonal(manhattan_distance, np.inf)
    return np.min(manhattan_distance, axis=1)

EmbeddingGeneration

Bases: Step

Generate embeddings using an `Embeddings` model.

`EmbeddingGeneration` is a `Step` that uses an `Embeddings` model to generate sentence embeddings for the provided input texts.

Attributes

- embeddings (Embeddings): The `Embeddings` model used to generate the sentence embeddings.

Input columns
  • text (str): The text for which the sentence embedding has to be generated.
Output columns
  • embedding (List[Union[float, int]]): The generated sentence embedding.
Categories
  • embedding

Examples

Generate sentence embeddings with Sentence Transformers:

from distilabel.models import SentenceTransformerEmbeddings
from distilabel.steps import EmbeddingGeneration

embedding_generation = EmbeddingGeneration(
    embeddings=SentenceTransformerEmbeddings(
        model="mixedbread-ai/mxbai-embed-large-v1",
    )
)

embedding_generation.load()

result = next(embedding_generation.process([{"text": "Hello, how are you?"}]))
# [{'text': 'Hello, how are you?', 'embedding': [0.06209656596183777, -0.015797119587659836, ...]}]
Source code in src/distilabel/steps/embeddings/embedding_generation.py
class EmbeddingGeneration(Step):
    """Generate embeddings using an `Embeddings` model.

    `EmbeddingGeneration` is a `Step` that uses an `Embeddings` model to generate sentence
    embeddings for the provided input texts.

    Attributes:
        embeddings: the `Embeddings` model used to generate the sentence embeddings.

    Input columns:
        - text (`str`): The text for which the sentence embedding has to be generated.

    Output columns:
        - embedding (`List[Union[float, int]]`): the generated sentence embedding.

    Categories:
        - embedding

    Examples:
        Generate sentence embeddings with Sentence Transformers:

        ```python
        from distilabel.models import SentenceTransformerEmbeddings
        from distilabel.steps import EmbeddingGeneration

        embedding_generation = EmbeddingGeneration(
            embeddings=SentenceTransformerEmbeddings(
                model="mixedbread-ai/mxbai-embed-large-v1",
            )
        )

        embedding_generation.load()

        result = next(embedding_generation.process([{"text": "Hello, how are you?"}]))
        # [{'text': 'Hello, how are you?', 'embedding': [0.06209656596183777, -0.015797119587659836, ...]}]
        ```

    """

    embeddings: Embeddings

    @property
    def inputs(self) -> "StepColumns":
        return ["text"]

    @property
    def outputs(self) -> "StepColumns":
        return ["embedding", "model_name"]

    def load(self) -> None:
        """Loads the `Embeddings` model."""
        super().load()

        self.embeddings.load()

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        embeddings = self.embeddings.encode(inputs=[input["text"] for input in inputs])
        for input, embedding in zip(inputs, embeddings):
            input["embedding"] = embedding
            input["model_name"] = self.embeddings.model_name
        yield inputs

    def unload(self) -> None:
        super().unload()
        self.embeddings.unload()
load()

Loads the `Embeddings` model.

Source code in src/distilabel/steps/embeddings/embedding_generation.py
def load(self) -> None:
    """Loads the `Embeddings` model."""
    super().load()

    self.embeddings.load()

FaissNearestNeighbour

Bases: GlobalStep

Create a `faiss` index to get the nearest neighbours.

`FaissNearestNeighbour` is a `GlobalStep` that creates a `faiss` index using the Hugging Face `datasets` library integration, and then gets the nearest neighbours and the scores or distances of the nearest neighbours for each input row.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| device | Optional[RuntimeParameter[Union[int, List[int]]]] | The CUDA device ID or a list of IDs to be used. If a negative integer, it will use all the available GPUs. Defaults to `None`. |
| string_factory | Optional[RuntimeParameter[str]] | The name of the factory to be used to build the `faiss` index. Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes. Defaults to `None`. |
| metric_type | Optional[RuntimeParameter[int]] | The metric to be used to measure the distance between the points. It's an integer, and the recommended way to pass it is importing `faiss` and then passing one of the `faiss.METRIC_x` variables. Defaults to `None`. |
| k | Optional[RuntimeParameter[int]] | The number of nearest neighbours to search for each input row. Defaults to `1`. |
| search_batch_size | Optional[RuntimeParameter[int]] | The number of rows to include in a search batch. The value can be adjusted to maximize resource usage or to avoid OOM issues. Defaults to `50`. |
| train_size | Optional[RuntimeParameter[int]] | If the index needs a training step, specifies how many vectors will be used to train the index. |

Runtime parameters:
  • device: The CUDA device ID or a list of IDs to be used. If a negative integer, it will use all the available GPUs. Defaults to `None`.
  • string_factory: The name of the factory to be used to build the `faiss` index. Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes. Defaults to `None`.
  • metric_type: The metric to be used to measure the distance between the points. It's an integer, and the recommended way to pass it is importing `faiss` and then passing one of the `faiss.METRIC_x` variables. Defaults to `None`.
  • k: The number of nearest neighbours to search for each input row. Defaults to `1`.
  • search_batch_size: The number of rows to include in a search batch. The value can be adjusted to maximize resource usage or to avoid OOM issues. Defaults to `50`.
  • train_size: If the index needs a training step, specifies how many vectors will be used to train the index.
Input columns:
  • embedding (List[Union[float, int]]): A sentence embedding.
Output columns:
  • nn_indices (List[int]): A list containing the indices of the `k` nearest neighbours in the inputs for the row.
  • nn_scores (List[float]): A list containing the score or distance to each of the `k` nearest neighbours in the inputs.
Categories:
  • embedding
References:
  • The Faiss library: https://arxiv.org/abs/2401.08281

Examples:

Generating embeddings and getting the nearest neighbours:

from distilabel.models import SentenceTransformerEmbeddings
from distilabel.pipeline import Pipeline
from distilabel.steps import EmbeddingGeneration, FaissNearestNeighbour, LoadDataFromHub

with Pipeline(name="hello") as pipeline:
    load_data = LoadDataFromHub(output_mappings={"prompt": "text"})

    embeddings = EmbeddingGeneration(
        embeddings=SentenceTransformerEmbeddings(
            model="mixedbread-ai/mxbai-embed-large-v1"
        )
    )

    nearest_neighbours = FaissNearestNeighbour()

    load_data >> embeddings >> nearest_neighbours

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            load_data.name: {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
        },
        use_cache=False,
    )
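
When the defaults are not enough (for example, to retrieve more neighbours or use an inner-product metric over normalized embeddings), the runtime parameters can be supplied through `pipeline.run` as well. A hedged variant of the call above; the `string_factory` and `train_size` values are illustrative placeholders, not recommendations:

```python
import faiss

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            load_data.name: {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            nearest_neighbours.name: {
                "k": 5,
                "metric_type": faiss.METRIC_INNER_PRODUCT,
                "string_factory": "IVF64,Flat",  # illustrative choice, tune per dataset
                "train_size": 1000,              # illustrative; IVF-style indexes need training
            },
        },
        use_cache=False,
    )
```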
Citations:
@misc{douze2024faisslibrary,
    title={The Faiss library},
    author={Matthijs Douze and Alexandr Guzhva and Chengqi Deng and Jeff Johnson and Gergely Szilvasy and Pierre-Emmanuel Mazaré and Maria Lomeli and Lucas Hosseini and Hervé Jégou},
    year={2024},
    eprint={2401.08281},
    archivePrefix={arXiv},
    primaryClass={cs.LG},
    url={https://arxiv.org/abs/2401.08281},
}
Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
class FaissNearestNeighbour(GlobalStep):
    """Create a `faiss` index to get the nearest neighbours.

    `FaissNearestNeighbour` is a `GlobalStep` that creates a `faiss` index using the Hugging
    Face `datasets` library integration, and then gets the nearest neighbours and the scores
    or distance of the nearest neighbours for each input row.

    Attributes:
        device: the CUDA device ID or a list of IDs to be used. If negative integer, it
            will use all the available GPUs. Defaults to `None`.
        string_factory: the name of the factory to be used to build the `faiss` index.
            Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes.
            Defaults to `None`.
        metric_type: the metric to be used to measure the distance between the points. It's
            an integer and the recommended way to pass it is importing `faiss` and then passing
            one of `faiss.METRIC_x` variables. Defaults to `None`.
        k: the number of nearest neighbours to search for each input row. Defaults to `1`.
        search_batch_size: the number of rows to include in a search batch. The value can
            be adjusted to maximize the resources usage or to avoid OOM issues. Defaults
            to `50`.
        train_size: If the index needs a training step, specifies how many vectors will be
            used to train the index.

    Runtime parameters:
        - `device`: the CUDA device ID or a list of IDs to be used. If negative integer,
            it will use all the available GPUs. Defaults to `None`.
        - `string_factory`: the name of the factory to be used to build the `faiss` index.
            Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes.
            Defaults to `None`.
        - `metric_type`: the metric to be used to measure the distance between the points.
            It's an integer and the recommended way to pass it is importing `faiss` and then
            passing one of `faiss.METRIC_x` variables. Defaults to `None`.
        - `k`: the number of nearest neighbours to search for each input row. Defaults to `1`.
        - `search_batch_size`: the number of rows to include in a search batch. The value
            can be adjusted to maximize the resources usage or to avoid OOM issues. Defaults
            to `50`.
        - `train_size`: If the index needs a training step, specifies how many vectors will
            be used to train the index.

    Input columns:
        - embedding (`List[Union[float, int]]`): a sentence embedding.

    Output columns:
        - nn_indices (`List[int]`): a list containing the indices of the `k` nearest neighbours
            in the inputs for the row.
        - nn_scores (`List[float]`): a list containing the score or distance to each `k`
            nearest neighbour in the inputs.

    Categories:
        - embedding

    References:
        - [`The Faiss library`](https://arxiv.org/abs/2401.08281)

    Examples:
        Generating embeddings and getting the nearest neighbours:

        ```python
        from distilabel.models import SentenceTransformerEmbeddings
        from distilabel.pipeline import Pipeline
        from distilabel.steps import EmbeddingGeneration, FaissNearestNeighbour, LoadDataFromHub

        with Pipeline(name="hello") as pipeline:
            load_data = LoadDataFromHub(output_mappings={"prompt": "text"})

            embeddings = EmbeddingGeneration(
                embeddings=SentenceTransformerEmbeddings(
                    model="mixedbread-ai/mxbai-embed-large-v1"
                )
            )

            nearest_neighbours = FaissNearestNeighbour()

            load_data >> embeddings >> nearest_neighbours

        if __name__ == "__main__":
            distiset = pipeline.run(
                parameters={
                    load_data.name: {
                        "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                        "split": "test",
                    },
                },
                use_cache=False,
            )
        ```

    Citations:
        ```
        @misc{douze2024faisslibrary,
            title={The Faiss library},
            author={Matthijs Douze and Alexandr Guzhva and Chengqi Deng and Jeff Johnson and Gergely Szilvasy and Pierre-Emmanuel Mazaré and Maria Lomeli and Lucas Hosseini and Hervé Jégou},
            year={2024},
            eprint={2401.08281},
            archivePrefix={arXiv},
            primaryClass={cs.LG},
            url={https://arxiv.org/abs/2401.08281},
        }
        ```
    """

    device: Optional[RuntimeParameter[Union[int, List[int]]]] = Field(
        default=None,
        description="The CUDA device ID or a list of IDs to be used. If negative integer,"
        " it will use all the available GPUs.",
    )
    string_factory: Optional[RuntimeParameter[str]] = Field(
        default=None,
        description="The name of the factory to be used to build the `faiss` index."
        "Available string factories can be checked here: https://github.com/facebookresearch/faiss/wiki/Faiss-indexes.",
    )
    metric_type: Optional[RuntimeParameter[int]] = Field(
        default=None,
        description="The metric to be used to measure the distance between the points. It's"
        " an integer and the recommend way to pass it is importing `faiss` and thenpassing"
        " one of `faiss.METRIC_x` variables.",
    )
    k: Optional[RuntimeParameter[int]] = Field(
        default=1,
        description="The number of nearest neighbours to search for each input row.",
    )
    search_batch_size: Optional[RuntimeParameter[int]] = Field(
        default=50,
        description="The number of rows to include in a search batch. The value can be adjusted"
        " to maximize the resources usage or to avoid OOM issues.",
    )
    train_size: Optional[RuntimeParameter[int]] = Field(
        default=None,
        description="If the index needs a training step, specifies how many vectors will be used to train the index.",
    )

    def load(self) -> None:
        super().load()

        if importlib.util.find_spec("faiss") is None:
            raise ImportError(
                "`faiss` package is not installed. Please install it using `pip install"
                " 'distilabel[faiss-cpu]' or 'distilabel[faiss-gpu]'`."
            )

    @property
    def inputs(self) -> List[str]:
        return ["embedding"]

    @property
    def outputs(self) -> List[str]:
        return ["nn_indices", "nn_scores"]

    def _build_index(self, inputs: List[Dict[str, Any]]) -> Dataset:
        """Builds a `faiss` index using `datasets` integration.

        Args:
            inputs: a list of dictionaries.

        Returns:
            The built `datasets.Dataset` with its `faiss` index.
        """
        dataset = Dataset.from_list(inputs)
        if self.train_size is not None and self.string_factory:
            self._logger.info("🏋️‍♀️ Starting Faiss index training...")
        dataset.add_faiss_index(
            column="embedding",
            device=self.device,  # type: ignore
            string_factory=self.string_factory,
            metric_type=self.metric_type,
            train_size=self.train_size,
        )
        return dataset

    def _save_index(self, dataset: Dataset) -> None:
        """Save the generated Faiss index as an artifact of the step.

        Args:
            dataset: the dataset with the `faiss` index built.
        """
        self.save_artifact(
            name="faiss_index",
            write_function=lambda path: dataset.save_faiss_index(
                index_name="embedding", file=path / "index.faiss"
            ),
            metadata={
                "num_rows": len(dataset),
                "embedding_dim": len(dataset[0]["embedding"]),
            },
        )

    def _search(self, dataset: Dataset) -> Dataset:
        """Search the top `k` nearest neighbours for each row in the dataset.

        Args:
            dataset: the dataset with the `faiss` index built.

        Returns:
            The updated dataset containing the top `k` nearest neighbours for each row,
            as well as the score or distance.
        """

        def add_search_results(examples: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
            queries = np.array(examples["embedding"])
            results = dataset.search_batch(
                index_name="embedding",
                queries=queries,
                k=self.k + 1,  # type: ignore
            )
            examples["nn_indices"] = [indices[1:] for indices in results.total_indices]
            examples["nn_scores"] = [scores[1:] for scores in results.total_scores]
            return examples

        return dataset.map(
            add_search_results, batched=True, batch_size=self.search_batch_size
        )

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        dataset = self._build_index(inputs)
        dataset_with_search_results = self._search(dataset)
        self._save_index(dataset)
        yield dataset_with_search_results.to_list()
_build_index(inputs)

Builds a `faiss` index using the `datasets` integration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | List[Dict[str, Any]] | A list of dictionaries. | required |

Returns:

| Type | Description |
| --- | --- |
| Dataset | The built `datasets.Dataset` with its `faiss` index. |

Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
def _build_index(self, inputs: List[Dict[str, Any]]) -> Dataset:
    """Builds a `faiss` index using `datasets` integration.

    Args:
        inputs: a list of dictionaries.

    Returns:
        The built `datasets.Dataset` with its `faiss` index.
    """
    dataset = Dataset.from_list(inputs)
    if self.train_size is not None and self.string_factory:
        self._logger.info("🏋️‍♀️ Starting Faiss index training...")
    dataset.add_faiss_index(
        column="embedding",
        device=self.device,  # type: ignore
        string_factory=self.string_factory,
        metric_type=self.metric_type,
        train_size=self.train_size,
    )
    return dataset
_save_index(dataset)

Saves the generated Faiss index as an artifact of the step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | The dataset with the `faiss` index built. | required |

Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
def _save_index(self, dataset: Dataset) -> None:
    """Save the generated Faiss index as an artifact of the step.

    Args:
        dataset: the dataset with the `faiss` index built.
    """
    self.save_artifact(
        name="faiss_index",
        write_function=lambda path: dataset.save_faiss_index(
            index_name="embedding", file=path / "index.faiss"
        ),
        metadata={
            "num_rows": len(dataset),
            "embedding_dim": len(dataset[0]["embedding"]),
        },
    )

_search(dataset)

Searches the top `k` nearest neighbours for each row in the dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | The dataset with the `faiss` index built. | required |

Returns:

| Type | Description |
| --- | --- |
| Dataset | The updated dataset containing the top `k` nearest neighbours for each row, as well as the score or distance. |

Source code in src/distilabel/steps/embeddings/nearest_neighbour.py
def _search(self, dataset: Dataset) -> Dataset:
    """Search the top `k` nearest neighbours for each row in the dataset.

    Args:
        dataset: the dataset with the `faiss` index built.

    Returns:
        The updated dataset containing the top `k` nearest neighbours for each row,
        as well as the score or distance.
    """

    def add_search_results(examples: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
        queries = np.array(examples["embedding"])
        results = dataset.search_batch(
            index_name="embedding",
            queries=queries,
            k=self.k + 1,  # type: ignore
        )
        examples["nn_indices"] = [indices[1:] for indices in results.total_indices]
        examples["nn_scores"] = [scores[1:] for scores in results.total_scores]
        return examples

    return dataset.map(
        add_search_results, batched=True, batch_size=self.search_batch_size
    )
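
Note the `k + 1` search followed by the `[1:]` slice: every row's own embedding is part of the index, so the closest hit is (normally) the row itself and gets dropped. A plain-Python sketch of that slicing, assuming an L2-style index where the self-match comes back first with distance `0.0`:

```python
# Suppose a search with k=2 asked faiss for k + 1 = 3 hits for row 0:
total_indices = [[0, 7, 3]]          # the first hit is row 0 itself
total_scores = [[0.0, 0.12, 0.25]]   # self-match at distance 0.0

nn_indices = [indices[1:] for indices in total_indices]  # drop the self-match
nn_scores = [scores[1:] for scores in total_scores]
print(nn_indices, nn_scores)  # [[7, 3]] [[0.12, 0.25]]
```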

EmbeddingDedup

Bases: GlobalStep

Deduplicates text using embeddings.

`EmbeddingDedup` is a `GlobalStep` that detects near-duplicates in datasets, using embeddings to compare the similarity between the texts. The typical workflow with this step would include having a dataset with embeddings precomputed, and then (possibly using `FaissNearestNeighbour`) using the `nn_indices` and `nn_scores` to determine which texts are duplicates.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| threshold | Optional[RuntimeParameter[float]] | The threshold to consider 2 examples as duplicates. It's dependent on the type of index that was used to generate the embeddings. For example, if the embeddings were generated using cosine similarity, a threshold of `0.9` would make all the texts with a cosine similarity above that value duplicates. Higher values detect fewer duplicates in such an index, but that should be taken into account when building it. Defaults to `0.9`. |

Runtime parameters:
  • threshold: The threshold to consider 2 examples as duplicates.
Input columns:
  • nn_indices (List[int]): A list containing the indices of the `k` nearest neighbours in the inputs for the row.
  • nn_scores (List[float]): A list containing the score or distance to each of the `k` nearest neighbours in the inputs.
Output columns:
  • keep_row_after_embedding_filtering (bool): Boolean indicating whether the piece of `text` is not a duplicate, i.e. whether this text should be kept.
Categories:
  • filtering

Examples:

Deduplicate a list of texts using embedding information:

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import EmbeddingDedup
from distilabel.steps import LoadDataFromDicts

with Pipeline() as pipeline:
    batch_size = 3  # NOTE: defined here so the example is self-contained
    data = LoadDataFromDicts(
        data=[
            {
                "persona": "A chemistry student or academic researcher interested in inorganic or physical chemistry, likely at an advanced undergraduate or graduate level, studying acid-base interactions and chemical bonding.",
                "embedding": [
                    0.018477669046149742,
                    -0.03748236608841726,
                    0.001919870620352492,
                    0.024918478063770535,
                    0.02348063521315178,
                    0.0038251285566308375,
                    -0.01723884983037716,
                    0.02881971942372201,
                ],
                "nn_indices": [0, 1],
                "nn_scores": [
                    0.9164746999740601,
                    0.782106876373291,
                ],
            },
            {
                "persona": "A music teacher or instructor focused on theoretical and practical piano lessons.",
                "embedding": [
                    -0.0023464179614082125,
                    -0.07325472251663565,
                    -0.06058678419516501,
                    -0.02100326928586996,
                    -0.013462744792362657,
                    0.027368447064244242,
                    -0.003916070100455717,
                    0.01243614518480423,
                ],
                "nn_indices": [0, 2],
                "nn_scores": [
                    0.7552462220191956,
                    0.7261884808540344,
                ],
            },
            {
                "persona": "A classical guitar teacher or instructor, likely with experience teaching beginners, who focuses on breaking down complex music notation into understandable steps for their students.",
                "embedding": [
                    -0.01630817942328242,
                    -0.023760151552345232,
                    -0.014249650090627883,
                    -0.005713686451446624,
                    -0.016033059279131567,
                    0.0071440908501058786,
                    -0.05691099643425161,
                    0.01597412704817784,
                ],
                "nn_indices": [1, 2],
                "nn_scores": [
                    0.8107735514640808,
                    0.7172299027442932,
                ],
            },
        ],
        batch_size=batch_size,
    )
    # In general you should do something like this before the deduplication step, to obtain the
    # `nn_indices` and `nn_scores`. In this case the embeddings are already normalized, so there's
    # no need for it.
    # nn = FaissNearestNeighbour(
    #     k=30,
    #     metric_type=faiss.METRIC_INNER_PRODUCT,
    #     search_batch_size=50,
    #     train_size=len(dataset),              # The number of embeddings to use for training
    #     string_factory="IVF300_HNSW32,Flat"   # To use an index (optional, maybe required for big datasets)
    # )
    # Read more about the `string_factory` here:
    # https://github.com/facebookresearch/faiss/wiki/Guidelines-to-choose-an-index

    embedding_dedup = EmbeddingDedup(
        threshold=0.8,
        input_batch_size=batch_size,
    )

    data >> embedding_dedup

if __name__ == "__main__":
    distiset = pipeline.run(use_cache=False)
    ds = distiset["default"]["train"]
    # Filter out the duplicates
    ds_dedup = ds.filter(lambda x: x["keep_row_after_embedding_filtering"])
```
Source code in src/distilabel/steps/filtering/embedding.py
class EmbeddingDedup(GlobalStep):
    """Deduplicates text using embeddings.

    `EmbeddingDedup` is a Step that detects near-duplicates in datasets, using
    embeddings to compare the similarity between the texts. The typical workflow with this step
    would include having a dataset with embeddings precomputed, and then (possibly using the
    `FaissNearestNeighbour`) using the `nn_indices` and `nn_scores` to determine the texts that
    are duplicates.

    Attributes:
        threshold: the threshold to consider 2 examples as duplicates.
            It's dependent on the type of index that was used to generate the embeddings.
            For example, if the embeddings were generated using cosine similarity, a threshold
            of `0.9` would make all the texts with a cosine similarity above the value
            duplicates. Higher values detect less duplicates in such an index, but that should
            be taken into account when building it. Defaults to `0.9`.

    Runtime Parameters:
        - `threshold`: the threshold to consider 2 examples as duplicates.

    Input columns:
        - nn_indices (`List[int]`): a list containing the indices of the `k` nearest neighbours
            in the inputs for the row.
        - nn_scores (`List[float]`): a list containing the score or distance to each `k`
            nearest neighbour in the inputs.

    Output columns:
        - keep_row_after_embedding_filtering (`bool`): boolean indicating if the piece `text` is
            not a duplicate i.e. this text should be kept.

    Categories:
        - filtering

    Examples:

        Deduplicate a list of texts using embedding information:

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import EmbeddingDedup
        from distilabel.steps import LoadDataFromDicts

        with Pipeline() as pipeline:
            batch_size = 3  # NOTE: defined here so the example is self-contained
            data = LoadDataFromDicts(
                data=[
                    {
                        "persona": "A chemistry student or academic researcher interested in inorganic or physical chemistry, likely at an advanced undergraduate or graduate level, studying acid-base interactions and chemical bonding.",
                        "embedding": [
                            0.018477669046149742,
                            -0.03748236608841726,
                            0.001919870620352492,
                            0.024918478063770535,
                            0.02348063521315178,
                            0.0038251285566308375,
                            -0.01723884983037716,
                            0.02881971942372201,
                        ],
                        "nn_indices": [0, 1],
                        "nn_scores": [
                            0.9164746999740601,
                            0.782106876373291,
                        ],
                    },
                    {
                        "persona": "A music teacher or instructor focused on theoretical and practical piano lessons.",
                        "embedding": [
                            -0.0023464179614082125,
                            -0.07325472251663565,
                            -0.06058678419516501,
                            -0.02100326928586996,
                            -0.013462744792362657,
                            0.027368447064244242,
                            -0.003916070100455717,
                            0.01243614518480423,
                        ],
                        "nn_indices": [0, 2],
                        "nn_scores": [
                            0.7552462220191956,
                            0.7261884808540344,
                        ],
                    },
                    {
                        "persona": "A classical guitar teacher or instructor, likely with experience teaching beginners, who focuses on breaking down complex music notation into understandable steps for their students.",
                        "embedding": [
                            -0.01630817942328242,
                            -0.023760151552345232,
                            -0.014249650090627883,
                            -0.005713686451446624,
                            -0.016033059279131567,
                            0.0071440908501058786,
                            -0.05691099643425161,
                            0.01597412704817784,
                        ],
                        "nn_indices": [1, 2],
                        "nn_scores": [
                            0.8107735514640808,
                            0.7172299027442932,
                        ],
                    },
                ],
                batch_size=batch_size,
            )
            # In general you should do something like this before the deduplication step, to obtain the
            # `nn_indices` and `nn_scores`. In this case the embeddings are already normalized, so there's
            # no need for it.
            # nn = FaissNearestNeighbour(
            #     k=30,
            #     metric_type=faiss.METRIC_INNER_PRODUCT,
            #     search_batch_size=50,
            #     train_size=len(dataset),              # The number of embeddings to use for training
            #     string_factory="IVF300_HNSW32,Flat"   # To use an index (optional, maybe required for big datasets)
            # )
            # Read more about the `string_factory` here:
            # https://github.com/facebookresearch/faiss/wiki/Guidelines-to-choose-an-index

            embedding_dedup = EmbeddingDedup(
                threshold=0.8,
                input_batch_size=batch_size,
            )

            data >> embedding_dedup

        if __name__ == "__main__":
            distiset = pipeline.run(use_cache=False)
            ds = distiset["default"]["train"]
            # Filter out the duplicates
            ds_dedup = ds.filter(lambda x: x["keep_row_after_embedding_filtering"])
        ```
    """

    threshold: Optional[RuntimeParameter[float]] = Field(
        default=0.9,
        description="The threshold to consider 2 examples as duplicates. It's dependent "
        "on the type of index that was used to generate the embeddings. For example, if "
        "the embeddings were generated using cosine similarity, a threshold of `0.9` "
        "would make all the texts with a cosine similarity above the value duplicates. "
        "Higher values detect less duplicates in such an index, but that should be "
        "taken into account when building it.",
    )

    @property
    def inputs(self) -> List[str]:
        return ["nn_scores", "nn_indices"]

    @property
    def outputs(self) -> List[str]:
        return ["keep_row_after_embedding_filtering"]

    @override
    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        rows_to_remove = set()

        for input in track(inputs, description="Running Embedding deduplication..."):
            input["keep_row_after_embedding_filtering"] = True
            indices_scores = np.array(input["nn_scores"]) > self.threshold
            indices = np.array(input["nn_indices"])[indices_scores]
            if len(indices) > 0:  # If there are any rows found over the threshold
                rows_to_remove.update(list(indices))

        # Remove duplicates and get the list of rows to remove
        for idx in rows_to_remove:
            inputs[idx]["keep_row_after_embedding_filtering"] = False

        yield inputs
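
The marking logic in `process` can be replayed in isolation: any neighbour whose score exceeds `threshold` is flagged for removal, and the surviving rows keep `True`. A stripped-down sketch using the `nn_indices`/`nn_scores` from the example above:

```python
import numpy as np

threshold = 0.8
rows = [
    {"nn_indices": [0, 1], "nn_scores": [0.9164747, 0.7821069]},
    {"nn_indices": [0, 2], "nn_scores": [0.7552462, 0.7261885]},
    {"nn_indices": [1, 2], "nn_scores": [0.8107736, 0.7172299]},
]

rows_to_remove = set()
for row in rows:
    row["keep_row_after_embedding_filtering"] = True
    over = np.array(row["nn_scores"]) > threshold         # neighbours over the threshold
    rows_to_remove.update(np.array(row["nn_indices"])[over].tolist())

for idx in rows_to_remove:
    rows[idx]["keep_row_after_embedding_filtering"] = False

print([r["keep_row_after_embedding_filtering"] for r in rows])  # [False, False, True]
```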

MinHashDedup

Bases: Step

Deduplicates text using `MinHash` and `MinHashLSH`.

`MinHashDedup` is a `Step` that detects near-duplicates in datasets. The idea roughly translates to the following steps:

1. Tokenize the text into words or ngrams.
2. Create a `MinHash` for each text.
3. Store the `MinHashes` in a `MinHashLSH`.
4. Check if the `MinHash` is already in the `LSH`; if so, it is a duplicate.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| num_perm | int | The number of permutations to use. Defaults to `128`. |
| seed | int | The seed to use for the MinHash. Defaults to `1`. |
| tokenizer | Literal['words', 'ngrams'] | The tokenizer to use. Available ones are `words` or `ngrams`. If `words` is selected, it tokenizes the text into words using nltk's word tokenizer. `ngrams` estimates the ngrams (together with the size `n`). Defaults to `words`. |
| n | Optional[int] | The size of the ngrams to use. Only relevant if `tokenizer="ngrams"`. Defaults to `5`. |
| threshold | float | The threshold to consider two MinHashes as duplicates. Values closer to 0 detect more duplicates. Defaults to `0.9`. |
| storage | Literal['dict', 'disk'] | The storage to use for the LSH. Can be `dict` to store the index in memory, or `disk`. Keep in mind that `disk` is an experimental feature not defined in `datasketch`; it is based on DiskCache's `Index` class. It should work like a `dict`, but backed by disk, although depending on the system it can be slower. Defaults to `dict`. |

Input columns:
  • text (str): The texts to be filtered.
Output columns:
  • keep_row_after_minhash_filtering (bool): Boolean indicating whether the piece of `text` is not a duplicate, i.e. whether this text should be kept.
Categories:
  • filtering
References:
  • datasketch documentation: https://ekzhu.github.io/datasketch/lsh.html
  • Identifying and Filtering Near-Duplicate Documents: https://cs.brown.edu/courses/cs253/papers/nearduplicate.pdf
  • Diskcache's Index: https://grantjenks.com/docs/diskcache/api.html#diskcache.Index

Examples:

Deduplicate a list of texts using MinHash and MinHashLSH:

```python
from distilabel.pipeline import Pipeline
from distilabel.steps import MinHashDedup
from distilabel.steps import LoadDataFromDicts

with Pipeline() as pipeline:
    ds_size = 1000
    batch_size = 500  # Bigger batch sizes work better for this step
    data = LoadDataFromDicts(
        data=[
            {"text": "This is a test document."},
            {"text": "This document is a test."},
            {"text": "Test document for duplication."},
            {"text": "Document for duplication test."},
            {"text": "This is another unique document."},
        ]
        * (ds_size // 5),
        batch_size=batch_size,
    )
    minhash_dedup = MinHashDedup(
        tokenizer="words",
        threshold=0.9,      # lower values will increase the number of duplicates
        storage="dict",     # or "disk" for bigger datasets
    )

    data >> minhash_dedup

if __name__ == "__main__":
    distiset = pipeline.run(use_cache=False)
    ds = distiset["default"]["train"]
    # Filter out the duplicates
    ds_dedup = ds.filter(lambda x: x["keep_row_after_minhash_filtering"])
```
Source code in src/distilabel/steps/filtering/minhash.py
class MinHashDedup(Step):
    """Deduplicates text using `MinHash` and `MinHashLSH`.

    `MinHashDedup` is a Step that detects near-duplicates in datasets. The idea roughly translates
    to the following steps:
    1. Tokenize the text into words or ngrams.
    2. Create a `MinHash` for each text.
    3. Store the `MinHashes` in a `MinHashLSH`.
    4. Check if the `MinHash` is already in the `LSH`, if so, it is a duplicate.

    Attributes:
        num_perm: the number of permutations to use. Defaults to `128`.
        seed: the seed to use for the MinHash. Defaults to `1`.
        tokenizer: the tokenizer to use. Available ones are `words` or `ngrams`.
            If `words` is selected, it tokenizes the text into words using nltk's
            word tokenizer. `ngram` estimates the ngrams (together with the size
            `n`). Defaults to `words`.
        n: the size of the ngrams to use. Only relevant if `tokenizer="ngrams"`. Defaults to `5`.
        threshold: the threshold to consider two MinHashes as duplicates.
            Values closer to 0 detect more duplicates. Defaults to `0.9`.
        storage: the storage to use for the LSH. Can be `dict` to store the index
            in memory, or `disk`. Keep in mind, `disk` is an experimental feature
            not defined in `datasketch`, that is based on DiskCache's `Index` class.
            It should work as a `dict`, but backed by disk, but depending on the system
            it can be slower. Defaults to `dict`.

    Input columns:
        - text (`str`): the texts to be filtered.

    Output columns:
        - keep_row_after_minhash_filtering (`bool`): boolean indicating if the piece `text` is
            not a duplicate i.e. this text should be kept.

    Categories:
        - filtering

    References:
        - [`datasketch documentation`](https://ekzhu.github.io/datasketch/lsh.html)
        - [Identifying and Filtering Near-Duplicate Documents](https://cs.brown.edu/courses/cs253/papers/nearduplicate.pdf)
        - [Diskcache's Index](https://grantjenks.com/docs/diskcache/api.html#diskcache.Index)

    Examples:

        Deduplicate a list of texts using MinHash and MinHashLSH:

        ```python
        from distilabel.pipeline import Pipeline
        from distilabel.steps import MinHashDedup
        from distilabel.steps import LoadDataFromDicts

        with Pipeline() as pipeline:
            ds_size = 1000
            batch_size = 500  # Bigger batch sizes work better for this step
            data = LoadDataFromDicts(
                data=[
                    {"text": "This is a test document."},
                    {"text": "This document is a test."},
                    {"text": "Test document for duplication."},
                    {"text": "Document for duplication test."},
                    {"text": "This is another unique document."},
                ]
                * (ds_size // 5),
                batch_size=batch_size,
            )
            minhash_dedup = MinHashDedup(
                tokenizer="words",
                threshold=0.9,      # lower values will increase the number of duplicates
                storage="dict",     # or "disk" for bigger datasets
            )

            data >> minhash_dedup

        if __name__ == "__main__":
            distiset = pipeline.run(use_cache=False)
            ds = distiset["default"]["train"]
            # Filter out the duplicates
            ds_dedup = ds.filter(lambda x: x["keep_row_after_minhash_filtering"])
        ```
    """

    num_perm: int = 128
    seed: int = 1
    tokenizer: Literal["words", "ngrams"] = "words"
    n: Optional[int] = 5
    threshold: float = 0.9
    storage: Literal["dict", "disk"] = "dict"

    _hasher: Union["MinHash", None] = PrivateAttr(None)
    _tokenizer: Union[Callable, None] = PrivateAttr(None)
    _lsh: Union["MinHashLSH", None] = PrivateAttr(None)

    def load(self) -> None:
        super().load()
        if not importlib.import_module("datasketch"):
            raise ImportError(
                "`datasketch` is needed to deduplicate with MinHash, but is not installed. "
                "Please install it using `pip install 'distilabel[minhash]'`."
            )
        from datasketch import MinHash

        from distilabel.steps.filtering._datasketch import MinHashLSH

        self._hasher = MinHash.bulk
        self._lsh = MinHashLSH(
            num_perm=self.num_perm,
            threshold=self.threshold,
            storage_config={"type": self.storage},
        )

        if self.tokenizer == "words":
            if not importlib.import_module("nltk"):
                raise ImportError(
                    "`nltk` is needed to tokenize based on words, but is not installed. "
                    "Please install it using `pip install 'distilabel[minhash]'`. Then run `nltk.download('punkt_tab')`."
                )
            self._tokenizer = tokenized_on_words
        else:
            self._tokenizer = partial(tokenize_on_ngrams, n=self.n)

    def unload(self) -> None:
        super().unload()
        # In case of LSH being stored in disk, we need to close the file.
        if self.storage == "disk":
            self._lsh.close()

    @property
    def inputs(self) -> List[str]:
        return ["text"]

    @property
    def outputs(self) -> List[str]:
        return ["keep_row_after_minhash_filtering"]

    def process(self, inputs: StepInput) -> "StepOutput":
        tokenized_texts = []
        for input in inputs:
            tokenized_texts.append(self._tokenizer([input[self.inputs[0]]])[0])

        minhashes = self._hasher(
            tokenized_texts, num_perm=self.num_perm, seed=self.seed
        )

        for input, minhash in zip(inputs, minhashes):
            # Check if the text is already in the LSH index
            if self._lsh.query(minhash):
                input["keep_row_after_minhash_filtering"] = False
            else:
                self._lsh.insert(str(uuid.uuid4()), minhash)
                input["keep_row_after_minhash_filtering"] = True

        yield inputs
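
For intuition, here is a minimal sketch of the underlying idea using `datasketch` directly; it substitutes a naive `str.split` for nltk's word tokenizer (an assumption made for brevity) and uses the step's default `threshold` and `num_perm`:

```python
from datasketch import MinHash, MinHashLSH

def minhash_for(text: str, num_perm: int = 128) -> MinHash:
    m = MinHash(num_perm=num_perm)
    for token in text.split():  # stand-in for nltk word tokenization
        m.update(token.encode("utf-8"))
    return m

lsh = MinHashLSH(threshold=0.9, num_perm=128)

texts = ["This is a test document.", "This is a test document.", "Something else."]
for i, text in enumerate(texts):
    minhash = minhash_for(text)
    if lsh.query(minhash):           # a near-duplicate is already indexed
        print(i, "duplicate")
    else:
        lsh.insert(str(i), minhash)  # first occurrence: index it
        print(i, "kept")
# 0 kept / 1 duplicate / 2 kept
```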

ConversationTemplate

Bases: Step

Generate a conversation template from an instruction and a response.

Input columns:
  • instruction (str): The instruction to be used in the conversation.
  • response (str): The response to be used in the conversation.
Output columns:
  • conversation (ChatType): The conversation template.
Categories:
  • format
  • chat
  • template

Examples:

Create a conversation from an instruction and a response:

from distilabel.steps import ConversationTemplate

conv_template = ConversationTemplate()
conv_template.load()

result = next(
    conv_template.process(
        [
            {
                "instruction": "Hello",
                "response": "Hi",
            }
        ],
    )
)
# >>> result
# [{'instruction': 'Hello', 'response': 'Hi', 'conversation': [{'role': 'user', 'content': 'Hello'}, {'role': 'assistant', 'content': 'Hi'}]}]
Source code in src/distilabel/steps/formatting/conversation.py
class ConversationTemplate(Step):
    """Generate a conversation template from an instruction and a response.

    Input columns:
        - instruction (`str`): The instruction to be used in the conversation.
        - response (`str`): The response to be used in the conversation.

    Output columns:
        - conversation (`ChatType`): The conversation template.

    Categories:
        - format
        - chat
        - template

    Examples:
        Create a conversation from an instruction and a response:

        ```python
        from distilabel.steps import ConversationTemplate

        conv_template = ConversationTemplate()
        conv_template.load()

        result = next(
            conv_template.process(
                [
                    {
                        "instruction": "Hello",
                        "response": "Hi",
                    }
                ],
            )
        )
        # >>> result
        # [{'instruction': 'Hello', 'response': 'Hi', 'conversation': [{'role': 'user', 'content': 'Hello'}, {'role': 'assistant', 'content': 'Hi'}]}]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """The instruction and response."""
        return ["instruction", "response"]

    @property
    def outputs(self) -> "StepColumns":
        """The conversation template."""
        return ["conversation"]

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        """Generate a conversation template from an instruction and a response.

        Args:
            inputs: The input data.

        Yields:
            The input data with the conversation template.
        """
        for input in inputs:
            input["conversation"] = [
                {"role": "user", "content": input["instruction"]},
                {"role": "assistant", "content": input["response"]},
            ]
        yield inputs
inputs property

The instruction and response.

outputs property

The conversation template.

process(inputs)

Generate a conversation template from an instruction and a response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | StepInput | The input data. | required |

Yields:

| Type | Description |
| --- | --- |
| StepOutput | The input data with the conversation template. |

Source code in src/distilabel/steps/formatting/conversation.py
def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
    """Generate a conversation template from an instruction and a response.

    Args:
        inputs: The input data.

    Yields:
        The input data with the conversation template.
    """
    for input in inputs:
        input["conversation"] = [
            {"role": "user", "content": input["instruction"]},
            {"role": "assistant", "content": input["response"]},
        ]
    yield inputs

FormatChatGenerationDPO

Bases: Step

Format the output of a combination of a `ChatGeneration` + a preference task for Direct Preference Optimization (DPO).

`FormatChatGenerationDPO` is a `Step` that formats the output of the combination of a `ChatGeneration` task with a preference `Task`, i.e. a task generating `ratings` such as `UltraFeedback`, following the standard formatting from frameworks such as `axolotl` or `alignment-handbook`. The ratings are used to rank the existing generations and provide the `chosen` and `rejected` generations based on the `ratings`.

Note:

The `messages` column should contain at least one message from the user, the `generations` column should contain at least two generations, and the `ratings` column should contain the same number of ratings as generations.

Input columns:
  • messages (List[Dict[str, str]]): The conversation messages.
  • generations (List[str]): The generations produced by the `LLM`.
  • generation_models (List[str], optional): The model names used to generate the `generations`; only available if the `model_name` from the `ChatGeneration` task/s is combined into a single column named this way, otherwise it will be ignored.
  • ratings (List[float]): The ratings for each of the `generations`, produced by a preference task such as `UltraFeedback`.
Output columns:
  • prompt (str): The user message used to generate the `generations` with the `LLM`.
  • prompt_id (str): The `SHA256` hash of the `prompt`.
  • chosen (List[Dict[str, str]]): The `chosen` generation based on the `ratings`.
  • chosen_model (str, optional): The model name used to generate the `chosen` generation, if the `generation_models` are available.
  • chosen_rating (float): The rating of the `chosen` generation.
  • rejected (List[Dict[str, str]]): The `rejected` generation based on the `ratings`.
  • rejected_model (str, optional): The model name used to generate the `rejected` generation, if the `generation_models` are available.
  • rejected_rating (float): The rating of the `rejected` generation.
Categories:
  • format
  • chat-generation
  • preference
  • messages
  • generations

Examples:

Format your dataset for DPO fine-tuning:

from distilabel.steps import FormatChatGenerationDPO

format_dpo = FormatChatGenerationDPO()
format_dpo.load()

# NOTE: "generation_models" can be added optionally.
result = next(
    format_dpo.process(
        [
            {
                "messages": [{"role": "user", "content": "What's 2+2?"}],
                "generations": ["4", "5", "6"],
                "ratings": [1, 0, -1],
            }
        ]
    )
)
# >>> result
# [
#     {
#         'messages': [{'role': 'user', 'content': "What's 2+2?"}],
#         'generations': ['4', '5', '6'],
#         'ratings': [1, 0, -1],
#         'prompt': "What's 2+2?",
#         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#         'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
#         'chosen_rating': 1,
#         'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
#         'rejected_rating': -1
#     }
# ]
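
As the example output shows, `prompt_id` is simply the `SHA256` hex digest of the UTF-8 encoded `prompt`:

```python
import hashlib

print(hashlib.sha256("What's 2+2?".encode("utf-8")).hexdigest())
# 7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29
```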
Source code in src/distilabel/steps/formatting/dpo.py
class FormatChatGenerationDPO(Step):
    """Format the output of a combination of a `ChatGeneration` + a preference task for Direct Preference Optimization (DPO).

    `FormatChatGenerationDPO` is a `Step` that formats the output of the combination of a `ChatGeneration`
    task with a preference `Task` i.e. a task generating `ratings` such as `UltraFeedback` following the standard
    formatting from frameworks such as `axolotl` or `alignment-handbook`, so that those are used to rank the
    existing generations and provide the `chosen` and `rejected` generations based on the `ratings`.

    Note:
        The `messages` column should contain at least one message from the user, the `generations`
        column should contain at least two generations, the `ratings` column should contain the same
        number of ratings as generations.

    Input columns:
        - messages (`List[Dict[str, str]]`): The conversation messages.
        - generations (`List[str]`): The generations produced by the `LLM`.
        - generation_models (`List[str]`, optional): The model names used to generate the `generations`,
            only available if the `model_name` from the `ChatGeneration` task/s is combined into a single
            column named this way, otherwise, it will be ignored.
        - ratings (`List[float]`): The ratings for each of the `generations`, produced by a preference
            task such as `UltraFeedback`.

    Output columns:
        - prompt (`str`): The user message used to generate the `generations` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - chosen (`List[Dict[str, str]]`): The `chosen` generation based on the `ratings`.
        - chosen_model (`str`, optional): The model name used to generate the `chosen` generation,
            if the `generation_models` are available.
        - chosen_rating (`float`): The rating of the `chosen` generation.
        - rejected (`List[Dict[str, str]]`): The `rejected` generation based on the `ratings`.
        - rejected_model (`str`, optional): The model name used to generate the `rejected` generation,
            if the `generation_models` are available.
        - rejected_rating (`float`): The rating of the `rejected` generation.

    Categories:
        - format
        - chat-generation
        - preference
        - messages
        - generations

    Examples:
        Format your dataset for DPO fine tuning:

        ```python
        from distilabel.steps import FormatChatGenerationDPO

        format_dpo = FormatChatGenerationDPO()
        format_dpo.load()

        # NOTE: "generation_models" can be added optionally.
        result = next(
            format_dpo.process(
                [
                    {
                        "messages": [{"role": "user", "content": "What's 2+2?"}],
                        "generations": ["4", "5", "6"],
                        "ratings": [1, 0, -1],
                    }
                ]
            )
        )
        # >>> result
        # [
        #     {
        #         'messages': [{'role': 'user', 'content': "What's 2+2?"}],
        #         'generations': ['4', '5', '6'],
        #         'ratings': [1, 0, -1],
        #         'prompt': "What's 2+2?",
        #         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #         'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
        #         'chosen_rating': 1,
        #         'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
        #         'rejected_rating': -1
        #     }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `messages`, `generations`,
        and `ratings`."""
        return ["messages", "generations", "ratings"]

    @property
    def optional_inputs(self) -> List[str]:
        """List of optional inputs, which are not required by the `Step` but used if available,
        which in this case is: `generation_models`."""
        return ["generation_models"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `chosen`,
        `chosen_model`, `chosen_rating`, `rejected`, `rejected_model`, `rejected_rating`. Both
        the `chosen_model` and `rejected_model` being optional and only used if `generation_models`
        is available.

        Reference:
            - Format inspired in https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
        """
        return [
            "prompt",
            "prompt_id",
            "chosen",
            "chosen_model",
            "chosen_rating",
            "rejected",
            "rejected_model",
            "rejected_rating",
        ]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the DPO formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
        """
        for input in inputs:
            for item in input:
                item["prompt"] = next(
                    (
                        turn["content"]
                        for turn in item["messages"]
                        if turn["role"] == "user"
                    ),
                    None,
                )
                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["chosen"] = item["messages"] + [
                    {
                        "role": "assistant",
                        "content": item["generations"][chosen_idx],
                    }
                ]
                if "generation_models" in item:
                    item["chosen_model"] = item["generation_models"][chosen_idx]
                item["chosen_rating"] = item["ratings"][chosen_idx]

                rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["rejected"] = item["messages"] + [
                    {
                        "role": "assistant",
                        "content": item["generations"][rejected_idx],
                    }
                ]
                if "generation_models" in item:
                    item["rejected_model"] = item["generation_models"][rejected_idx]
                item["rejected_rating"] = item["ratings"][rejected_idx]

            yield input
inputs property

List of inputs required by the `Step`, which in this case are: `messages`, `generations`, and `ratings`.

optional_inputs property

List of optional inputs, which are not required by the `Step` but used if available, which in this case is: `generation_models`.

outputs property

List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `chosen`, `chosen_model`, `chosen_rating`, `rejected`, `rejected_model`, `rejected_rating`. Both `chosen_model` and `rejected_model` are optional and only used if `generation_models` is available.

Reference:
  • Format inspired by https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k

process(*inputs)

The `process` method formats the received `StepInput` or list of `StepInput` according to the DPO formatting standard.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| *inputs | StepInput | A list of `StepInput` to be combined. | () |

Yields:

| Type | Description |
| --- | --- |
| StepOutput | A `StepOutput` with batches of formatted `StepInput` following the DPO standard. |

Source code in src/distilabel/steps/formatting/dpo.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the DPO formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
    """
    for input in inputs:
        for item in input:
            item["prompt"] = next(
                (
                    turn["content"]
                    for turn in item["messages"]
                    if turn["role"] == "user"
                ),
                None,
            )
            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["chosen"] = item["messages"] + [
                {
                    "role": "assistant",
                    "content": item["generations"][chosen_idx],
                }
            ]
            if "generation_models" in item:
                item["chosen_model"] = item["generation_models"][chosen_idx]
            item["chosen_rating"] = item["ratings"][chosen_idx]

            rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["rejected"] = item["messages"] + [
                {
                    "role": "assistant",
                    "content": item["generations"][rejected_idx],
                }
            ]
            if "generation_models" in item:
                item["rejected_model"] = item["generation_models"][rejected_idx]
            item["rejected_rating"] = item["ratings"][rejected_idx]

        yield input
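
One behavioural detail worth knowing: `max` and `min` over `enumerate(ratings)` break ties by taking the first occurrence, so with equal ratings the earliest generation wins on both sides:

```python
ratings = [1, 1, 0]
chosen_idx = max(enumerate(ratings), key=lambda x: x[1])[0]   # first highest rating
rejected_idx = min(enumerate(ratings), key=lambda x: x[1])[0]  # first lowest rating
print(chosen_idx, rejected_idx)  # 0 2
```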

FormatTextGenerationDPO

Bases: Step

Format the output of your LLMs for Direct Preference Optimization (DPO).

`FormatTextGenerationDPO` is a `Step` that formats the output of the combination of a `TextGeneration` task with a preference `Task`, i.e. a task generating `ratings`, so that the ratings are used to rank the existing generations and provide the `chosen` and `rejected` generations based on the `ratings`. Use this step to transform the output of a combination of a `TextGeneration` + a preference task such as `UltraFeedback`, following the standard formatting from frameworks such as `axolotl` or `alignment-handbook`.

Note:

The `generations` column should contain at least two generations, and the `ratings` column should contain the same number of ratings as generations.

Input columns:
  • system_prompt (str, optional): The system prompt used within the `LLM` to generate the `generations`, if available.
  • instruction (str): The instruction used to generate the `generations` with the `LLM`.
  • generations (List[str]): The generations produced by the `LLM`.
  • generation_models (List[str], optional): The model names used to generate the `generations`; only available if the `model_name` from the `TextGeneration` task/s is combined into a single column named this way, otherwise it will be ignored.
  • ratings (List[float]): The ratings for each of the `generations`, produced by a preference task such as `UltraFeedback`.
Output columns:
  • prompt (str): The instruction used to generate the `generations` with the `LLM`.
  • prompt_id (str): The `SHA256` hash of the `prompt`.
  • chosen (List[Dict[str, str]]): The `chosen` generation based on the `ratings`.
  • chosen_model (str, optional): The model name used to generate the `chosen` generation, if the `generation_models` are available.
  • chosen_rating (float): The rating of the `chosen` generation.
  • rejected (List[Dict[str, str]]): The `rejected` generation based on the `ratings`.
  • rejected_model (str, optional): The model name used to generate the `rejected` generation, if the `generation_models` are available.
  • rejected_rating (float): The rating of the `rejected` generation.
Categories:
  • format
  • text-generation
  • preference
  • instruction
  • generations

Examples:

Format your dataset for DPO fine-tuning:

from distilabel.steps import FormatTextGenerationDPO

format_dpo = FormatTextGenerationDPO()
format_dpo.load()

# NOTE: Both "system_prompt" and "generation_models" can be added optionally.
result = next(
    format_dpo.process(
        [
            {
                "instruction": "What's 2+2?",
                "generations": ["4", "5", "6"],
                "ratings": [1, 0, -1],
            }
        ]
    )
)
# >>> result
# [
#    {   'instruction': "What's 2+2?",
#        'generations': ['4', '5', '6'],
#        'ratings': [1, 0, -1],
#        'prompt': "What's 2+2?",
#        'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#        'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
#        'chosen_rating': 1,
#        'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
#        'rejected_rating': -1
#    }
# ]
Source code in src/distilabel/steps/formatting/dpo.py
class FormatTextGenerationDPO(Step):
    """Format the output of your LLMs for Direct Preference Optimization (DPO).

    `FormatTextGenerationDPO` is a `Step` that formats the output of the combination of a `TextGeneration`
    task with a preference `Task` i.e. a task generating `ratings`, so that those are used to rank the
    existing generations and provide the `chosen` and `rejected` generations based on the `ratings`.
    Use this step to transform the output of a combination of a `TextGeneration` + a preference task such as
    `UltraFeedback` following the standard formatting from frameworks such as `axolotl` or `alignment-handbook`.

    Note:
        The `generations` column should contain at least two generations, the `ratings` column should
        contain the same number of ratings as generations.

    Input columns:
        - system_prompt (`str`, optional): The system prompt used within the `LLM` to generate the
            `generations`, if available.
        - instruction (`str`): The instruction used to generate the `generations` with the `LLM`.
        - generations (`List[str]`): The generations produced by the `LLM`.
        - generation_models (`List[str]`, optional): The model names used to generate the `generations`,
            only available if the `model_name` from the `TextGeneration` task/s is combined into a single
            column named this way, otherwise, it will be ignored.
        - ratings (`List[float]`): The ratings for each of the `generations`, produced by a preference
            task such as `UltraFeedback`.

    Output columns:
        - prompt (`str`): The instruction used to generate the `generations` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - chosen (`List[Dict[str, str]]`): The `chosen` generation based on the `ratings`.
        - chosen_model (`str`, optional): The model name used to generate the `chosen` generation,
            if the `generation_models` are available.
        - chosen_rating (`float`): The rating of the `chosen` generation.
        - rejected (`List[Dict[str, str]]`): The `rejected` generation based on the `ratings`.
        - rejected_model (`str`, optional): The model name used to generate the `rejected` generation,
            if the `generation_models` are available.
        - rejected_rating (`float`): The rating of the `rejected` generation.

    Categories:
        - format
        - text-generation
        - preference
        - instruction
        - generations

    Examples:
        Format your dataset for DPO fine tuning:

        ```python
        from distilabel.steps import FormatTextGenerationDPO

        format_dpo = FormatTextGenerationDPO()
        format_dpo.load()

        # NOTE: Both "system_prompt" and "generation_models" can be added optionally.
        result = next(
            format_dpo.process(
                [
                    {
                        "instruction": "What's 2+2?",
                        "generations": ["4", "5", "6"],
                        "ratings": [1, 0, -1],
                    }
                ]
            )
        )
        # >>> result
        # [
        #    {   'instruction': "What's 2+2?",
        #        'generations': ['4', '5', '6'],
        #        'ratings': [1, 0, -1],
        #        'prompt': "What's 2+2?",
        #        'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #        'chosen': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
        #        'chosen_rating': 1,
        #        'rejected': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '6'}],
        #        'rejected_rating': -1
        #    }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `instruction`, `generations`,
        and `ratings`."""
        return {
            "system_prompt": False,
            "instruction": True,
            "generations": True,
            "generation_models": False,
            "ratings": True,
        }

    @property
    def optional_inputs(self) -> List[str]:
        """List of optional inputs, which are not required by the `Step` but used if available,
        which in this case are: `system_prompt`, and `generation_models`."""
        return ["system_prompt", "generation_models"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `chosen`,
        `chosen_model`, `chosen_rating`, `rejected`, `rejected_model`, `rejected_rating`. Both
        the `chosen_model` and `rejected_model` being optional and only used if `generation_models`
        is available.

        Reference:
            - Format inspired in https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
        """
        return [
            "prompt",
            "prompt_id",
            "chosen",
            "chosen_model",
            "chosen_rating",
            "rejected",
            "rejected_model",
            "rejected_rating",
        ]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the DPO formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
        """
        for input in inputs:
            for item in input:
                messages = [
                    {"role": "user", "content": item["instruction"]},  # type: ignore
                ]
                if (
                    "system_prompt" in item
                    and isinstance(item["system_prompt"], str)  # type: ignore
                    and len(item["system_prompt"]) > 0  # type: ignore
                ):
                    messages.insert(
                        0,
                        {"role": "system", "content": item["system_prompt"]},  # type: ignore
                    )

                item["prompt"] = item["instruction"]
                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["chosen"] = messages + [
                    {
                        "role": "assistant",
                        "content": item["generations"][chosen_idx],
                    }
                ]
                if "generation_models" in item:
                    item["chosen_model"] = item["generation_models"][chosen_idx]
                item["chosen_rating"] = item["ratings"][chosen_idx]

                rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
                item["rejected"] = messages + [
                    {
                        "role": "assistant",
                        "content": item["generations"][rejected_idx],
                    }
                ]
                if "generation_models" in item:
                    item["rejected_model"] = item["generation_models"][rejected_idx]
                item["rejected_rating"] = item["ratings"][rejected_idx]

            yield input
inputs property

List of inputs required by the Step, which in this case are: instruction, generations, and ratings.

optional_inputs property

List of optional inputs, which are not required by the Step but used if available, which in this case are: system_prompt and generation_models.

outputs property

List of outputs generated by the Step, which are: prompt, prompt_id, chosen, chosen_model, chosen_rating, rejected, rejected_model, rejected_rating. Both chosen_model and rejected_model are optional and only used if generation_models is available.

Reference
  • Format inspired by https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the DPO formatting standard.

Parameters
  • *inputs (StepInput): A list of StepInput to be combined. Default: ()

Yields
  • StepOutput: A StepOutput with batches of formatted StepInput following the DPO standard.

Source code in src/distilabel/steps/formatting/dpo.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the DPO formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the DPO standard.
    """
    for input in inputs:
        for item in input:
            messages = [
                {"role": "user", "content": item["instruction"]},  # type: ignore
            ]
            if (
                "system_prompt" in item
                and isinstance(item["system_prompt"], str)  # type: ignore
                and len(item["system_prompt"]) > 0  # type: ignore
            ):
                messages.insert(
                    0,
                    {"role": "system", "content": item["system_prompt"]},  # type: ignore
                )

            item["prompt"] = item["instruction"]
            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            chosen_idx = max(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["chosen"] = messages + [
                {
                    "role": "assistant",
                    "content": item["generations"][chosen_idx],
                }
            ]
            if "generation_models" in item:
                item["chosen_model"] = item["generation_models"][chosen_idx]
            item["chosen_rating"] = item["ratings"][chosen_idx]

            rejected_idx = min(enumerate(item["ratings"]), key=lambda x: x[1])[0]
            item["rejected"] = messages + [
                {
                    "role": "assistant",
                    "content": item["generations"][rejected_idx],
                }
            ]
            if "generation_models" in item:
                item["rejected_model"] = item["generation_models"][rejected_idx]
            item["rejected_rating"] = item["ratings"][rejected_idx]

        yield input
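
Note that chosen is picked with max over the ratings and rejected with min; on ties, Python's max and min both return the first occurrence, so the first of the tied generations wins in either case. A tiny sketch of the selection logic:

ratings = [1, 1, 0]
chosen_idx = max(enumerate(ratings), key=lambda x: x[1])[0]    # 0: first of the tied maxima
rejected_idx = min(enumerate(ratings), key=lambda x: x[1])[0]  # 2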

FormatChatGenerationSFT

Bases: Step

Format the output of a ChatGeneration task for Supervised Fine-Tuning (SFT).

FormatChatGenerationSFT is a Step that formats the output of a ChatGeneration task for Supervised Fine-Tuning (SFT), following the standard formatting of frameworks such as axolotl or alignment-handbook. The output of the ChatGeneration task is formatted into a chat-like conversation, with the instruction as the user message and the generation as the assistant message. Optionally, if a system_prompt is available, it is included as the first message of the conversation.

Input columns
  • system_prompt (str, optional): The system prompt used within the LLM to generate the generation, if available.
  • instruction (str): The instruction used to generate the generation with the LLM.
  • generation (str): The generation produced by the LLM.
Output columns
  • prompt (str): The instruction used to generate the generation with the LLM.
  • prompt_id (str): The SHA256 hash of the prompt.
  • messages (List[Dict[str, str]]): The chat-like conversation, with the instruction as the user message and the generation as the assistant message.
Categories
  • format
  • chat-generation
  • instruction
  • generation

Examples

Format your dataset for SFT:

from distilabel.steps import FormatChatGenerationSFT

format_sft = FormatChatGenerationSFT()
format_sft.load()

# NOTE: "system_prompt" can be added optionally.
result = next(
    format_sft.process(
        [
            {
                "messages": [{"role": "user", "content": "What's 2+2?"}],
                "generation": "4"
            }
        ]
    )
)
# >>> result
# [
#     {
#         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
#         'generation': '4',
#         'prompt': 'What's 2+2?',
#         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#     }
# ]
Source code in src/distilabel/steps/formatting/sft.py
class FormatChatGenerationSFT(Step):
    """Format the output of a `ChatGeneration` task for Supervised Fine-Tuning (SFT).

    `FormatChatGenerationSFT` is a `Step` that formats the output of a `ChatGeneration` task for
    Supervised Fine-Tuning (SFT) following the standard formatting from frameworks such as `axolotl`
    or `alignment-handbook`. The output of the `ChatGeneration` task is formatted into a chat-like
    conversation with the `instruction` as the user message and the `generation` as the assistant
    message. Optionally, if the `system_prompt` is available, it is included as the first message
    in the conversation.

    Input columns:
        - system_prompt (`str`, optional): The system prompt used within the `LLM` to generate the
            `generation`, if available.
        - instruction (`str`): The instruction used to generate the `generation` with the `LLM`.
        - generation (`str`): The generation produced by the `LLM`.

    Output columns:
        - prompt (`str`): The instruction used to generate the `generation` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - messages (`List[Dict[str, str]]`): The chat-like conversation with the `instruction` as
            the user message and the `generation` as the assistant message.

    Categories:
        - format
        - chat-generation
        - instruction
        - generation

    Examples:
        Format your dataset for SFT:

        ```python
        from distilabel.steps import FormatChatGenerationSFT

        format_sft = FormatChatGenerationSFT()
        format_sft.load()

        # NOTE: "system_prompt" can be added optionally.
        result = next(
            format_sft.process(
                [
                    {
                        "messages": [{"role": "user", "content": "What's 2+2?"}],
                        "generation": "4"
                    }
                ]
            )
        )
        # >>> result
        # [
        #     {
        #         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}],
        #         'generation': '4',
        #         'prompt': 'What's 2+2?',
        #         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #     }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `instruction`, and `generation`."""
        return ["messages", "generation"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `messages`.

        Reference:
            - Format inspired in https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
        """
        return ["prompt", "prompt_id", "messages"]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the SFT formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
        """
        for input in inputs:
            for item in input:
                item["prompt"] = next(
                    (
                        turn["content"]
                        for turn in item["messages"]
                        if turn["role"] == "user"
                    ),
                    None,
                )

                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                item["messages"] = item["messages"] + [
                    {"role": "assistant", "content": item["generation"]},  # type: ignore
                ]
            yield input
inputs property

List of inputs required by the Step, which in this case are: messages, and generation.

outputs property

List of outputs generated by the Step, which are: prompt, prompt_id, messages.

Reference
  • Format inspired by https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the SFT formatting standard.

Parameters
  • *inputs (StepInput): A list of StepInput to be combined. Default: ()

Yields
  • StepOutput: A StepOutput with batches of formatted StepInput following the SFT standard.

Source code in src/distilabel/steps/formatting/sft.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the SFT formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
    """
    for input in inputs:
        for item in input:
            item["prompt"] = next(
                (
                    turn["content"]
                    for turn in item["messages"]
                    if turn["role"] == "user"
                ),
                None,
            )

            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            item["messages"] = item["messages"] + [
                {"role": "assistant", "content": item["generation"]},  # type: ignore
            ]
        yield input
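
Note that prompt is taken from the first user turn found in messages, so with a multi-turn input only that first user message becomes the prompt. A minimal sketch of the extraction:

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What's 2+2?"},
]
prompt = next((turn["content"] for turn in messages if turn["role"] == "user"), None)
# "What's 2+2?"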

FormatTextGenerationSFT

Bases: Step

Format the output of a TextGeneration task for Supervised Fine-Tuning (SFT).

FormatTextGenerationSFT is a Step that formats the output of a TextGeneration task for Supervised Fine-Tuning (SFT), following the standard formatting of frameworks such as axolotl or alignment-handbook. The output of the TextGeneration task is formatted into a chat-like conversation, with the instruction as the user message and the generation as the assistant message. Optionally, if a system_prompt is available, it is included as the first message of the conversation.

Input columns
  • system_prompt (str, optional): The system prompt used within the LLM to generate the generation, if available.
  • instruction (str): The instruction used to generate the generation with the LLM.
  • generation (str): The generation produced by the LLM.
Output columns
  • prompt (str): The instruction used to generate the generation with the LLM.
  • prompt_id (str): The SHA256 hash of the prompt.
  • messages (List[Dict[str, str]]): The chat-like conversation, with the instruction as the user message and the generation as the assistant message.
Categories
  • format
  • text-generation
  • instruction
  • generation

Examples

Format your dataset for SFT fine-tuning:

from distilabel.steps import FormatTextGenerationSFT

format_sft = FormatTextGenerationSFT()
format_sft.load()

# NOTE: "system_prompt" can be added optionally.
result = next(
    format_sft.process(
        [
            {
                "instruction": "What's 2+2?",
                "generation": "4"
            }
        ]
    )
)
# >>> result
# [
#     {
#         'instruction': 'What's 2+2?',
#         'generation': '4',
#         'prompt': 'What's 2+2?',
#         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
#         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}]
#     }
# ]
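
A small sketch (inputs invented for illustration) showing how an optional system_prompt is prepended as the first message:

result = next(
    format_sft.process(
        [
            {
                "system_prompt": "You are a helpful assistant.",
                "instruction": "What's 2+2?",
                "generation": "4",
            }
        ]
    )
)
# result[0]["messages"][0] == {'role': 'system', 'content': 'You are a helpful assistant.'}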
Source code in src/distilabel/steps/formatting/sft.py
class FormatTextGenerationSFT(Step):
    """Format the output of a `TextGeneration` task for Supervised Fine-Tuning (SFT).

    `FormatTextGenerationSFT` is a `Step` that formats the output of a `TextGeneration` task for
    Supervised Fine-Tuning (SFT) following the standard formatting from frameworks such as `axolotl`
    or `alignment-handbook`. The output of the `TextGeneration` task is formatted into a chat-like
    conversation with the `instruction` as the user message and the `generation` as the assistant
    message. Optionally, if the `system_prompt` is available, it is included as the first message
    in the conversation.

    Input columns:
        - system_prompt (`str`, optional): The system prompt used within the `LLM` to generate the
            `generation`, if available.
        - instruction (`str`): The instruction used to generate the `generation` with the `LLM`.
        - generation (`str`): The generation produced by the `LLM`.

    Output columns:
        - prompt (`str`): The instruction used to generate the `generation` with the `LLM`.
        - prompt_id (`str`): The `SHA256` hash of the `prompt`.
        - messages (`List[Dict[str, str]]`): The chat-like conversation with the `instruction` as
            the user message and the `generation` as the assistant message.

    Categories:
        - format
        - text-generation
        - instruction
        - generation

    Examples:
        Format your dataset for SFT fine tuning:

        ```python
        from distilabel.steps import FormatTextGenerationSFT

        format_sft = FormatTextGenerationSFT()
        format_sft.load()

        # NOTE: "system_prompt" can be added optionally.
        result = next(
            format_sft.process(
                [
                    {
                        "instruction": "What's 2+2?",
                        "generation": "4"
                    }
                ]
            )
        )
        # >>> result
        # [
        #     {
        #         'instruction': 'What's 2+2?',
        #         'generation': '4',
        #         'prompt': 'What's 2+2?',
        #         'prompt_id': '7762ecf17ad41479767061a8f4a7bfa3b63d371672af5180872f9b82b4cd4e29',
        #         'messages': [{'role': 'user', 'content': "What's 2+2?"}, {'role': 'assistant', 'content': '4'}]
        #     }
        # ]
        ```
    """

    @property
    def inputs(self) -> "StepColumns":
        """List of inputs required by the `Step`, which in this case are: `instruction`, and `generation`."""
        return {
            "system_prompt": False,
            "instruction": True,
            "generation": True,
        }

    @property
    def optional_inputs(self) -> List[str]:
        """List of optional inputs, which are not required by the `Step` but used if available,
        which in this case is: `system_prompt`."""
        return ["system_prompt"]

    @property
    def outputs(self) -> "StepColumns":
        """List of outputs generated by the `Step`, which are: `prompt`, `prompt_id`, `messages`.

        Reference:
            - Format inspired in https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
        """
        return ["prompt", "prompt_id", "messages"]

    def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
        """The `process` method formats the received `StepInput` or list of `StepInput`
        according to the SFT formatting standard.

        Args:
            *inputs: A list of `StepInput` to be combined.

        Yields:
            A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
        """
        for input in inputs:
            for item in input:
                item["prompt"] = item["instruction"]

                item["prompt_id"] = hashlib.sha256(
                    item["prompt"].encode("utf-8")  # type: ignore
                ).hexdigest()

                item["messages"] = [
                    {"role": "user", "content": item["instruction"]},  # type: ignore
                    {"role": "assistant", "content": item["generation"]},  # type: ignore
                ]
                if (
                    "system_prompt" in item
                    and isinstance(item["system_prompt"], str)  # type: ignore
                    and len(item["system_prompt"]) > 0  # type: ignore
                ):
                    item["messages"].insert(
                        0,
                        {"role": "system", "content": item["system_prompt"]},  # type: ignore
                    )

            yield input
inputs property

List of inputs required by the Step, which in this case are: instruction, and generation.

optional_inputs property

List of optional inputs, which are not required by the Step but used if available, which in this case is: system_prompt.

outputs property

List of outputs generated by the Step, which are: prompt, prompt_id, messages.

Reference
  • Format inspired by https://hugging-face.cn/datasets/HuggingFaceH4/ultrachat_200k
process(*inputs)

The process method formats the received StepInput or list of StepInput according to the SFT formatting standard.

Parameters
  • *inputs (StepInput): A list of StepInput to be combined. Default: ()

Yields
  • StepOutput: A StepOutput with batches of formatted StepInput following the SFT standard.

Source code in src/distilabel/steps/formatting/sft.py
def process(self, *inputs: StepInput) -> "StepOutput":  # type: ignore
    """The `process` method formats the received `StepInput` or list of `StepInput`
    according to the SFT formatting standard.

    Args:
        *inputs: A list of `StepInput` to be combined.

    Yields:
        A `StepOutput` with batches of formatted `StepInput` following the SFT standard.
    """
    for input in inputs:
        for item in input:
            item["prompt"] = item["instruction"]

            item["prompt_id"] = hashlib.sha256(
                item["prompt"].encode("utf-8")  # type: ignore
            ).hexdigest()

            item["messages"] = [
                {"role": "user", "content": item["instruction"]},  # type: ignore
                {"role": "assistant", "content": item["generation"]},  # type: ignore
            ]
            if (
                "system_prompt" in item
                and isinstance(item["system_prompt"], str)  # type: ignore
                and len(item["system_prompt"]) > 0  # type: ignore
            ):
                item["messages"].insert(
                    0,
                    {"role": "system", "content": item["system_prompt"]},  # type: ignore
                )

        yield input

LoadDataFromDicts

Bases: GeneratorStep

Loads a dataset from a list of dictionaries.

A GeneratorStep that loads a dataset from a list of dictionaries and yields it in batches.

Attributes
  • data (List[Dict[str, Any]]): The list of dictionaries to load the data from.

Runtime parameters
  • batch_size: The batch size to use when processing the data.
Output columns
  • dynamic (based on the keys found in the first dictionary of the list): The columns of the dataset.
Categories
  • load

Examples

Load data from a list of dictionaries:

from distilabel.steps import LoadDataFromDicts

loader = LoadDataFromDicts(
    data=[{"instruction": "What are 2+2?"}] * 5,
    batch_size=2
)
loader.load()

result = next(loader.process())
# >>> result
# ([{'instruction': 'What are 2+2?'}, {'instruction': 'What are 2+2?'}], False)
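
A small sketch (data invented for illustration) of resuming generation with offset, which skips that many rows before batching:

loader = LoadDataFromDicts(
    data=[{"n": i} for i in range(5)],
    batch_size=2,
)
loader.load()

batches = list(loader.process(offset=3))
# [([{'n': 3}, {'n': 4}], True)]  # a single final batch, flagged as the last one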
Source code in src/distilabel/steps/generators/data.py
class LoadDataFromDicts(GeneratorStep):
    """Loads a dataset from a list of dictionaries.

    `GeneratorStep` that loads a dataset from a list of dictionaries and yields it in
    batches.

    Attributes:
        data: The list of dictionaries to load the data from.

    Runtime parameters:
        - `batch_size`: The batch size to use when processing the data.

    Output columns:
        - dynamic (based on the keys found on the first dictionary of the list): The columns
            of the dataset.

    Categories:
        - load

    Examples:
        Load data from a list of dictionaries:

        ```python
        from distilabel.steps import LoadDataFromDicts

        loader = LoadDataFromDicts(
            data=[{"instruction": "What are 2+2?"}] * 5,
            batch_size=2
        )
        loader.load()

        result = next(loader.process())
        # >>> result
        # ([{'instruction': 'What are 2+2?'}, {'instruction': 'What are 2+2?'}], False)
        ```
    """

    data: List[Dict[str, Any]] = Field(default_factory=list, exclude=True)

    @override
    def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
        """Yields batches from a list of dictionaries.

        Args:
            offset: The offset to start the generation from. Defaults to `0`.

        Yields:
            A list of Python dictionaries as read from the inputs (propagated in batches)
            and a flag indicating whether the yield batch is the last one.
        """
        if offset:
            self.data = self.data[offset:]

        while self.data:
            batch = self.data[: self.batch_size]
            self.data = self.data[self.batch_size :]
            yield (
                batch,
                True if len(self.data) == 0 else False,
            )

    @property
    def outputs(self) -> List[str]:
        """Returns a list of strings with the names of the columns that the step will generate."""
        return list(self.data[0].keys())
outputs property

Returns a list of strings with the names of the columns that the step will generate.

process(offset=0)

Yields batches from a list of dictionaries.

Parameters
  • offset (int): The offset to start the generation from. Default: 0

Yields
  • GeneratorStepOutput: A list of Python dictionaries as read from the inputs (propagated in batches), and a flag indicating whether the yielded batch is the last one.

Source code in src/distilabel/steps/generators/data.py
@override
def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
    """Yields batches from a list of dictionaries.

    Args:
        offset: The offset to start the generation from. Defaults to `0`.

    Yields:
        A list of Python dictionaries as read from the inputs (propagated in batches)
        and a flag indicating whether the yield batch is the last one.
    """
    if offset:
        self.data = self.data[offset:]

    while self.data:
        batch = self.data[: self.batch_size]
        self.data = self.data[self.batch_size :]
        yield (
            batch,
            True if len(self.data) == 0 else False,
        )

DataSampler

Bases: GeneratorStep

A step that samples from a dataset.

A GeneratorStep that samples from a dataset and yields the samples in batches. This step is useful when your pipeline can benefit from using examples in the prompts, for instance as few-shot examples that change on every row. You can pass a list of dictionaries with N examples and generate M samples from it (assuming another step loads the data, M should match the number of rows loaded by that step). The size argument is the number of samples drawn per generated row, so each generated example will contain size examples to use as few-shot examples.

Attributes
  • data (List[Dict[str, Any]]): The list of dictionaries to sample from.
  • size (int): Number of samples per example; in a few-shot learning scenario, the number of few-shot examples generated per example. Defaults to 2.
  • samples (int): Total number of examples the step will generate. If used with another loader step, this should match the number of samples in that loader step. Defaults to 100.

Output columns
  • dynamic (based on the keys found in the first dictionary of the list): The columns of the dataset.
Categories
  • load

Examples

Sample data from a list of dictionaries:

from distilabel.steps import DataSampler

sampler = DataSampler(
    data=[{"sample": f"sample {i}"} for i in range(30)],
    samples=10,
    size=2,
    batch_size=4
)
sampler.load()

result = next(sampler.process())
# >>> result
# ([{'sample': ['sample 7', 'sample 0']}, {'sample': ['sample 2', 'sample 21']}, {'sample': ['sample 17', 'sample 12']}, {'sample': ['sample 2', 'sample 14']}], False)
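
Each yielded row is built by drawing size rows with random.choices, i.e. sampling with replacement (duplicates are possible), and merging them column-wise into lists. A minimal sketch of that merge:

import random

data = [{"sample": f"sample {i}"} for i in range(30)]
picked = random.choices(data, k=2)  # sampling with replacement
merged = {key: [row[key] for row in picked] for key in picked[0]}
# e.g. {'sample': ['sample 7', 'sample 0']} -> one few-shot row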

Pipeline with a loader and a sampler combined in a single stream:

from datasets import load_dataset

from distilabel.steps import LoadDataFromDicts, DataSampler
from distilabel.steps.tasks.apigen.utils import PrepareExamples
from distilabel.pipeline import Pipeline

ds = (
    load_dataset("Salesforce/xlam-function-calling-60k", split="train")
    .shuffle(seed=42)
    .select(range(500))
    .to_list()
)
data = [
    {
        "func_name": "final_velocity",
        "func_desc": "Calculates the final velocity of an object given its initial velocity, acceleration, and time.",
    },
    {
        "func_name": "permutation_count",
        "func_desc": "Calculates the number of permutations of k elements from a set of n elements.",
    },
    {
        "func_name": "getdivision",
        "func_desc": "Divides two numbers by making an API call to a division service.",
    },
]
with Pipeline(name="APIGenPipeline") as pipeline:
    loader_seeds = LoadDataFromDicts(data=data)
    sampler = DataSampler(
        data=ds,
        size=2,
        samples=len(data),
        batch_size=8,
    )
    prep_examples = PrepareExamples()

    sampler >> prep_examples
    (
        [loader_seeds, prep_examples]
        >> combine_steps
    )
# Now we have a single stream of data with the loader and the sampler data
Source code in src/distilabel/steps/generators/data_sampler.py
class DataSampler(GeneratorStep):
    """Step to sample from a dataset.

    `GeneratorStep` that samples from a dataset and yields it in batches.
    This step is useful when you have a pipeline that can benefit from using examples
    in the prompts for example as few-shot learning, that can be changing on each row.
    For example, you can pass a list of dictionaries with N examples and generate M samples
    from it (assuming you have another step loading data, this M should have the same size
    as the data being loaded in that step). The size S argument is the number of samples per
    row generated, so each example would contain S examples to be used as examples.

    Attributes:
        data: The list of dictionaries to sample from.
        size: Number of samples per example. For example in a few-shot learning scenario,
            the number of few-shot examples that will be generated per example. Defaults to 2.
        samples: Number of examples that will be generated by the step in total.
            If used with another loader step, this should be the same as the number
            of samples in the loader step. Defaults to 100.

    Output columns:
        - dynamic (based on the keys found on the first dictionary of the list): The columns
            of the dataset.

    Categories:
        - load

    Examples:
        Sample data from a list of dictionaries:

        ```python
        from distilabel.steps import DataSampler

        sampler = DataSampler(
            data=[{"sample": f"sample {i}"} for i in range(30)],
            samples=10,
            size=2,
            batch_size=4
        )
        sampler.load()

        result = next(sampler.process())
        # >>> result
        # ([{'sample': ['sample 7', 'sample 0']}, {'sample': ['sample 2', 'sample 21']}, {'sample': ['sample 17', 'sample 12']}, {'sample': ['sample 2', 'sample 14']}], False)
        ```

        Pipeline with a loader and a sampler combined in a single stream:

        ```python
        from datasets import load_dataset

        from distilabel.steps import LoadDataFromDicts, DataSampler
        from distilabel.steps.tasks.apigen.utils import PrepareExamples
        from distilabel.pipeline import Pipeline

        ds = (
            load_dataset("Salesforce/xlam-function-calling-60k", split="train")
            .shuffle(seed=42)
            .select(range(500))
            .to_list()
        )
        data = [
            {
                "func_name": "final_velocity",
                "func_desc": "Calculates the final velocity of an object given its initial velocity, acceleration, and time.",
            },
            {
                "func_name": "permutation_count",
                "func_desc": "Calculates the number of permutations of k elements from a set of n elements.",
            },
            {
                "func_name": "getdivision",
                "func_desc": "Divides two numbers by making an API call to a division service.",
            },
        ]
        with Pipeline(name="APIGenPipeline") as pipeline:
            loader_seeds = LoadDataFromDicts(data=data)
            sampler = DataSampler(
                data=ds,
                size=2,
                samples=len(data),
                batch_size=8,
            )
            prep_examples = PrepareExamples()

            sampler >> prep_examples
            (
                [loader_seeds, prep_examples]
                >> combine_steps
            )
        # Now we have a single stream of data with the loader and the sampler data
        ```
    """

    data: List[Dict[str, Any]] = Field(default_factory=list, exclude=True)
    size: int = Field(
        default=2,
        description=(
            "Number of samples per example. For example in a few-shot learning scenario, the number "
            "of few-shot examples that will be generated per example."
        ),
    )
    samples: int = Field(
        default=100,
        description=(
            "Number of examples that will be generated by the step in total. "
            "If used with another loader step, this should be the same as the number of "
            "samples in the loader step."
        ),
    )

    @override
    def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
        """Yields batches from a list of dictionaries.

        Args:
            offset: The offset to start the generation from. Defaults to `0`.

        Yields:
            A list of Python dictionaries as read from the inputs (propagated in batches)
            and a flag indicating whether the yield batch is the last one.
        """

        total_samples = 0

        while total_samples < self.samples:
            batch = []
            bs = min(self.batch_size, self.samples - total_samples)
            for _ in range(self.batch_size):
                choices = random.choices(self.data, k=self.size)
                choices = self._transform_data(choices)
                batch.extend(choices)
            total_samples += bs
            batch = list(islice(batch, bs))
            yield (batch, True if total_samples >= self.samples else False)
            batch = []

    @staticmethod
    def _transform_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        if not data:
            return []

        result = {key: [] for key in data[0].keys()}

        for item in data:
            for key, value in item.items():
                result[key].append(value)

        return [result]

    @property
    def outputs(self) -> List[str]:
        return list(self.data[0].keys())
process(offset=0)

Yields batches from a list of dictionaries.

Parameters
  • offset (int): The offset to start the generation from. Default: 0

Yields
  • GeneratorStepOutput: A list of Python dictionaries as read from the inputs (propagated in batches), and a flag indicating whether the yielded batch is the last one.

Source code in src/distilabel/steps/generators/data_sampler.py
@override
def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
    """Yields batches from a list of dictionaries.

    Args:
        offset: The offset to start the generation from. Defaults to `0`.

    Yields:
        A list of Python dictionaries as read from the inputs (propagated in batches)
        and a flag indicating whether the yield batch is the last one.
    """

    total_samples = 0

    while total_samples < self.samples:
        batch = []
        bs = min(self.batch_size, self.samples - total_samples)
        for _ in range(self.batch_size):
            choices = random.choices(self.data, k=self.size)
            choices = self._transform_data(choices)
            batch.extend(choices)
        total_samples += bs
        batch = list(islice(batch, bs))
        yield (batch, True if total_samples >= self.samples else False)
        batch = []

RewardModelScore

Bases: Step, CudaDevicePlacementMixin

Assign a score to a response using a reward model.

RewardModelScore is a Step that, using a Reward Model (RM) loaded with transformers, assigns a score to a response generated for an instruction, or to a multi-turn conversation.

Attributes
  • model (str): The model Hugging Face Hub repo id, or a path to a directory containing the model weights and configuration files.
  • revision (str): If model refers to a Hugging Face Hub repository, the revision (e.g. a branch name or a commit id) to use. Defaults to "main".
  • torch_dtype (str): The torch dtype to use for the model, e.g. "float16", "float32", etc. Defaults to "auto".
  • trust_remote_code (bool): Whether to allow fetching and executing remote code fetched from the repository on the Hub. Defaults to False.
  • device_map (Union[str, Dict[str, Any], None]): A dictionary mapping each layer of the model to a device, or a mode like "sequential" or "auto". Defaults to None.
  • token (Union[SecretStr, None]): The Hugging Face Hub token used to authenticate against the Hugging Face Hub. If not provided, the HF_TOKEN environment variable or the local huggingface_hub configuration will be used. Defaults to None.
  • truncation (bool): Whether to truncate sequences at the maximum length. Defaults to False.
  • max_length (Union[int, None]): The maximum length to use for padding or truncation. Defaults to None.

Input columns
  • instruction (str, optional): The instruction used to generate a response. If provided, response must be provided too.
  • response (str, optional): The response generated for the instruction. If provided, instruction must be provided too.
  • conversation (ChatType, optional): A multi-turn conversation. If not provided, the instruction and response columns must be provided.
Output columns
  • score (float): The score given by the reward model for the instruction-response pair or the conversation.
Categories
  • scorer

Examples

Assigning a score for an instruction-response pair:

from distilabel.steps import RewardModelScore

step = RewardModelScore(
    model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
)

step.load()

result = next(
    step.process(
        inputs=[
            {
                "instruction": "How much is 2+2?",
                "response": "The output of 2+2 is 4",
            },
            {"instruction": "How much is 2+2?", "response": "4"},
        ]
    )
)
# [
#   {'instruction': 'How much is 2+2?', 'response': 'The output of 2+2 is 4', 'score': 0.11690367758274078},
#   {'instruction': 'How much is 2+2?', 'response': '4', 'score': 0.10300665348768234}
# ]

Assigning a score for a multi-turn conversation:

from distilabel.steps import RewardModelScore

step = RewardModelScore(
    model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
)

step.load()

result = next(
    step.process(
        inputs=[
            {
                "conversation": [
                    {"role": "user", "content": "How much is 2+2?"},
                    {"role": "assistant", "content": "The output of 2+2 is 4"},
                ],
            },
            {
                "conversation": [
                    {"role": "user", "content": "How much is 2+2?"},
                    {"role": "assistant", "content": "4"},
                ],
            },
        ]
    )
)
# [
#   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': 'The output of 2+2 is 4'}], 'score': 0.11690367758274078},
#   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': '4'}], 'score': 0.10300665348768234}
# ]
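
A small sketch (values invented for illustration) enabling truncation so long conversations are cut to a fixed token budget when tokenized:

step = RewardModelScore(
    model="RLHFlow/ArmoRM-Llama3-8B-v0.1",
    device_map="auto",
    trust_remote_code=True,
    truncation=True,   # truncate sequences at max_length...
    max_length=4096,   # ...keeping at most 4096 tokens per conversation
)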
Source code in src/distilabel/steps/reward_model.py
class RewardModelScore(Step, CudaDevicePlacementMixin):
    """Assign a score to a response using a Reward Model.

    `RewardModelScore` is a `Step` that, using a Reward Model (RM) loaded using `transformers`,
    assigns a score to a response generated for an instruction, or a score to a multi-turn
    conversation.

    Attributes:
        model: the model Hugging Face Hub repo id or a path to a directory containing the
            model weights and configuration files.
        revision: if `model` refers to a Hugging Face Hub repository, then the revision
            (e.g. a branch name or a commit id) to use. Defaults to `"main"`.
        torch_dtype: the torch dtype to use for the model e.g. "float16", "float32", etc.
            Defaults to `"auto"`.
        trust_remote_code: whether to allow fetching and executing remote code fetched
            from the repository in the Hub. Defaults to `False`.
        device_map: a dictionary mapping each layer of the model to a device, or a mode like `"sequential"` or `"auto"`. Defaults to `None`.
        token: the Hugging Face Hub token that will be used to authenticate to the Hugging
            Face Hub. If not provided, the `HF_TOKEN` environment or `huggingface_hub` package
            local configuration will be used. Defaults to `None`.
        truncation: whether to truncate sequences at the maximum length. Defaults to `False`.
        max_length: maximum length to use for padding or truncation. Defaults to `None`.

    Input columns:
        - instruction (`str`, optional): the instruction used to generate a `response`.
            If provided, then `response` must be provided too.
        - response (`str`, optional): the response generated for `instruction`. If provided,
            then `instruction` must be provided too.
        - conversation (`ChatType`, optional): a multi-turn conversation. If not provided,
            then `instruction` and `response` columns must be provided.

    Output columns:
        - score (`float`): the score given by the reward model for the instruction-response
            pair or the conversation.

    Categories:
        - scorer

    Examples:
        Assigning a score for an instruction-response pair:

        ```python
        from distilabel.steps import RewardModelScore

        step = RewardModelScore(
            model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
        )

        step.load()

        result = next(
            step.process(
                inputs=[
                    {
                        "instruction": "How much is 2+2?",
                        "response": "The output of 2+2 is 4",
                    },
                    {"instruction": "How much is 2+2?", "response": "4"},
                ]
            )
        )
        # [
        #   {'instruction': 'How much is 2+2?', 'response': 'The output of 2+2 is 4', 'score': 0.11690367758274078},
        #   {'instruction': 'How much is 2+2?', 'response': '4', 'score': 0.10300665348768234}
        # ]
        ```

        Assigning a score for a multi-turn conversation:

        ```python
        from distilabel.steps import RewardModelScore

        step = RewardModelScore(
            model="RLHFlow/ArmoRM-Llama3-8B-v0.1", device_map="auto", trust_remote_code=True
        )

        step.load()

        result = next(
            step.process(
                inputs=[
                    {
                        "conversation": [
                            {"role": "user", "content": "How much is 2+2?"},
                            {"role": "assistant", "content": "The output of 2+2 is 4"},
                        ],
                    },
                    {
                        "conversation": [
                            {"role": "user", "content": "How much is 2+2?"},
                            {"role": "assistant", "content": "4"},
                        ],
                    },
                ]
            )
        )
        # [
        #   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': 'The output of 2+2 is 4'}], 'score': 0.11690367758274078},
        #   {'conversation': [{'role': 'user', 'content': 'How much is 2+2?'}, {'role': 'assistant', 'content': '4'}], 'score': 0.10300665348768234}
        # ]
        ```
    """

    model: str
    revision: str = "main"
    torch_dtype: str = "auto"
    trust_remote_code: bool = False
    device_map: Union[str, Dict[str, Any], None] = None
    token: Union[SecretStr, None] = Field(
        default_factory=lambda: os.getenv(HF_TOKEN_ENV_VAR), description=""
    )
    truncation: bool = False
    max_length: Union[int, None] = None

    _model: Union["PreTrainedModel", None] = PrivateAttr(None)
    _tokenizer: Union["PreTrainedTokenizer", None] = PrivateAttr(None)

    def load(self) -> None:
        super().load()

        if self.device_map in ["cuda", "auto"]:
            CudaDevicePlacementMixin.load(self)

        try:
            from transformers import AutoModelForSequenceClassification, AutoTokenizer
        except ImportError as e:
            raise ImportError(
                "`transformers` is not installed. Please install it using `pip install 'distilabel[hf-transformers]'`."
            ) from e

        token = self.token.get_secret_value() if self.token is not None else self.token

        self._model = AutoModelForSequenceClassification.from_pretrained(
            self.model,
            revision=self.revision,
            torch_dtype=self.torch_dtype,
            trust_remote_code=self.trust_remote_code,
            device_map=self.device_map,
            token=token,
        )
        self._tokenizer = AutoTokenizer.from_pretrained(
            self.model,
            revision=self.revision,
            torch_dtype=self.torch_dtype,
            trust_remote_code=self.trust_remote_code,
            token=token,
        )

    @property
    def inputs(self) -> "StepColumns":
        """Either `response` and `instruction`, or a `conversation` columns."""
        return {
            "response": False,
            "instruction": False,
            "conversation": False,
        }

    @property
    def outputs(self) -> "StepColumns":
        """The `score` given by the reward model."""
        return ["score"]

    def _prepare_conversation(self, input: Dict[str, Any]) -> "ChatType":
        if "instruction" in input and "response" in input:
            return [
                {"role": "user", "content": input["instruction"]},
                {"role": "assistant", "content": input["response"]},
            ]

        return input["conversation"]

    def _prepare_inputs(self, inputs: List[Dict[str, Any]]) -> "torch.Tensor":
        return self._tokenizer.apply_chat_template(  # type: ignore
            [self._prepare_conversation(input) for input in inputs],  # type: ignore
            return_tensors="pt",
            padding=True,
            truncation=self.truncation,
            max_length=self.max_length,
        ).to(self._model.device)  # type: ignore

    def _inference(self, inputs: List[Dict[str, Any]]) -> List[float]:
        import torch

        input_ids = self._prepare_inputs(inputs)
        with torch.no_grad():
            output = self._model(input_ids)  # type: ignore
            logits = output.logits
            if logits.shape == (2, 1):
                logits = logits.squeeze(-1)
            return logits.tolist()

    def process(self, inputs: StepInput) -> "StepOutput":  # type: ignore
        scores = self._inference(inputs)
        for input, score in zip(inputs, scores):
            input["score"] = score
        yield inputs

    def unload(self) -> None:
        if self.device_map in ["cuda", "auto"]:
            CudaDevicePlacementMixin.unload(self)
        super().unload()
inputs property

responseinstruction,或 conversation 列。

outputs property

The score given by the reward model.

TruncateTextColumn

Bases: Step

Truncate a row using a tokenizer or a number of characters.

TruncateTextColumn is a Step that truncates a row according to a maximum length. If a tokenizer is provided, the row is truncated with it and max_length is interpreted as a maximum number of tokens; otherwise it is interpreted as a maximum number of characters. The TruncateTextColumn step is useful to cap rows at a certain length and avoid downstream model errors caused by overly long inputs.

Attributes
  • column (str): The column to truncate. Defaults to "text".
  • max_length (int): The maximum length to use for truncation. If a tokenizer is given, it corresponds to a number of tokens; otherwise, to a number of characters. Defaults to 8192.
  • tokenizer (Optional[str]): The name of the tokenizer to use. If provided, the row will be truncated using the tokenizer. Defaults to None.

Input columns
  • dynamic (determined by the column attribute): The column to truncate, defaults to "text".
Output columns
  • dynamic (determined by the column attribute): The truncated column.
Categories
  • text-manipulation

Examples

Truncating a row to a given number of tokens:

from distilabel.steps import TruncateTextColumn

trunc = TruncateTextColumn(
    tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
    max_length=4,
    column="text"
)

trunc.load()

result = next(
    trunc.process(
        [
            {"text": "This is a sample text that is longer than 10 characters"}
        ]
    )
)
# result
# [{'text': 'This is a sample'}]

Truncating a row to a given number of characters:

from distilabel.steps import TruncateTextColumn

trunc = TruncateTextColumn(max_length=10)

trunc.load()

result = next(
    trunc.process(
        [
            {"text": "This is a sample text that is longer than 10 characters"}
        ]
    )
)
# result
# [{'text': 'This is a '}]
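
A standalone sketch of the tokenizer-based path, mirroring the encode/decode round-trip the step performs (assuming transformers is installed and the tokenizer is accessible):

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3.1-70B-Instruct")
ids = tokenizer.encode(
    "This is a sample text that is longer than 10 characters",
    add_special_tokens=False,
    max_length=4,    # keep at most 4 tokens
    truncation=True,
)
print(tokenizer.decode(ids))  # 'This is a sample'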
Source code in src/distilabel/steps/truncate.py
class TruncateTextColumn(Step):
    """Truncate a row using a tokenizer or the number of characters.

    `TruncateTextColumn` is a `Step` that truncates a row according to the max length. If
    the `tokenizer` is provided, then the row will be truncated using the tokenizer,
    and the `max_length` will be used as the maximum number of tokens, otherwise it will
    be used as the maximum number of characters. The `TruncateTextColumn` step is useful when one
    wants to truncate a row to a certain length, to avoid posterior errors in the model due
    to the length.

    Attributes:
        column: the column to truncate. Defaults to `"text"`.
        max_length: the maximum length to use for truncation.
            If a `tokenizer` is given, corresponds to the number of tokens,
            otherwise corresponds to the number of characters. Defaults to `8192`.
        tokenizer: the name of the tokenizer to use. If provided, the row will be
            truncated using the tokenizer. Defaults to `None`.

    Input columns:
        - dynamic (determined by `column` attribute): The columns to be truncated, defaults to "text".

    Output columns:
        - dynamic (determined by `column` attribute): The truncated column.

    Categories:
        - text-manipulation

    Examples:
        Truncating a row to a given number of tokens:

        ```python
        from distilabel.steps import TruncateTextColumn

        trunc = TruncateTextColumn(
            tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
            max_length=4,
            column="text"
        )

        trunc.load()

        result = next(
            trunc.process(
                [
                    {"text": "This is a sample text that is longer than 10 characters"}
                ]
            )
        )
        # result
        # [{'text': 'This is a sample'}]
        ```

        Truncating a row to a given number of characters:

        ```python
        from distilabel.steps import TruncateTextColumn

        trunc = TruncateTextColumn(max_length=10)

        trunc.load()

        result = next(
            trunc.process(
                [
                    {"text": "This is a sample text that is longer than 10 characters"}
                ]
            )
        )
        # result
        # [{'text': 'This is a '}]
        ```
    """

    column: str = "text"
    max_length: int = 8192
    tokenizer: Optional[str] = None
    _truncator: Optional[Callable[[str], str]] = None
    _tokenizer: Optional[Any] = None

    def load(self):
        super().load()
        if self.tokenizer:
            if not importlib.util.find_spec("transformers"):
                raise ImportError(
                    "`transformers` is needed to tokenize, but is not installed. "
                    "Please install it using `pip install 'distilabel[hf-transformers]'`."
                )

            from transformers import AutoTokenizer

            self._tokenizer = AutoTokenizer.from_pretrained(self.tokenizer)
            self._truncator = self._truncate_with_tokenizer
        else:
            self._truncator = self._truncate_with_length

    @property
    def inputs(self) -> List[str]:
        return [self.column]

    @property
    def outputs(self) -> List[str]:
        return self.inputs

    def _truncate_with_length(self, text: str) -> str:
        """Truncates the text according to the number of characters."""
        return text[: self.max_length]

    def _truncate_with_tokenizer(self, text: str) -> str:
        """Truncates the text according to the number of characters using the tokenizer."""
        return self._tokenizer.decode(
            self._tokenizer.encode(
                text,
                add_special_tokens=False,
                max_length=self.max_length,
                truncation=True,
            )
        )

    @override
    def process(self, inputs: StepInput) -> "StepOutput":
        for input in inputs:
            input[self.column] = self._truncator(input[self.column])
        yield inputs
_truncate_with_length(text)

Truncates the text according to the number of characters.

Source code in src/distilabel/steps/truncate.py
def _truncate_with_length(self, text: str) -> str:
    """Truncates the text according to the number of characters."""
    return text[: self.max_length]
_truncate_with_tokenizer(text)

Truncates the text according to the number of tokens using the tokenizer.

Source code in src/distilabel/steps/truncate.py
def _truncate_with_tokenizer(self, text: str) -> str:
    """Truncates the text according to the number of characters using the tokenizer."""
    return self._tokenizer.decode(
        self._tokenizer.encode(
            text,
            add_special_tokens=False,
            max_length=self.max_length,
            truncation=True,
        )
    )