跳到内容

Pipeline 缓存

distilabel 将自动保存每个 StepPipeline 生成的所有中间输出,因此可以重用这些输出以恢复在完成前停止的 pipeline 执行状态,或者在添加新的下游步骤后不必重新执行 pipeline 中的步骤。

如何启用/禁用缓存

可以使用 Pipeline.use_cache 方法的 use_cache 参数来切换缓存的使用。如果为 True,则 distilabel 将重用先前执行的输出以进行新的执行。如果为 False,则 distilabel 将重新执行 pipeline 的所有步骤,以生成所有步骤的新输出。

with Pipeline(name="my-pipeline") as pipeline:
    ...

if __name__ == "__main__":
    distiset = pipeline.run(use_cache=False)  # (1)
  1. Pipeline 缓存已禁用

此外,可以使用 Step 级别的 use_cache 属性启用/禁用缓存。如果为 True,则将在新的 pipeline 执行中重用步骤的输出。如果为 False,则将重新执行该步骤以生成新的输出。如果禁用了一个步骤的缓存并且必须重新生成输出,则还将重新生成依赖于此步骤的步骤的输出。

with Pipeline(name="writting-assistant") as pipeline:
    load_data = LoadDataFromDicts(
        data=[
            {
                "instruction": "How much is 2+2?"
            }
        ]
    )

    generation = TextGeneration(
        llm=InferenceEndpointsLLM(
            model_id="Qwen/Qwen2.5-72B-Instruct",
            generation_kwargs={
                "temperature": 0.8,
                "max_new_tokens": 512,
            },
        ),
        use_cache=False  # (1)
    )

    load_data >> generation

if __name__ == "__main__":
    distiset = pipeline.run()
  1. 步骤缓存已禁用,并且每次执行 pipeline 时,都将重新执行此步骤

如何触发缓存命中

distilabel 使用 pipeline 的名称对 pipeline 生成的信息和数据进行分组,因此触发缓存命中的第一个因素是 pipeline 的名称。第二个因素是 Pipeline.signature 属性。此属性返回一个哈希值,该哈希值是使用 pipeline 中使用的步骤的名称及其连接生成的。第三个因素是 Pipeline.aggregated_steps_signature 属性,该属性用于确定新的 pipeline 执行是否与之前的执行完全相同,即 pipeline 包含完全相同的步骤,具有完全相同的连接,并且这些步骤使用完全相同的参数。如果满足这三个因素,则会触发缓存命中,并且 pipeline 不会重新执行,而是使用函数 create_distiset 使用先前执行的输出创建结果 Distiset,如下图所示

Complete cache hit

如果新的 pipeline 执行具有不同的 Pipeline.aggregated_steps_signature,即至少一个步骤更改了其参数,则 distilabel 将重用未更改的步骤的输出并重新执行已更改的步骤,如下图所示

Partial cache hit

从上面相同的 pipeline 第三次执行,但是这次最后一步 text_generation_1 发生了更改,因此需要重新执行它。其他步骤由于没有更改,因此不需要重新执行,并且它们的输出将被重用。