在 Pipeline 中执行步骤和任务¶
如何创建 pipeline¶
Pipeline
将步骤和任务组织成一个序列,其中一个步骤的输出是下一个步骤的输入。Pipeline
应该通过使用上下文管理器以及传递 name 和可选的 description 来创建。
from distilabel.pipeline import Pipeline
with Pipeline("pipe-name", description="My first pipe") as pipeline:
...
使用 Step.connect
方法连接步骤¶
现在,我们可以定义 Pipeline
的步骤。
注意
没有前置步骤的步骤(即根步骤)需要是 GeneratorStep
,例如 LoadDataFromDicts
或 LoadDataFromHub
。在此之后,可以定义其他步骤。
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(name="load_dataset")
...
轻松加载您的数据集
如果您已经习惯于通过 load_dataset
或 pd.DataFrame
使用 Hugging Face 的 Dataset
,您可以直接从数据集(或数据帧)创建 GeneratorStep
,并在 make_generator_step
的帮助下创建步骤
from datasets import load_dataset
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step
dataset = load_dataset(
"DIBT/10k_prompts_ranked",
split="train"
).filter(
lambda r: r["avg_rating"]>=4 and r["num_responses"]>=2
).select(range(500))
with Pipeline("pipe-name", description="My first pipe") as pipeline:
loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
...
import pandas as pd
from distilabel.pipeline import Pipeline
from distilabel.steps import make_generator_step
dataset = pd.read_csv("path/to/dataset.csv")
with Pipeline("pipe-name", description="My first pipe") as pipeline:
loader = make_generator_step(dataset, output_mappings={"prompt": "instruction"})
...
接下来,我们将使用从通过 LoadDataFromHub
获取的数据集中的 prompt
列,并使用多个 LLM
来执行 TextGeneration
任务。我们还将使用 Task.connect()
方法连接步骤,以便一个步骤的输出是下一个步骤的输入。
注意
步骤的执行顺序将由步骤的连接决定。在本例中,TextGeneration
任务将在 LoadDataFromHub
步骤之后执行。
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(name="load_dataset")
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.5-pro"),
):
task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
task.connect(load_dataset)
...
对于数据集的每一行,TextGeneration
任务将基于 instruction
列和 LLM
模型生成文本,并将结果(单个字符串)存储在一个名为 generation
的新列中。因为我们需要在同一列中拥有 response
,我们将添加 GroupColumns
以将它们全部合并到同一列中,作为字符串列表。
注意
在本例中,GroupColumns
任务将在所有 TextGeneration
步骤之后执行。
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(name="load_dataset")
combine_generations = GroupColumns(
name="combine_generations",
columns=["generation", "model_name"],
output_columns=["generations", "model_names"],
)
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.5-pro"),
):
task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
load_dataset.connect(task)
task.connect(combine_generations)
使用 >>
运算符连接步骤¶
除了 Step.connect
方法:step1.connect(step2)
之外,还有另一种方法是使用 >>
运算符。我们可以以更易读的方式连接步骤,也可以一次连接多个步骤。
对 step1.connect(step2)
的每次调用都已在循环中被 step1 >> step2
替换。
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(name="load_dataset")
combine_generations = GroupColumns(
name="combine_generations",
columns=["generation", "model_name"],
output_columns=["generations", "model_names"],
)
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.5-pro"),
):
task = TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
load_dataset >> task >> combine_generations
每个任务首先被附加到一个列表,然后所有连接调用都在一个调用中完成。
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(name="load_dataset")
combine_generations = GroupColumns(
name="combine_generations",
columns=["generation", "model_name"],
output_columns=["generations", "model_names"],
)
tasks = []
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.5-pro"),
):
tasks.append(
TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
)
load_dataset >> tasks >> combine_generations
将批次路由到特定的下游步骤¶
在某些 pipeline 中,您可能希望根据特定条件将来自单个上游步骤的批次发送到特定的下游步骤。为了实现这一点,您可以使用 routing_batch_function
。此函数接受下游步骤列表,并返回每个批次应路由到的步骤名称列表。
让我们更新上面的示例,将由 LoadDataFromHub
步骤加载的批次仅路由到 2 个 TextGeneration
任务。首先,我们将创建自定义的 routing_batch_function
,然后我们将更新 pipeline 以使用它
import random
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
return random.sample(steps, 2)
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(
name="load_dataset",
output_mappings={"prompt": "instruction"},
)
tasks = []
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.0-pro"),
):
tasks.append(
TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
)
combine_generations = GroupColumns(
name="combine_generations",
columns=["generation", "model_name"],
output_columns=["generations", "model_names"],
)
load_dataset >> sample_two_steps >> tasks >> combine_generations
我们刚刚构建的 routing_batch_function
是一个通用的函数,因此 distilabel
附带了一个内置函数,可用于实现相同的行为
运行 pipeline¶
Pipeline.dry_run¶
在运行 Pipeline
之前,我们可以使用 Pipeline.dry_run()
方法检查 pipeline 是否有效。它接受与 run
方法相同的参数(我们将在下一节中讨论),以及我们希望 dry run 使用的 batch_size
(默认设置为 1)。
with Pipeline("pipe-name", description="My first pipe") as pipeline:
...
if __name__ == "__main__":
distiset = pipeline.dry_run(parameters=..., batch_size=1)
Pipeline.run¶
测试之后,我们现在可以使用 Pipeline.run()
方法执行完整的 Pipeline
。
with Pipeline("pipe-name", description="My first pipe") as pipeline:
...
if __name__ == "__main__":
distiset = pipeline.run(
parameters={
"load_dataset": {
"repo_id": "distilabel-internal-testing/instruction-dataset-mini",
"split": "test",
},
"text_generation_with_gpt-4-0125-preview": {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 512,
}
}
},
"text_generation_with_mistral-large-2402": {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 512,
}
}
},
"text_generation_with_gemini-1.0-pro": {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 512,
}
}
},
},
)
但是,如果我们运行上面的 pipeline,我们将看到 run
方法将失败
ValueError: Step 'text_generation_with_gpt-4-0125-preview' requires inputs ['instruction'], but only the inputs=['prompt', 'completion', 'meta'] are available, which means that the inputs=['instruction'] are missing or not available
when the step gets to be executed in the pipeline. Please make sure previous steps to 'text_generation_with_gpt-4-0125-preview' are generating the required inputs.
这是因为,在实际运行 pipeline 之前,我们必须确保每个步骤都具有执行所需的输入列。在本例中,TextGeneration
任务需要 instruction
列,但 LoadDataFromHub
步骤生成 prompt
列。为了解决这个问题,我们可以使用各个 Step
的 output_mappings
或 input_mapping
参数,以将列从一个步骤映射到另一个步骤。
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(
name="load_dataset",
output_mappings={"prompt": "instruction"}
)
...
如果我们再次执行 pipeline,它将成功运行,我们将得到一个 Distiset
,其中包含 pipeline 所有叶子步骤的输出,我们可以将其推送到 Hugging Face Hub。
if __name__ == "__main__":
distiset = pipeline.run(...)
distiset.push_to_hub("distilabel-internal-testing/instruction-dataset-mini-with-generations")
使用数据集运行 Pipeline.run¶
请注意,在大多数情况下,如果您不需要 GeneratorSteps
为您带来的额外灵活性,您可以像往常一样创建一个数据集,并将其直接传递给 Pipeline.run 方法。查看突出显示的行以查看更新后的行
import random
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline, routing_batch_function
from distilabel.steps import GroupColumns
from distilabel.steps.tasks import TextGeneration
@routing_batch_function
def sample_two_steps(steps: list[str]) -> list[str]:
return random.sample(steps, 2)
dataset = load_dataset(
"distilabel-internal-testing/instruction-dataset-mini",
split="test"
)
with Pipeline("pipe-name", description="My first pipe") as pipeline:
tasks = []
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.0-pro"),
):
tasks.append(
TextGeneration(name=f"text_generation_with_{llm.model_name}", llm=llm)
)
combine_generations = GroupColumns(
name="combine_generations",
columns=["generation", "model_name"],
output_columns=["generations", "model_names"],
)
sample_two_steps >> tasks >> combine_generations
if __name__ == "__main__":
distiset = pipeline.run(
dataset=dataset,
parameters=...
)
停止 pipeline¶
如果您想在 pipeline 运行时停止它,您可以按 Ctrl+C 或 Cmd+C (取决于您的操作系统)或向主进程发送 SIGINT
,输出将存储在缓存中。再次按下将强制 pipeline 停止执行,但这可能会导致某些批次的生成输出丢失。
缓存¶
如果由于某种原因,pipeline 执行停止(例如,按 Ctrl+C
),pipeline 的状态和输出将存储在缓存中,因此我们可以从停止的点恢复 pipeline 执行。
如果我们想强制 pipeline 再次运行而不使用缓存,那么我们可以使用 Pipeline.run()
方法的 use_cache
参数
注意
有关缓存的更多信息,我们请读者参考 caching 部分。
调整每个步骤的批次大小¶
当处理大型数据集或使用大型模型时,可能会出现内存问题。为了避免这种情况,我们可以使用各个任务的 input_batch_size
参数。TextGeneration
任务将接收 5 个字典,而 LoadDataFromHub
步骤将每个批次发送 10 个字典
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(
name="load_dataset",
output_mappings={"prompt": "instruction"},
batch_size=10
)
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.5-pro"),
):
task = TextGeneration(
name=f"text_generation_with_{llm.model_name.replace('.', '-')}",
llm=llm,
input_batch_size=5,
)
...
序列化 pipeline¶
与他人共享 pipeline 非常容易,因为我们可以使用 save
方法序列化 pipeline 对象。我们可以将 pipeline 保存为不同的格式,例如 yaml
或 json
要加载 pipeline,我们可以使用 from_yaml
或 from_json
方法
当我们想与他人共享 pipeline,或者当我们想存储 pipeline 以供将来使用时,序列化 pipeline 非常有用。它甚至可以托管在网上,因此可以使用 CLI 直接执行 pipeline。
可视化 pipeline¶
我们可以使用 Pipeline.draw()
方法可视化 pipeline。这将创建一个 mermaid
图,并返回图像的路径。
在 notebook 中,我们可以简单地调用 pipeline
,图表将显示出来。或者,我们可以使用 Pipeline.draw()
方法来更好地控制图表可视化,并使用 IPython
来显示它。
现在让我们看看 完整的工作示例 的 pipeline 是什么样子的。
完整的工作示例¶
总而言之,这是我们在本节中创建的 pipeline 的完整代码。请注意,您需要更改 Hugging Face 仓库的名称,结果将被推送到该仓库,设置 OPENAI_API_KEY
环境变量,设置 MISTRAL_API_KEY
并安装和配置 gcloud
代码
from distilabel.models import MistralLLM, OpenAILLM, VertexAILLM
from distilabel.pipeline import Pipeline
from distilabel.steps import GroupColumns, LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadDataFromHub(
name="load_dataset",
output_mappings={"prompt": "instruction"},
)
combine_generations = GroupColumns(
name="combine_generations",
columns=["generation", "model_name"],
output_columns=["generations", "model_names"],
)
for llm in (
OpenAILLM(model="gpt-4-0125-preview"),
MistralLLM(model="mistral-large-2402"),
VertexAILLM(model="gemini-1.0-pro"),
):
task = TextGeneration(
name=f"text_generation_with_{llm.model_name.replace('.', '-')}", llm=llm
)
load_dataset.connect(task)
task.connect(combine_generations)
if __name__ == "__main__":
distiset = pipeline.run(
parameters={
"load_dataset": {
"repo_id": "distilabel-internal-testing/instruction-dataset-mini",
"split": "test",
},
"text_generation_with_gpt-4-0125-preview": {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 512,
}
}
},
"text_generation_with_mistral-large-2402": {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 512,
}
}
},
"text_generation_with_gemini-1.0-pro": {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 512,
}
}
},
},
)
distiset.push_to_hub(
"distilabel-internal-testing/instruction-dataset-mini-with-generations"
)