创建数据集以使用 Math-Shepherd 训练过程奖励模型¶
此示例将介绍 Math-Shepherd:无需人工标注即可逐步验证和增强 LLM,这是一种创新的数学过程奖励模型 (PRM),它为数学问题解决方案的每个步骤分配奖励分数。具体来说,我们将介绍创建数据集以训练此类模型的方法。最后几节包含 2 个 pipeline 示例,用于运行具有不同资源的 pipeline。
Replica¶
与仅关注最终答案的传统模型(输出奖励模型或 ORM)不同,此系统评估数学解决方案的每个步骤,并为单个解决方案步骤分配奖励分数。让我们看一下论文中的图 2,其中总结了他们工作中提出的标注方法。
在传统的 ORM 方法中,标注是根据最终结果完成的,而过程奖励模型 (PRM) 允许标注导致解决方案的不同步骤,从而获得更丰富的信息集。
涉及的步骤¶
-
MathShepherdGenerator
:此步骤负责为指令生成解决方案。根据为M
设置的值,此步骤可用于生成golden_solution
(用作标注器的参考)或要标注的solutions
集。对于solutions
列,我们希望有一定的多样性,以使模型能够获得好坏两种解决方案,以便我们为标注器提供具有代表性的样本,因此最好使用“较弱”的模型。 -
MathShepherdCompleter
。此任务执行论文中completer
的工作,生成如图 2 第 3.3.2 节所示的补全。它不会自行生成列,而是更新MathShepherdGenerator
中solutions
列中生成的步骤,使用golden_solution
作为标注数据的参考。因此,为了使此步骤有效,我们需要数据集中同时存在这两列。根据数据集的类型,我们可能已经可以访问golden_solution
,即使名称不同,但对于solutions
而言并非如此。 -
FormatPRM
。此步骤执行辅助工作,准备数据以遵循论文中定义的格式,即具有两列input
和label
。运行MathShepherdCompleter
后,我们获得了可以根据用户意愿格式化的原始数据。使用ExpandColumns
和此步骤,可以直接获得论文中共享的数据集中呈现的相同格式:peiyi9979/Math-Shepherd。
数据准备¶
对于此示例,正如原始论文一样,我们使用 openai/gsm8k 数据集。我们只需要一个包含要解决的指令的数据集(在本例中,它对应于 question
列),我们可以使用我们预定义的步骤生成所有其他内容。
构建 pipeline¶
该 pipeline 使用 openai/gsm8k
作为参考,但该 pipeline 可以应用于不同的数据集,请记住,可以通过调整每个任务中的 extra_rules
和 few_shots
来修改当前定义中的提示
from datasets import load_dataset
from distilabel.steps.tasks import MathShepherdCompleter, MathShepherdGenerator, FormatPRM
from distilabel.models import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs, ExpandColumns
ds_name = "openai/gsm8k"
ds = load_dataset(ds_name, "main", split="test").rename_column("question", "instruction").select(range(3)) # (1)
with Pipeline(name="Math-Shepherd") as pipe:
model_id_70B = "meta-llama/Meta-Llama-3.1-70B-Instruct"
model_id_8B = "meta-llama/Meta-Llama-3.1-8B-Instruct"
llm_70B = InferenceEndpointsLLM(
model_id=model_id_70B,
tokenizer_id=model_id_70B,
generation_kwargs={"max_new_tokens": 1024, "temperature": 0.6},
)
llm_8B = InferenceEndpointsLLM(
model_id=model_id_8B,
tokenizer_id=model_id_8B,
generation_kwargs={"max_new_tokens": 2048, "temperature": 0.6},
) # (2)
generator_golden = MathShepherdGenerator(
name="golden_generator",
llm=llm_70B,
) # (3)
generator = MathShepherdGenerator(
name="generator",
llm=llm_8B,
use_default_structured_output=True, # (9)
M=5
) # (4)
completer = MathShepherdCompleter(
name="completer",
llm=llm_8B,
use_default_structured_output=True,
N=4
) # (5)
combine = CombineOutputs()
expand = ExpandColumns(
name="expand_columns",
columns=["solutions"],
split_statistics=True,
) # (6)
formatter = FormatPRM(name="format_prm") # (7)
[generator_golden, generator] >> combine >> completer >> expand >> formatter # (8)
-
将仅使用示例数据集中的 3 行,并将“question”重命名为“instruction”,以设置
MathShepherdGenerator
的预期值。 -
我们将使用 2 个不同的 LLM,
meta-llama/Meta-Llama-3.1-70B-Instruct
(用于golden_solution
的更强大的模型)和meta-llama/Meta-Llama-3.1-8B-Instruct
(用于生成候选解决方案和补全的较弱的模型)。 -
此
MathShepherdGenerator
任务,使用更强大的模型,将为我们生成golden_solution
,即任务的逐步解决方案。 -
另一个
MathShepherdGenerator
任务,但在这种情况下使用较弱的模型将生成候选solutions
(总共M=5
个)。 -
现在,
MathShepherdCompleter
任务将为solutions
列中每个候选解决方案的每个步骤生成n=4
个补全,并使用golden_solution
对其进行标注,如图 2 所示。此步骤将在适当位置将标签(它使用 [+ 和 -] 标签,遵循论文中的实现,但这些值可以修改)添加到solutions
列,而不是生成额外的列,但中间补全最终不会显示。 -
ExpandColumns
步骤扩展解决方案以匹配指令,因此如果我们设置 M=5,我们现在将有 5x 指令对解决方案。我们将split_statistics
设置为 True,以确保distilabel_metadata
相应地拆分,否则每个解决方案的 token 数将计为生成的所有解决方案列表所需的 token 数。可以省略此步骤和以下步骤,并根据需要处理数据以进行训练。 -
最后,
FormatPRM
生成两列:input
和label
,它们准备数据以进行训练,如原始 Math-Shepherd 数据集中所示。 -
generator_golden
和generator
都可以并行运行,因为它们之间没有依赖关系,之后我们将结果组合起来并将其传递给completer
。最后,我们使用expand
和formatter
以预期格式准备数据,以训练原始论文中定义的过程奖励模型。 -
生成结构化输出以确保更容易解析它们,否则模型在解析易于解析的列表时可能会多次失败。
脚本和最终数据集¶
要查看所有组件是否就位,请查看完整的 pipeline
完整 pipeline
# Copyright 2023-present, Argilla, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://apache.ac.cn/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datasets import load_dataset
from distilabel.models import InferenceEndpointsLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs, ExpandColumns
from distilabel.steps.tasks import (
FormatPRM,
MathShepherdCompleter,
MathShepherdGenerator,
)
ds_name = "openai/gsm8k"
ds = (
load_dataset(ds_name, "main", split="test")
.rename_column("question", "instruction")
.select(range(3))
)
with Pipeline(name="Math-Shepherd") as pipe:
model_id_70B = "meta-llama/Meta-Llama-3.1-70B-Instruct"
model_id_8B = "meta-llama/Meta-Llama-3.1-8B-Instruct"
llm_70B = InferenceEndpointsLLM(
model_id=model_id_8B,
tokenizer_id=model_id_8B,
generation_kwargs={"max_new_tokens": 1024, "temperature": 0.5},
)
llm_8B = InferenceEndpointsLLM(
model_id=model_id_8B,
tokenizer_id=model_id_8B,
generation_kwargs={"max_new_tokens": 2048, "temperature": 0.7},
)
generator_golden = MathShepherdGenerator(
name="golden_generator",
llm=llm_70B,
)
generator = MathShepherdGenerator(
name="generator",
llm=llm_8B,
M=5,
)
completer = MathShepherdCompleter(name="completer", llm=llm_8B, N=4)
combine = CombineOutputs()
expand = ExpandColumns(
name="expand_columns",
columns=["solutions"],
split_statistics=True,
)
formatter = FormatPRM(name="format_prm")
[generator_golden, generator] >> combine >> completer >> expand >> formatter
if __name__ == "__main__":
distiset = pipe.run(use_cache=False, dataset=ds)
distiset.push_to_hub("plaguss/test_math_shepherd_prm")
结果数据集可以在以下位置查看:plaguss/test_math_shepherd_prm。
使用 vLLM 和 ray 的 Pipeline¶
本节包含另一种使用更大结果运行 pipeline 的方法。为了展示如何扩展 pipeline,我们为 3 个生成任务使用了 Qwen/Qwen2.5-72B-Instruct,这大大提高了最终质量,因为它更接近给定的提示。此外,我们使用 vLLM
和 3 个节点(在本例中每个任务一个节点)来扩展生成过程。
Math-Shepherd 的更大 pipeline
from datasets import load_dataset
from distilabel.models import vLLM
from distilabel.steps import StepResources
from distilabel.pipeline import Pipeline
from distilabel.steps import CombineOutputs, ExpandColumns
from distilabel.steps.tasks import (
FormatPRM,
MathShepherdCompleter,
MathShepherdGenerator,
)
ds_name = "openai/gsm8k"
ds = (
load_dataset(ds_name, "main", split="test")
.rename_column("question", "instruction")
)
with Pipeline(name="Math-Shepherd").ray() as pipe: # (1)
model_id_72B = "Qwen/Qwen2.5-72B-Instruct"
llm_72B = vLLM(
model=model_id_72B,
tokenizer=model_id_72B,
extra_kwargs={
"tensor_parallel_size": 8, # Number of GPUs per node
"max_model_len": 2048,
},
generation_kwargs={
"temperature": 0.5,
"max_new_tokens": 4096,
},
)
generator_golden = MathShepherdGenerator(
name="golden_generator",
llm=llm_72B,
input_batch_size=50,
output_mappings={"model_name": "model_name_golden_generator"},
resources=StepResources(replicas=1, gpus=8) # (2)
)
generator = MathShepherdGenerator(
name="generator",
llm=llm_72B,
input_batch_size=50,
M=5,
use_default_structured_output=True,
output_mappings={"model_name": "model_name_generator"},
resources=StepResources(replicas=1, gpus=8)
)
completer = MathShepherdCompleter(
name="completer",
llm=llm_72B,
N=8,
use_default_structured_output=True,
output_mappings={"model_name": "model_name_completer"},
resources=StepResources(replicas=1, gpus=8)
)
combine = CombineOutputs()
expand = ExpandColumns(
name="expand_columns",
columns=["solutions"],
split_statistics=True,
)
formatter = FormatPRM(name="format_prm", format="trl") # (3)
[generator_golden, generator] >> combine >> completer >> expand >> formatter
if __name__ == "__main__":
distiset = pipe.run(use_cache=False, dataset=ds, dataset_batch_size=50)
if distiset:
distiset.push_to_hub("plaguss/test_math_shepherd_prm_ray")
-
转换 pipeline 以使用
ray
后端运行。 -
分配资源:副本数 1,因为我们希望节点中只有一个任务实例,GPU 数量等于 8,使用整个节点。鉴于我们在 slurm 文件中定义了使用 3 个节点的脚本,这将使用所有 3 个可用节点,每个任务使用 8 个 GPU。
-
以
TRL
预期的格式准备列以进行训练。
单击以查看用于运行先前 pipeline 的 slurm 文件。这是我们常用的 slurm
文件,使用 3 个 8xH100 节点。
Slurm 文件
#!/bin/bash
#SBATCH --job-name=math-shepherd-test-ray
#SBATCH --partition=hopper-prod
#SBATCH --qos=normal
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-node=8
#SBATCH --output=./logs/%x-%j.out
#SBATCH --err=./logs/%x-%j.err
#SBATCH --time=48:00:00
set -ex
module load cuda/12.1
echo "SLURM_JOB_ID: $SLURM_JOB_ID"
echo "SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
source .venv/bin/activate
# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
# Get the IP address of the head node
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
# Start Ray head node
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"
# Generate a unique Ray tmp dir for the head node
head_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_head"
echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" \
ray start --head --node-ip-address="$head_node_ip" --port=$port \
--dashboard-host=0.0.0.0 \
--dashboard-port=8265 \
--temp-dir="$head_tmp_dir" \
--block &
# Give some time to head node to start...
sleep 10
# Start Ray worker nodes
worker_num=$((SLURM_JOB_NUM_NODES - 1))
# Start from 1 (0 is head node)
for ((i = 1; i <= worker_num; i++)); do
node_i=${nodes_array[$i]}
worker_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_worker_$i"
echo "Starting WORKER $i at $node_i"
srun --nodes=1 --ntasks=1 -w "$node_i" \
ray start --address "$ip_head" \
--temp-dir="$worker_tmp_dir" \
--block &
sleep 5
done
# Give some time to the Ray cluster to gather info
sleep 60
# Finally submit the job to the cluster
RAY_ADDRESS="http://$head_node_ip:8265" ray job submit --working-dir pipeline -- python -u pipeline_math_shepherd_ray.py
最终数据集
结果数据集可以在以下位置查看:plaguss/test_math_shepherd_prm_ray。