Executing Steps and Tasks in a Pipeline

How to create a pipeline

The Pipeline organizes steps and tasks in a sequence, where the output of one step is the input of the next. A Pipeline should be created by making use of the context manager, passing a name and an optional description:

from distilabel.pipeline import Pipeline

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

Connecting steps with the Step.connect method

Now, we can define the steps of our Pipeline.

Note

Steps without predecessors (i.e. root steps) need to be GeneratorSteps such as LoadDataFromDicts or LoadDataFromHub. After this, other steps can be defined.

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")
    ...

Easily load your datasets

If you are already used to working with Hugging Face's Dataset via load_dataset or pd.DataFrame, you can create the GeneratorStep directly from the dataset (or dataframe) with the help of make_generator_step:

from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

dataset = [{"instruction": "Tell me a joke."}]

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
    ...

Or, from a datasets.Dataset loaded with load_dataset:

from datasets import load_dataset
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

dataset = load_dataset(
    "DIBT/10k_prompts_ranked",
    split="train"
).filter(
    lambda r: r["avg_rating"]>=4 and r["num_responses"]>=2
).select(range(500))

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
    ...

Or, from a pandas DataFrame:

import pandas as pd
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step

dataset = pd.read_csv("path/to/dataset.csv")

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
    ...

Next, we will use the prompt column from the dataset obtained through LoadDataFromHub, and use several LLMs to execute a TextGeneration task. We will also use the Task.connect() method to connect the steps, so the output of one step is the input of the next one.

Note

The order of execution of the steps is determined by their connections. In this case, the TextGeneration tasks will be executed after the LoadDataFromHub step.

from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)

    ...

For each row of the dataset, the TextGeneration task will generate text based on the instruction column and the LLM model, and store the result (a single string) in a new column called generation. Because we need to have all the responses in the same column, we will add a GroupColumns step to combine them all in the same column as a list of strings.

Note

In this case, the GroupColumns step will be executed after all the TextGeneration steps.

from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset.connect(task)
        task.connect(combine_generations)

Connecting steps with the >> operator

Besides the Step.connect method (step1.connect(step2)), there's an alternative way using the >> operator. With it we can connect steps in a more readable way, and also connect multiple steps at once.

Each call to step1.connect(step2) has been replaced by step1 >> step2 within the loop:

from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        load_dataset >> task >> combine_generations

Each task is first appended to a list, and then all the connections are made in a single call:

from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    load_dataset >> tasks >> combine_generations

Routing batches to specific downstream steps

In some pipelines, you may want to send batches from a single upstream step to specific downstream steps based on some condition. To achieve this, you can use a routing_batch_function. This function receives the list of downstream steps and returns a list with the names of the steps each batch should be routed to.

Let's update the example above to route the batches loaded by the LoadDataFromHub step to just 2 of the TextGeneration tasks. First, we will create our custom routing_batch_function, and then we will update the pipeline to use it:

import random
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    load_dataset >> sample_two_steps >> tasks >> combine_generations

The routing_batch_function we just built is a common one, so distilabel ships with a built-in equivalent that can be used to achieve the same behavior:

from distilabel.pipeline import sample_n_steps

sample_two_steps = sample_n_steps(2)
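
For instance, a minimal sketch of how the built-in helper slots into the wiring of the previous example (load_dataset, tasks, and combine_generations are assumed to be defined exactly as above):

from distilabel.pipeline import Pipeline, sample_n_steps

sample_two_steps = sample_n_steps(2)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    # ... same load_dataset, tasks and combine_generations as before ...

    # The built-in routing function participates in the `>>` wiring
    # exactly like the custom `sample_two_steps` defined earlier.
    load_dataset >> sample_two_steps >> tasks >> combine_generations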

Running the pipeline

Pipeline.dry_run

Before running the Pipeline, we can check whether it is valid using the Pipeline.dry_run() method. It takes the same parameters as the run method (which we will discuss in the next section), plus the batch_size we want the dry run to use (1 by default).

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.dry_run(parameters=..., batch_size=1)

Pipeline.run

After testing, we can now execute the full Pipeline using the Pipeline.run() method:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )

But if we run the pipeline above, we will see that the run method fails:

ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'], but only the inputs=['prompt', 'completion', 'meta'] are available, which means that the inputs=['instruction'] are missing or not available
when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are generating the required inputs.

This is because, before actually running the pipeline, we must ensure each step has the input columns it needs to be executed. In this case, the TextGeneration task requires the instruction column, but the LoadDataFromHub step generates the prompt column. To solve this, we can use the output_mappings or input_mappings arguments of individual Steps to map columns from one step to another:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"}
    )

    ...
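
Alternatively, the mapping can be declared on the consuming step instead of the producing one. A minimal sketch, assuming input_mappings takes the shape {expected_input: incoming_column} and renames incoming columns rather than outgoing ones:

from distilabel.models import OpenAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(name="load_dataset")

    task = TextGeneration(
        name="text_generation",
        llm=OpenAILLM(model="gpt-4-0125-preview"),
        # Read the `instruction` input from the incoming `prompt` column,
        # instead of renaming the loader's output column.
        input_mappings={"instruction": "prompt"},
    )

    load_dataset >> task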

If we execute the pipeline again, it will run successfully and we will have a Distiset with the outputs of all the leaf steps of the pipeline, which we can push to the Hugging Face Hub.

if __name__ == "__main__":
    distiset = pipeline.run(...)
    distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
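
A Distiset behaves like a dictionary with one entry per leaf step. As a sketch (assuming a pipeline with a single leaf step, whose outputs are stored under the "default" subset), we can inspect the generated rows locally before pushing:

if __name__ == "__main__":
    distiset = pipeline.run(...)

    # Each subset is a datasets.DatasetDict; print the first generated row.
    print(distiset["default"]["train"][0])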

Running Pipeline.run with a dataset

Note that in most cases, if you don't need the extra flexibility the GeneratorSteps provide, you can create the dataset as you normally would and pass it directly to the Pipeline.run method. In the updated example below, the dataset is created outside the pipeline and passed to run:

import random
from datasets import load_dataset
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns
from distilabel.steps.tasks import TextGeneration

@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
    return random.sample(steps, 2)

dataset = load_dataset(
    "distilabel-internal-testing/instruction-dataset-mini",
    split="test"
)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    tasks = []
    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        tasks.append(
            TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
        )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    sample_two_steps >> tasks >> combine_generations


if __name__ == "__main__":
    distiset = pipeline.run(
        dataset=dataset,
        parameters=...
    )

Stopping the pipeline

If you want to stop the pipeline while it's running, you can press Ctrl+C or Cmd+C (depending on your OS), or send a SIGINT to the main process, and the outputs will be stored in the cache. Pressing it again will force the pipeline to stop executing, but this can lead to losing the generated outputs for some batches.

Cache

If for some reason the pipeline execution stops (for example by pressing Ctrl+C), the state of the pipeline and its outputs are stored in the cache, so we can resume the execution from the point where it stopped.

If we want to force the pipeline to run again without using the cache, we can use the use_cache argument of the Pipeline.run() method:

if __name__ == "__main__":
    distiset = pipeline.run(parameters={...}, use_cache=False)

Note

For more information on caching, we refer the reader to the caching section.

Adjusting the batch size for each step

Memory issues can arise when processing large datasets or using large models. To avoid this, we can use the input_batch_size argument of individual tasks. In the example below, the TextGeneration tasks will receive 5 dictionaries at a time, while the LoadDataFromHub step will send 10 dictionaries per batch:

from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
        batch_size=10
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.5-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name.replace('.', '-')}",
            llm=llm,
            input_batch_size=5,
        )

    ...

Serializing the pipeline

Sharing a pipeline with others is very easy: we can serialize the pipeline object using the save method. We can save the pipeline in different formats, such as yaml or json:

if __name__ == "__main__":
    pipeline.save("pipeline.yaml", format="yaml")
if __name__ == "__main__":
    pipeline.save("pipeline.json", format="json")

To load the pipeline, we can use the from_yaml or from_json methods:

pipeline = Pipeline.from_yaml("pipeline.yaml")
pipeline = Pipeline.from_json("pipeline.json")

Serializing the pipeline is very useful when we want to share it with others, or store it for future use. It can even be hosted online, so the pipeline can be executed directly from the CLI.

Visualizing the pipeline

We can visualize the pipeline using the Pipeline.draw() method. This will create a mermaid graph and return the path to the image.

path_to_image = pipeline.draw(
    top_to_bottom=True,
    show_edge_labels=True,
)

Within a notebook, we can simply call pipeline and the graph will be displayed. Alternatively, we can use the Pipeline.draw() method to have more control over the graph visualization, and use IPython to display it:

from IPython.display import Image, display

display(Image(path_to_image))

Now let's see what the pipeline of the fully working example looks like.

[Pipeline visualization diagram]

Fully working example

To sum up, here is the full code of the pipeline we have created in this section. Note that you will need to change the name of the Hugging Face repository the resulting dataset will be pushed to, set the OPENAI_API_KEY environment variable, set the MISTRAL_API_KEY, and have gcloud installed and configured:

Code
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadDataFromHub(
        name="load_dataset",
        output_mappings={"prompt": "instruction"},
    )

    combine_generations = GroupColumns(
        name="combine_generations",
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )

    for llm in (
        OpenAILLM(model="gpt-4-0125-preview"),
        MistralLLM(model="mistral-large-2402"),
        VertexAILLM(model="gemini-1.0-pro"),
    ):
        task = TextGeneration(
            name=f"text_generation_with_{llm.model_name.replace('.', '-')}", llm=llm
        )
        load_dataset.connect(task)
        task.connect(combine_generations)

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            "load_dataset": {
                "repo_id": "distilabel-internal-testing/instruction-dataset-mini",
                "split": "test",
            },
            "text_generation_with_gpt-4-0125-preview": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_mistral-large-2402": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
            "text_generation_with_gemini-1.0-pro": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 512,
                    }
                }
            },
        },
    )
    distiset.push_to_hub(
        "distilabel-internal-testing/instruction-dataset-mini-with-generations"
    )