跳到内容

创建数据集以使用 Math-Shepherd 训练过程奖励模型

此示例将介绍 Math-Shepherd:无需人工标注即可逐步验证和增强 LLM,这是一种创新的数学过程奖励模型 (PRM),它为数学问题解决方案的每个步骤分配奖励分数。具体来说,我们将介绍创建数据集以训练此类模型的方法。最后几节包含 2 个 pipeline 示例,用于运行具有不同资源的 pipeline。

Replica

与仅关注最终答案的传统模型(输出奖励模型或 ORM)不同,此系统评估数学解决方案的每个步骤,并为单个解决方案步骤分配奖励分数。让我们看一下论文中的图 2,其中总结了他们工作中提出的标注方法。

Math-Shepherd framework

在传统的 ORM 方法中,标注是根据最终结果完成的,而过程奖励模型 (PRM) 允许标注导致解决方案的不同步骤,从而获得更丰富的信息集。

涉及的步骤

  • MathShepherdGenerator:此步骤负责为指令生成解决方案。根据为 M 设置的值,此步骤可用于生成 golden_solution(用作标注器的参考)或要标注的 solutions 集。对于 solutions 列,我们希望有一定的多样性,以使模型能够获得好坏两种解决方案,以便我们为标注器提供具有代表性的样本,因此最好使用“较弱”的模型。

  • MathShepherdCompleter。此任务执行论文中 completer 的工作,生成如图 2 第 3.3.2 节所示的补全。它不会自行生成列,而是更新 MathShepherdGeneratorsolutions 列中生成的步骤,使用 golden_solution 作为标注数据的参考。因此,为了使此步骤有效,我们需要数据集中同时存在这两列。根据数据集的类型,我们可能已经可以访问 golden_solution,即使名称不同,但对于 solutions 而言并非如此。

  • FormatPRM。此步骤执行辅助工作,准备数据以遵循论文中定义的格式,即具有两列 inputlabel。运行 MathShepherdCompleter 后,我们获得了可以根据用户意愿格式化的原始数据。使用 ExpandColumns 和此步骤,可以直接获得论文中共享的数据集中呈现的相同格式:peiyi9979/Math-Shepherd

数据准备

对于此示例,正如原始论文一样,我们使用 openai/gsm8k 数据集。我们只需要一个包含要解决的指令的数据集(在本例中,它对应于 question 列),我们可以使用我们预定义的步骤生成所有其他内容。

构建 pipeline

该 pipeline 使用 openai/gsm8k 作为参考,但该 pipeline 可以应用于不同的数据集,请记住,可以通过调整每个任务中的 extra_rulesfew_shots 来修改当前定义中的提示

from datasets import load_dataset

from distilabel.steps.tasks import MathShepherdCompleter, MathShepherdGenerator, FormatPRM
from distilabel.models import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs, ExpandColumns

ds_name = "openai/gsm8k"

ds = load_dataset(ds_name, "main", split="test").rename_column("question", "instruction").select(range(3))  # (1)

with Pipeline(name="Math-Shepherd") as pipe:
    model_id_70B = "meta-llama/Meta-Llama-3.1-70B-Instruct"
    model_id_8B = "meta-llama/Meta-Llama-3.1-8B-Instruct"

    llm_70B = InferenceEndpointsLLM(
        model_id=model_id_70B,
        tokenizer_id=model_id_70B,
        generation_kwargs={"max_new_tokens": 1024, "temperature": 0.6},
    )
    llm_8B = InferenceEndpointsLLM(
        model_id=model_id_8B,
        tokenizer_id=model_id_8B,
        generation_kwargs={"max_new_tokens": 2048, "temperature": 0.6},
    )  # (2)

    generator_golden = MathShepherdGenerator(
        name="golden_generator",
        llm=llm_70B,
    )  # (3)
    generator = MathShepherdGenerator(
        name="generator",
        llm=llm_8B,
        use_default_structured_output=True,  # (9)
        M=5
    )  # (4)
    completer = MathShepherdCompleter(
        name="completer",
        llm=llm_8B,
        use_default_structured_output=True,
        N=4
    )  # (5)

    combine = CombineOutputs()

    expand = ExpandColumns(
        name="expand_columns",
        columns=["solutions"],
        split_statistics=True,
    )  # (6)
    formatter = FormatPRM(name="format_prm")  # (7)

    [generator_golden, generator] >> combine >> completer >> expand >> formatter # (8)
  1. 将仅使用示例数据集中的 3 行,并将“question”重命名为“instruction”,以设置 MathShepherdGenerator 的预期值。

  2. 我们将使用 2 个不同的 LLM,meta-llama/Meta-Llama-3.1-70B-Instruct(用于 golden_solution 的更强大的模型)和 meta-llama/Meta-Llama-3.1-8B-Instruct(用于生成候选解决方案和补全的较弱的模型)。

  3. MathShepherdGenerator 任务,使用更强大的模型,将为我们生成 golden_solution,即任务的逐步解决方案。

  4. 另一个 MathShepherdGenerator 任务,但在这种情况下使用较弱的模型将生成候选 solutions(总共 M=5 个)。

  5. 现在,MathShepherdCompleter 任务将为 solutions 列中每个候选解决方案的每个步骤生成 n=4补全,并使用 golden_solution 对其进行标注,如图 2 所示。此步骤将在适当位置将标签(它使用 [+ 和 -] 标签,遵循论文中的实现,但这些值可以修改)添加到 solutions 列,而不是生成额外的列,但中间补全最终不会显示。

  6. ExpandColumns 步骤扩展解决方案以匹配指令,因此如果我们设置 M=5,我们现在将有 5x 指令对解决方案。我们将 split_statistics 设置为 True,以确保 distilabel_metadata 相应地拆分,否则每个解决方案的 token 数将计为生成的所有解决方案列表所需的 token 数。可以省略此步骤和以下步骤,并根据需要处理数据以进行训练。

  7. 最后,FormatPRM 生成两列:inputlabel,它们准备数据以进行训练,如原始 Math-Shepherd 数据集中所示。

  8. generator_goldengenerator 都可以并行运行,因为它们之间没有依赖关系,之后我们将结果组合起来并将其传递给 completer。最后,我们使用 expandformatter 以预期格式准备数据,以训练原始论文中定义的过程奖励模型。

  9. 生成结构化输出以确保更容易解析它们,否则模型在解析易于解析的列表时可能会多次失败。

脚本和最终数据集

要查看所有组件是否就位,请查看完整的 pipeline

运行
python examples/pipe_math_shepherd.py
完整 pipeline
pipe_math_shepherd.py
# Copyright 2023-present, Argilla, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://apache.ac.cn/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datasets import load_dataset

from distilabel.models import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs, ExpandColumns
from distilabel.steps.tasks import (
    FormatPRM,
    MathShepherdCompleter,
    MathShepherdGenerator,
)

ds_name = "openai/gsm8k"

ds = (
    load_dataset(ds_name, "main", split="test")
    .rename_column("question", "instruction")
    .select(range(3))
)


with Pipeline(name="Math-Shepherd") as pipe:
    model_id_70B = "meta-llama/Meta-Llama-3.1-70B-Instruct"
    model_id_8B = "meta-llama/Meta-Llama-3.1-8B-Instruct"

    llm_70B = InferenceEndpointsLLM(
        model_id=model_id_8B,
        tokenizer_id=model_id_8B,
        generation_kwargs={"max_new_tokens": 1024, "temperature": 0.5},
    )
    llm_8B = InferenceEndpointsLLM(
        model_id=model_id_8B,
        tokenizer_id=model_id_8B,
        generation_kwargs={"max_new_tokens": 2048, "temperature": 0.7},
    )

    generator_golden = MathShepherdGenerator(
        name="golden_generator",
        llm=llm_70B,
    )
    generator = MathShepherdGenerator(
        name="generator",
        llm=llm_8B,
        M=5,
    )
    completer = MathShepherdCompleter(name="completer", llm=llm_8B, N=4)

    combine = CombineOutputs()

    expand = ExpandColumns(
        name="expand_columns",
        columns=["solutions"],
        split_statistics=True,
    )
    formatter = FormatPRM(name="format_prm")
    [generator_golden, generator] >> combine >> completer >> expand >> formatter


if __name__ == "__main__":
    distiset = pipe.run(use_cache=False, dataset=ds)
    distiset.push_to_hub("plaguss/test_math_shepherd_prm")

结果数据集可以在以下位置查看:plaguss/test_math_shepherd_prm

使用 vLLM 和 ray 的 Pipeline

本节包含另一种使用更大结果运行 pipeline 的方法。为了展示如何扩展 pipeline,我们为 3 个生成任务使用了 Qwen/Qwen2.5-72B-Instruct,这大大提高了最终质量,因为它更接近给定的提示。此外,我们使用 vLLM 和 3 个节点(在本例中每个任务一个节点)来扩展生成过程。

Math-Shepherd 的更大 pipeline
from datasets import load_dataset

from distilabel.models import vLLM
from distilabel.steps import StepResources
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs, ExpandColumns
from distilabel.steps.tasks import (
    FormatPRM,
    MathShepherdCompleter,
    MathShepherdGenerator,
)

ds_name = "openai/gsm8k"

ds = (
    load_dataset(ds_name, "main", split="test")
    .rename_column("question", "instruction")
)


with Pipeline(name="Math-Shepherd").ray() as pipe:  # (1)

    model_id_72B = "Qwen/Qwen2.5-72B-Instruct"

    llm_72B = vLLM(
        model=model_id_72B,
        tokenizer=model_id_72B,
        extra_kwargs={
            "tensor_parallel_size": 8,               # Number of GPUs per node
            "max_model_len": 2048,
        },
        generation_kwargs={
            "temperature": 0.5,
            "max_new_tokens": 4096,
        },
    )

    generator_golden = MathShepherdGenerator(
        name="golden_generator",
        llm=llm_72B,
        input_batch_size=50,
        output_mappings={"model_name": "model_name_golden_generator"},
        resources=StepResources(replicas=1, gpus=8)  # (2)
    )
    generator = MathShepherdGenerator(
        name="generator",
        llm=llm_72B,
        input_batch_size=50,
        M=5,
        use_default_structured_output=True,
        output_mappings={"model_name": "model_name_generator"},
        resources=StepResources(replicas=1, gpus=8)
    )
    completer = MathShepherdCompleter(
        name="completer", 
        llm=llm_72B,
        N=8,
        use_default_structured_output=True,
        output_mappings={"model_name": "model_name_completer"},
        resources=StepResources(replicas=1, gpus=8)
    )

    combine = CombineOutputs()

    expand = ExpandColumns(
        name="expand_columns",
        columns=["solutions"],
        split_statistics=True,

    )
    formatter = FormatPRM(name="format_prm", format="trl")  # (3)

    [generator_golden, generator] >> combine >> completer >> expand >> formatter


if __name__ == "__main__":
    distiset = pipe.run(use_cache=False, dataset=ds, dataset_batch_size=50)
    if distiset:
        distiset.push_to_hub("plaguss/test_math_shepherd_prm_ray")
  1. 转换 pipeline 以使用 ray 后端运行。

  2. 分配资源:副本数 1,因为我们希望节点中只有一个任务实例,GPU 数量等于 8,使用整个节点。鉴于我们在 slurm 文件中定义了使用 3 个节点的脚本,这将使用所有 3 个可用节点,每个任务使用 8 个 GPU。

  3. TRL 预期的格式准备列以进行训练。

单击以查看用于运行先前 pipeline 的 slurm 文件。这是我们常用的 slurm 文件,使用 3 个 8xH100 节点。

Slurm 文件
#!/bin/bash
#SBATCH --job-name=math-shepherd-test-ray
#SBATCH --partition=hopper-prod
#SBATCH --qos=normal
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-node=8
#SBATCH --output=./logs/%x-%j.out
#SBATCH --err=./logs/%x-%j.err
#SBATCH --time=48:00:00

set -ex

module load cuda/12.1

echo "SLURM_JOB_ID: $SLURM_JOB_ID"
echo "SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"

source .venv/bin/activate

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

# Get the IP address of the head node
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# Start Ray head node
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

# Generate a unique Ray tmp dir for the head node
head_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_head"

echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" \
    ray start --head --node-ip-address="$head_node_ip" --port=$port \
    --dashboard-host=0.0.0.0 \
    --dashboard-port=8265 \
    --temp-dir="$head_tmp_dir" \
    --block &

# Give some time to head node to start...
sleep 10

# Start Ray worker nodes
worker_num=$((SLURM_JOB_NUM_NODES - 1))

# Start from 1 (0 is head node)
for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    worker_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_worker_$i"
    echo "Starting WORKER $i at $node_i"
    srun --nodes=1 --ntasks=1 -w "$node_i" \
        ray start --address "$ip_head" \
        --temp-dir="$worker_tmp_dir" \
        --block &
    sleep 5
done

# Give some time to the Ray cluster to gather info
sleep 60

# Finally submit the job to the cluster
RAY_ADDRESS="http://$head_node_ip:8265" ray job submit --working-dir pipeline -- python -u pipeline_math_shepherd_ray.py
最终数据集

结果数据集可以在以下位置查看:plaguss/test_math_shepherd_prm_ray