使用 Ray 扩展和分发 pipeline¶
尽管基于 multiprocessing 的本地 Pipeline + 使用外部服务提供 LLM 服务足以执行用于创建 SFT 和偏好数据集的大多数 pipeline,但在某些情况下,您可能需要跨多台机器扩展 pipeline。在这种情况下,distilabel 利用 Ray 有效地分配工作负载。这使您能够生成更大的数据集,减少执行时间,并最大限度地提高跨机器集群的资源利用率,而无需更改一行代码。
distilabel step 和 Ray Actor 之间的关系¶
一个 distilabel
pipeline 由多个 Step
组成。Step
是一个定义基本生命周期的类
- 它将加载或创建运行其逻辑所需的资源(LLM、客户端等)。
- 它将运行一个循环,等待使用队列接收的传入批次。一旦收到一个批次,它将处理它并将处理后的批次放入输出队列。
- 当它完成最后一个批次或收到特殊信号时,循环将结束,并执行卸载逻辑。
因此,Step
需要维护最小状态,而使用 Ray 执行此操作的最佳方法是使用 actor。
graph TD
A[Step] -->|has| B[Multiple Replicas]
B -->|wrapped in| C[Ray Actor]
C -->|maintains| D[Step Replica State]
C -->|executes| E[Step Lifecycle]
E -->|1. Load/Create Resources| F[LLMs, Clients, etc.]
E -->|2. Process batches from| G[Input Queue]
E -->|3. Processed batches are put in| H[Output Queue]
E -->|4. Unload| I[Cleanup]
使用 Ray 执行 pipeline¶
使用 Ray Jobs API 是使用 Ray 执行 distilabel
pipeline 的推荐方法。
在深入解释之前,我们首先安装先决条件
提示
建议创建一个虚拟环境。
为了解释如何使用 Ray 执行 pipeline,我们将在整个示例中使用以下 pipeline
from distilabel.models import vLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration
with Pipeline(name="text-generation-ray-pipeline") as pipeline:
load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})
text_generation = TextGeneration(
llm=vLLM(
model="meta-llama/Meta-Llama-3-8B-Instruct",
tokenizer="meta-llama/Meta-Llama-3-8B-Instruct",
)
)
load_data_from_hub >> text_generation
if __name__ == "__main__":
distiset = pipeline.run(
parameters={
load_data_from_hub.name: {
"repo_id": "HuggingFaceH4/instruction-dataset",
"split": "test",
},
text_generation.name: {
"llm": {
"generation_kwargs": {
"temperature": 0.7,
"max_new_tokens": 4096,
}
},
"resources": {"replicas": 2, "gpus": 1}, # (1)
},
}
)
distiset.push_to_hub(
"<YOUR_HF_USERNAME_OR_ORGANIZATION>/text-generation-distilabel-ray" # (2)
)
- 我们为
text_generation
step 设置了资源,并定义我们想要两个副本,每个副本一个 GPU。distilabel
将创建 step 的两个副本,即 Ray 集群中的两个 actor,并且每个 actor 将请求分配到集群中至少有一个 GPU 的节点上。您可以在此处阅读有关 Ray 如何管理资源的更多信息。 - 您应该修改此项,并在 Hugging Face Hub 上添加您的用户或组织。
这是一个基本的 pipeline,只有两个 step:一个从 Hub 加载带有 instruction
列的数据集,另一个使用带有 vLLM 的 Llama 3 8B Instruct 为该 instruction 生成 response
。简单但足以演示如何使用 Ray 集群分发和扩展工作负载!
使用 Ray Jobs API¶
如果您不了解 Ray Jobs API,建议阅读 Ray Jobs Overview。快速摘要:Ray Jobs 是在 Ray 集群中执行作业的推荐方法,因为它将处理 Ray 应用程序的打包、部署和管理。
要执行上面的 pipeline,我们首先需要创建一个目录(类似于软件包),其中包含我们将提交到 Ray 集群的 pipeline 脚本(或多个脚本)
目录 ray-pipeline
的内容应为
第一个文件包含 pipeline 的代码,而第二个文件 (runtime_env.yaml
) 是一个特定的 Ray 文件,其中包含运行作业所需的 环境依赖项
通过此文件,我们基本上是在通知 Ray 集群,它必须安装带有 vllm
和 ray
额外依赖项的 distilabel
才能运行作业。此外,我们正在定义 HF_TOKEN
环境变量,该变量将用于(通过 push_to_hub
方法)将生成的数据集上传到 Hugging Face Hub。
之后,我们可以继续执行 ray
命令,该命令会将作业提交到 Ray 集群
ray job submit \
--address http://localhost:8265 \
--working-dir ray-pipeline \
--runtime-env ray-pipeline/runtime_env.yaml -- python pipeline.py
这将做的是基本上将 --working-dir
上传到 Ray 集群,安装依赖项,然后从 head 节点执行 python pipeline.py
命令。
文件系统要求¶
如 Using a file system to pass data to steps 中所述,distilabel
依赖于文件系统将数据传递给 GlobalStep
,因此,如果要 Ray 集群中执行的 pipeline 具有任何 GlobalStep
,或者您想要设置 run 方法的 use_fs_to_pass_data=True
,那么您将需要设置一个文件系统,Ray 集群的所有节点都可以访问该文件系统
if __name__ == "__main__":
distiset = pipeline.run(
parameters={...},
storage_parameters={"path": "file:///mnt/data"}, # (1)
use_fs_to_pass_data=True,
)
- Ray 集群的所有节点都应有权访问
/mnt/data
。
在带有 Slurm 的集群中执行 RayPipeline
¶
如果您有权访问 HPC,那么您可能也是 Slurm 的用户,Slurm 是一种通常在 HPC 上使用的工作负载管理器。我们可以创建一个 Slurm 作业,该作业获取一些节点并部署一个 Ray 集群来运行分布式 distilabel
pipeline
#!/bin/bash
#SBATCH --job-name=distilabel-ray-text-generation
#SBATCH --partition=your-partition
#SBATCH --qos=normal
#SBATCH --nodes=2 # (1)
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1 # (2)
#SBATCH --gpus-per-node=1 # (3)
#SBATCH --time=0:30:00
set -ex
echo "SLURM_JOB_ID: $SLURM_JOB_ID"
echo "SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
# Activate virtual environment
source /path/to/virtualenv/.venv/bin/activate
# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
# Get the IP address of the head node
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
# Start Ray head node
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"
# Generate a unique Ray tmp dir for the head node (just in case the default one is not writable)
head_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_head"
echo "Starting HEAD at $head_node"
OUTLINES_CACHE_DIR="/tmp/.outlines" srun --nodes=1 --ntasks=1 -w "$head_node" \ # (4)
ray start --head --node-ip-address="$head_node_ip" --port=$port \
--dashboard-host=0.0.0.0 \
--dashboard-port=8265 \
--temp-dir="$head_tmp_dir" \
--block &
# Give some time to head node to start...
echo "Waiting a bit before starting worker nodes..."
sleep 10
# Start Ray worker nodes
worker_num=$((SLURM_JOB_NUM_NODES - 1))
# Start from 1 (0 is head node)
for ((i = 1; i <= worker_num; i++)); do
node_i=${nodes_array[$i]}
worker_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_worker_$i"
echo "Starting WORKER $i at $node_i"
OUTLINES_CACHE_DIR="/tmp/.outlines" srun --nodes=1 --ntasks=1 -w "$node_i" \
ray start --address "$ip_head" \
--temp-dir="$worker_tmp_dir" \
--block &
sleep 5
done
# Give some time to the Ray cluster to gather info
echo "Waiting a bit before submitting the job..."
sleep 60
# Finally submit the job to the cluster
ray job submit --address http://localhost:8265 --working-dir ray-pipeline -- python -u pipeline.py
- 在这种情况下,我们只需要两个节点:一个运行 Ray head 节点,另一个运行 worker。
- 我们只想在每个节点上运行一个任务,即启动 head/worker 节点的 Ray 命令。
- 我们为每个节点选择了 1 个 GPU,但我们可以根据 pipeline 选择更多。
- 我们需要将环境变量
OUTLINES_CACHE_DIR
设置为/tmp/.outlines
,以避免节点尝试读取/写入相同的outlines
缓存文件时出现问题,这是不可能的。
vLLM
和 tensor_parallel_size
¶
为了将 vLLM
的多 GPU 和多节点功能与 ray
一起使用,我们需要对上面的示例 pipeline 进行一些更改。第一个需要的更改是为 tensor_parallel_size
指定一个值,也称为“我想让你在多少个 GPU 中加载模型”,第二个更改是将 ray
定义为 distributed_executor_backend
,因为 vLLM
中的默认设置是使用 multiprocessing
with Pipeline(name="text-generation-ray-pipeline") as pipeline:
load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})
text_generation = TextGeneration(
llm=vLLM(
model="meta-llama/Meta-Llama-3.1-70B-Instruct",
tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
extra_kwargs={
"tensor_parallel_size": 8,
"distributed_executor_backend": "ray",
}
)
)
load_data_from_hub >> text_generation
有关使用 vLLM
进行分布式推理的更多信息,请访问:vLLM - Distributed Serving