跳到内容

使用 Ray 扩展和分发 pipeline

尽管基于 multiprocessing 的本地 Pipeline + 使用外部服务提供 LLM 服务足以执行用于创建 SFT 和偏好数据集的大多数 pipeline,但在某些情况下,您可能需要跨多台机器扩展 pipeline。在这种情况下,distilabel 利用 Ray 有效地分配工作负载。这使您能够生成更大的数据集,减少执行时间,并最大限度地提高跨机器集群的资源利用率,而无需更改一行代码。

distilabel step 和 Ray Actor 之间的关系

一个 distilabel pipeline 由多个 Step 组成。Step 是一个定义基本生命周期的类

  1. 它将加载或创建运行其逻辑所需的资源(LLM、客户端等)。
  2. 它将运行一个循环,等待使用队列接收的传入批次。一旦收到一个批次,它将处理它并将处理后的批次放入输出队列。
  3. 当它完成最后一个批次或收到特殊信号时,循环将结束,并执行卸载逻辑。

因此,Step 需要维护最小状态,而使用 Ray 执行此操作的最佳方法是使用 actor

graph TD
    A[Step] -->|has| B[Multiple Replicas]
    B -->|wrapped in| C[Ray Actor]
    C -->|maintains| D[Step Replica State]
    C -->|executes| E[Step Lifecycle]
    E -->|1. Load/Create Resources| F[LLMs, Clients, etc.]
    E -->|2. Process batches from| G[Input Queue]
    E -->|3. Processed batches are put in| H[Output Queue]
    E -->|4. Unload| I[Cleanup]

使用 Ray 执行 pipeline

使用 Ray Jobs API 是使用 Ray 执行 distilabel pipeline 的推荐方法。

在深入解释之前,我们首先安装先决条件

pip install distilabel[ray]

提示

建议创建一个虚拟环境。

为了解释如何使用 Ray 执行 pipeline,我们将在整个示例中使用以下 pipeline

from distilabel.models import vLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline(name="text-generation-ray-pipeline") as pipeline:
    load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})

    text_generation = TextGeneration(
        llm=vLLM(
            model="meta-llama/Meta-Llama-3-8B-Instruct",
            tokenizer="meta-llama/Meta-Llama-3-8B-Instruct",
        )
    )

    load_data_from_hub >> text_generation

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            load_data_from_hub.name: {
                "repo_id": "HuggingFaceH4/instruction-dataset",
                "split": "test",
            },
            text_generation.name: {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 4096,
                    }
                },
                "resources": {"replicas": 2, "gpus": 1}, # (1)
            },
        }
    )

    distiset.push_to_hub(
        "<YOUR_HF_USERNAME_OR_ORGANIZATION>/text-generation-distilabel-ray" # (2)
    )
  1. 我们为 text_generation step 设置了资源,并定义我们想要两个副本,每个副本一个 GPU。distilabel 将创建 step 的两个副本,即 Ray 集群中的两个 actor,并且每个 actor 将请求分配到集群中至少有一个 GPU 的节点上。您可以在此处阅读有关 Ray 如何管理资源的更多信息。
  2. 您应该修改此项,并在 Hugging Face Hub 上添加您的用户或组织。

这是一个基本的 pipeline,只有两个 step:一个从 Hub 加载带有 instruction 列的数据集,另一个使用带有 vLLM 的 Llama 3 8B Instruct 为该 instruction 生成 response。简单但足以演示如何使用 Ray 集群分发和扩展工作负载!

使用 Ray Jobs API

如果您不了解 Ray Jobs API,建议阅读 Ray Jobs Overview。快速摘要:Ray Jobs 是在 Ray 集群中执行作业的推荐方法,因为它将处理 Ray 应用程序的打包、部署和管理。

要执行上面的 pipeline,我们首先需要创建一个目录(类似于软件包),其中包含我们将提交到 Ray 集群的 pipeline 脚本(或多个脚本)

mkdir ray-pipeline

目录 ray-pipeline 的内容应为

ray-pipeline/
├── pipeline.py
└── runtime_env.yaml

第一个文件包含 pipeline 的代码,而第二个文件 (runtime_env.yaml) 是一个特定的 Ray 文件,其中包含运行作业所需的 环境依赖项

pip:
  - distilabel[ray,vllm] >= 1.3.0
env_vars:
  HF_TOKEN: <YOUR_HF_TOKEN>

通过此文件,我们基本上是在通知 Ray 集群,它必须安装带有 vllmray 额外依赖项的 distilabel 才能运行作业。此外,我们正在定义 HF_TOKEN 环境变量,该变量将用于(通过 push_to_hub 方法)将生成的数据集上传到 Hugging Face Hub。

之后,我们可以继续执行 ray 命令,该命令会将作业提交到 Ray 集群

ray job submit \
    --address http://localhost:8265 \
    --working-dir ray-pipeline \
    --runtime-env ray-pipeline/runtime_env.yaml -- python pipeline.py

这将做的是基本上将 --working-dir 上传到 Ray 集群,安装依赖项,然后从 head 节点执行 python pipeline.py 命令。

文件系统要求

Using a file system to pass data to steps 中所述,distilabel 依赖于文件系统将数据传递给 GlobalStep,因此,如果要 Ray 集群中执行的 pipeline 具有任何 GlobalStep,或者您想要设置 run 方法的 use_fs_to_pass_data=True,那么您将需要设置一个文件系统,Ray 集群的所有节点都可以访问该文件系统

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={...},
        storage_parameters={"path": "file:///mnt/data"}, # (1)
        use_fs_to_pass_data=True,
    )
  1. Ray 集群的所有节点都应有权访问 /mnt/data

在带有 Slurm 的集群中执行 RayPipeline

如果您有权访问 HPC,那么您可能也是 Slurm 的用户,Slurm 是一种通常在 HPC 上使用的工作负载管理器。我们可以创建一个 Slurm 作业,该作业获取一些节点并部署一个 Ray 集群来运行分布式 distilabel pipeline

#!/bin/bash
#SBATCH --job-name=distilabel-ray-text-generation
#SBATCH --partition=your-partition
#SBATCH --qos=normal
#SBATCH --nodes=2 # (1)
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1 # (2)
#SBATCH --gpus-per-node=1 # (3)
#SBATCH --time=0:30:00

set -ex

echo "SLURM_JOB_ID: $SLURM_JOB_ID"
echo "SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"

# Activate virtual environment
source /path/to/virtualenv/.venv/bin/activate

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

# Get the IP address of the head node
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# Start Ray head node
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

# Generate a unique Ray tmp dir for the head node (just in case the default one is not writable)
head_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_head"

echo "Starting HEAD at $head_node"
OUTLINES_CACHE_DIR="/tmp/.outlines" srun --nodes=1 --ntasks=1 -w "$head_node" \ # (4)
    ray start --head --node-ip-address="$head_node_ip" --port=$port \
    --dashboard-host=0.0.0.0 \
    --dashboard-port=8265 \
    --temp-dir="$head_tmp_dir" \
    --block &

# Give some time to head node to start...
echo "Waiting a bit before starting worker nodes..."
sleep 10

# Start Ray worker nodes
worker_num=$((SLURM_JOB_NUM_NODES - 1))

# Start from 1 (0 is head node)
for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    worker_tmp_dir="/tmp/ray_tmp_${SLURM_JOB_ID}_worker_$i"
    echo "Starting WORKER $i at $node_i"
    OUTLINES_CACHE_DIR="/tmp/.outlines" srun --nodes=1 --ntasks=1 -w "$node_i" \
        ray start --address "$ip_head" \
        --temp-dir="$worker_tmp_dir" \
        --block &
    sleep 5
done

# Give some time to the Ray cluster to gather info
echo "Waiting a bit before submitting the job..."
sleep 60

# Finally submit the job to the cluster
ray job submit --address http://localhost:8265 --working-dir ray-pipeline -- python -u pipeline.py
  1. 在这种情况下,我们只需要两个节点:一个运行 Ray head 节点,另一个运行 worker。
  2. 我们只想在每个节点上运行一个任务,即启动 head/worker 节点的 Ray 命令。
  3. 我们为每个节点选择了 1 个 GPU,但我们可以根据 pipeline 选择更多。
  4. 我们需要将环境变量 OUTLINES_CACHE_DIR 设置为 /tmp/.outlines,以避免节点尝试读取/写入相同的 outlines 缓存文件时出现问题,这是不可能的。

vLLMtensor_parallel_size

为了将 vLLM 的多 GPU 和多节点功能与 ray 一起使用,我们需要对上面的示例 pipeline 进行一些更改。第一个需要的更改是为 tensor_parallel_size 指定一个值,也称为“我想让你在多少个 GPU 中加载模型”,第二个更改是将 ray 定义为 distributed_executor_backend,因为 vLLM 中的默认设置是使用 multiprocessing

with Pipeline(name="text-generation-ray-pipeline") as pipeline:
    load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})

    text_generation = TextGeneration(
        llm=vLLM(
            model="meta-llama/Meta-Llama-3.1-70B-Instruct",
            tokenizer="meta-llama/Meta-Llama-3.1-70B-Instruct",
            extra_kwargs={
                "tensor_parallel_size": 8,
                "distributed_executor_backend": "ray",
            }
        )
    )

    load_data_from_hub >> text_generation

有关使用 vLLM 进行分布式推理的更多信息,请访问:vLLM - Distributed Serving