Python实现高通量计算(HTC)的分布式调度:利用SLURM/PBS管理ML任务

好的,下面开始正文。

Python实现高通量计算(HTC)的分布式调度:利用SLURM/PBS管理ML任务

大家好!今天我们来探讨如何利用Python实现高通量计算(HTC)的分布式调度,并重点介绍如何使用SLURM和PBS这类作业调度系统来管理机器学习(ML)任务。HTC旨在通过大量计算资源并行处理大量独立任务,非常适合参数扫描、模型训练等ML场景。

1. 高通量计算(HTC)与机器学习

高通量计算的核心思想是并行处理大量相对独立的任务。在机器学习领域,HTC有诸多应用场景:

  • 超参数优化: 尝试不同的超参数组合来训练模型,每组超参数对应一个独立的训练任务。
  • 模型集成: 训练多个不同的模型(例如,使用不同的算法或数据集子集),然后将它们的预测结果进行集成。
  • 交叉验证: 将数据集分割成多个子集,并使用不同的子集进行训练和验证。
  • 数据预处理: 对大量数据进行并行处理,例如图像处理、文本清洗等。

2. 分布式调度系统:SLURM和PBS

为了有效地利用集群资源进行HTC,我们需要使用作业调度系统。SLURM (Simple Linux Utility for Resource Management) 和PBS (Portable Batch System) 是两种常见的开源作业调度系统。它们负责资源分配、任务调度和监控,使得我们可以方便地提交、管理和监控大量的计算任务。

  • SLURM: 一个功能强大的开源集群管理和作业调度系统,广泛应用于高性能计算(HPC)环境中。
  • PBS: 另一个流行的作业调度系统,也常用于HPC集群。虽然原始的PBS是商业软件,但现在也有开源实现,例如OpenPBS。

这两种系统的大致工作流程如下:

  1. 用户编写作业脚本,描述任务需求(例如,CPU核心数、内存、运行时间等)。
  2. 用户将作业脚本提交给调度系统。
  3. 调度系统根据集群资源情况和作业优先级,将作业分配到合适的计算节点。
  4. 计算节点执行作业脚本,完成计算任务。
  5. 调度系统监控作业执行情况,并提供日志和报告。

3. Python与SLURM/PBS的集成

Python可以很好地与SLURM和PBS集成,方便我们编写和提交作业。主要有两种方法:

  • 直接调用命令行工具: 使用Python的subprocess模块调用SLURM或PBS的命令行工具(例如,sbatchqsub)。
  • 使用Python库: 有一些Python库提供了更高级的接口来与SLURM或PBS交互,例如PySLURMpbs.

接下来,我们将分别介绍这两种方法,并提供示例代码。

3.1 使用subprocess调用命令行工具

这是最直接的方法,适用于简单的作业提交。

import subprocess
import os

def submit_job(script_path, job_name, cpus, memory):
    """
    提交SLURM作业。

    Args:
        script_path (str): 作业脚本的路径。
        job_name (str): 作业名称。
        cpus (int): 需要的CPU核心数。
        memory (str): 需要的内存 (例如, "4G")。
    """
    try:
        # 构建sbatch命令
        cmd = [
            "sbatch",
            "--job-name", job_name,
            "--cpus-per-task", str(cpus),
            "--mem", memory,
            script_path
        ]

        # 执行命令并获取输出
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        print(f"Job submitted successfully. Output:n{result.stdout}")

    except subprocess.CalledProcessError as e:
        print(f"Error submitting job: {e.stderr}")

def create_job_script(script_content, script_path):
    """
    创建SLURM作业脚本。

    Args:
        script_content (str): 脚本内容。
        script_path (str): 脚本保存路径。
    """
    with open(script_path, "w") as f:
        f.write(script_content)

if __name__ == "__main__":
    # 创建一个简单的作业脚本
    script_content = """#!/bin/bash
#SBATCH --output=output.txt
#SBATCH --error=error.txt

echo "Hello from SLURM!"
date
hostname
"""

    script_path = "my_job.sh"
    create_job_script(script_content, script_path)

    # 提交作业
    submit_job(script_path, "my_test_job", 4, "4G")

这个例子首先创建了一个简单的SLURM作业脚本my_job.sh,然后使用submit_job函数将其提交给SLURM。submit_job函数使用subprocess.run执行sbatch命令,并将作业脚本的路径作为参数传递给sbatchcapture_output=True 捕获输出和错误信息,text=True确保输出以文本形式返回,check=True 使得如果命令执行失败,会抛出异常。

对于PBS,可以使用类似的方法:

import subprocess
import os

def submit_job_pbs(script_path, job_name, cpus, memory):
    """
    提交PBS作业。

    Args:
        script_path (str): 作业脚本的路径。
        job_name (str): 作业名称。
        cpus (int): 需要的CPU核心数。
        memory (str): 需要的内存 (例如, "4G")。
    """
    try:
        # 构建qsub命令
        cmd = [
            "qsub",
            "-N", job_name,
            "-l", f"select=1:ncpus={cpus}:mem={memory}",
            script_path
        ]

        # 执行命令并获取输出
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        print(f"Job submitted successfully. Output:n{result.stdout}")

    except subprocess.CalledProcessError as e:
        print(f"Error submitting job: {e.stderr}")

def create_job_script_pbs(script_content, script_path):
    """
    创建PBS作业脚本。

    Args:
        script_content (str): 脚本内容。
        script_path (str): 脚本保存路径。
    """
    with open(script_path, "w") as f:
        f.write(script_content)

if __name__ == "__main__":
    # 创建一个简单的作业脚本
    script_content = """#!/bin/bash
#PBS -o output.txt
#PBS -e error.txt

echo "Hello from PBS!"
date
hostname
"""

    script_path = "my_job_pbs.sh"
    create_job_script_pbs(script_content, script_path)

    # 提交作业
    submit_job_pbs(script_path, "my_test_job_pbs", 4, "4GB")

这个例子与SLURM的例子类似,只是使用了qsub命令和不同的作业脚本格式。

3.2 使用Python库

使用Python库可以提供更方便的接口来管理作业。

  • PySLURM: 一个用于与SLURM交互的Python库。
  • pbs: 一个用于与PBS交互的Python库。 (注意: 可能需要安装pbs_python)

使用PySLURM示例:

# 需要安装 pyslurm: pip install pyslurm

try:
    import pyslurm
except ImportError:
    print("PySLURM is not installed. Please install it using 'pip install pyslurm'")
    exit()

def submit_job_pyslurm(job_name, script_path, cpus, memory):
    """
    使用PySLURM提交作业。

    Args:
        job_name (str): 作业名称。
        script_path (str): 作业脚本的路径。
        cpus (int): 需要的CPU核心数。
        memory (str): 需要的内存 (例如, "4G")。
    """
    try:
        job = pyslurm.job()
        job.create(
            job_name=job_name,
            script=script_path,
            cpus_per_task=cpus,
            memory=memory,
            output="output.txt",
            error="error.txt"
        )
        print(f"Job submitted successfully. Job ID: {job.job_id}")

    except Exception as e:
        print(f"Error submitting job: {e}")

if __name__ == "__main__":
    # 创建一个简单的作业脚本
    script_content = """#!/bin/bash

echo "Hello from SLURM using PySLURM!"
date
hostname
"""

    script_path = "my_job_pyslurm.sh"
    with open(script_path, "w") as f:
        f.write(script_content)

    # 提交作业
    submit_job_pyslurm("my_test_job_pyslurm", script_path, 4, "4G")

使用pbs库示例:

# 需要安装 pbs_python: pip install pbs_python

try:
    import pbs
except ImportError:
    print("pbs_python is not installed. Please install it using 'pip install pbs_python'")
    exit()

def submit_job_pbs_lib(job_name, script_path, cpus, memory):
    """
    使用 pbs 库提交作业。

    Args:
        job_name (str): 作业名称。
        script_path (str): 作业脚本的路径。
        cpus (int): 需要的CPU核心数。
        memory (str): 需要的内存 (例如, "4G")。
    """
    try:
        conn = pbs.pbs()
        job = pbs.Job(conn, script_path)
        job.N = job_name
        job.l = f"select=1:ncpus={cpus}:mem={memory}"
        job.o = "output.txt"
        job.e = "error.txt"
        job.submit()
        print(f"Job submitted successfully. Job ID: {job.id}")
        conn.disconnect()

    except Exception as e:
        print(f"Error submitting job: {e}")

if __name__ == "__main__":
    # 创建一个简单的作业脚本
    script_content = """#!/bin/bash

echo "Hello from PBS using pbs_python!"
date
hostname
"""

    script_path = "my_job_pbs_lib.sh"
    with open(script_path, "w") as f:
        f.write(script_content)

    # 提交作业
    submit_job_pbs_lib("my_test_job_pbs_lib", script_path, 4, "4GB")

这些例子展示了如何使用Python库来提交作业。这些库通常提供了更高级的接口,可以更方便地设置作业参数和管理作业。

4. 高通量计算的Python实现

现在,我们来讨论如何使用Python实现高通量计算。核心思想是将大量的独立任务分解成小的作业,并将它们提交给SLURM或PBS。

4.1 参数扫描示例

假设我们要训练一个机器学习模型,并对学习率进行参数扫描。我们可以将不同的学习率组合分配到不同的作业中。

import subprocess
import os
import itertools

def train_model(learning_rate, model_id):
    """
    模拟训练模型的函数。

    Args:
        learning_rate (float): 学习率。
        model_id (int): 模型ID,用于区分不同的模型。

    Returns:
        str: 训练结果。
    """
    # 模拟训练过程
    print(f"Training model {model_id} with learning rate {learning_rate}")
    # 实际的训练代码应该放在这里
    # ...
    result = f"Model {model_id} trained with learning rate {learning_rate} successfully."
    return result

def create_training_script(learning_rate, model_id, script_path):
    """
    创建训练脚本。

    Args:
        learning_rate (float): 学习率。
        model_id (int): 模型ID。
        script_path (str): 脚本保存路径。
    """
    script_content = f"""#!/bin/bash
#SBATCH --output=output_{model_id}.txt
#SBATCH --error=error_{model_id}.txt

python -c "import train; print(train.train_model({learning_rate}, {model_id}))"
"""

    with open(script_path, "w") as f:
        f.write(script_content)

def submit_training_jobs(learning_rates, cpus, memory):
    """
    提交训练作业。

    Args:
        learning_rates (list): 学习率列表。
        cpus (int): 需要的CPU核心数。
        memory (str): 需要的内存 (例如, "4G")。
    """
    for i, learning_rate in enumerate(learning_rates):
        model_id = i + 1
        script_path = f"train_model_{model_id}.sh"
        create_training_script(learning_rate, model_id, script_path)
        submit_job(script_path, f"train_model_{model_id}", cpus, memory)

if __name__ == "__main__":
    # 学习率列表
    learning_rates = [0.001, 0.01, 0.1]

    # 提交训练作业
    submit_training_jobs(learning_rates, 4, "4G")

    # 创建一个名为train.py的文件,包含train_model函数
    with open("train.py", "w") as f:
        f.write("""
def train_model(learning_rate, model_id):
    print(f"Training model {model_id} with learning rate {learning_rate}")
    result = f"Model {model_id} trained with learning rate {learning_rate} successfully."
    return result
""")

在这个例子中,我们首先定义了一个train_model函数,用于模拟训练模型的过程。然后,我们使用create_training_script函数为每个学习率创建一个独立的作业脚本。最后,我们使用submit_training_jobs函数将所有的作业提交给SLURM。

4.2 更高级的HTC框架:dask

虽然我们可以使用上述方法手动提交作业,但对于更复杂的HTC场景,使用更高级的框架可能更方便。dask是一个流行的Python并行计算库,它可以与SLURM和PBS集成,提供更高级的抽象和功能。

# 需要安装 dask 和 dask-jobqueue: pip install dask dask-jobqueue

try:
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster
    import dask
except ImportError:
    print("Dask and dask-jobqueue are not installed. Please install them using 'pip install dask dask-jobqueue'")
    exit()

def train_model_dask(learning_rate, model_id):
    """
    模拟训练模型的函数 (Dask版本)。

    Args:
        learning_rate (float): 学习率。
        model_id (int): 模型ID,用于区分不同的模型。

    Returns:
        str: 训练结果。
    """
    # 模拟训练过程
    print(f"Training model {model_id} with learning rate {learning_rate}")
    # 实际的训练代码应该放在这里
    # ...
    result = f"Model {model_id} trained with learning rate {learning_rate} successfully."
    return result

if __name__ == "__main__":
    # 创建SLURM集群
    cluster = SLURMCluster(
        queue='your_queue',  # 替换为你的队列名称
        cores=4,              # 每个worker的CPU核心数
        memory='4GB',          # 每个worker的内存
        walltime='00:30:00',    # 最大运行时间
        log_directory='dask_logs', # 日志目录
        local_directory='dask_tmp' # 临时文件目录
    )

    # 启动多个worker
    cluster.scale(jobs=3)  #启动3个worker

    # 连接到Dask集群
    client = Client(cluster)

    # 学习率列表
    learning_rates = [0.001, 0.01, 0.1]

    # 使用Dask并行执行训练任务
    futures = []
    for i, learning_rate in enumerate(learning_rates):
        model_id = i + 1
        future = client.submit(train_model_dask, learning_rate, model_id)
        futures.append(future)

    # 获取训练结果
    results = [future.result() for future in futures]
    print("Training results:", results)

    # 关闭Dask集群
    client.close()
    cluster.close()

在这个例子中,我们首先创建了一个SLURMCluster对象,用于配置Dask集群。然后,我们使用client.submit函数将训练任务提交给Dask集群。Dask会自动将任务分配到集群中的worker节点上,并并行执行。

5. 注意事项与最佳实践

  • 资源请求: 准确估计每个任务所需的资源(例如,CPU、内存、运行时间),并合理设置资源请求。如果资源请求过高,可能会导致资源浪费;如果资源请求过低,可能会导致任务失败。
  • 作业依赖: 如果任务之间存在依赖关系,可以使用作业依赖来确保任务按正确的顺序执行。SLURM和PBS都提供了作业依赖的机制。
  • 错误处理: 在作业脚本中添加错误处理机制,以便在任务失败时能够及时发现并进行处理。
  • 日志记录: 在作业脚本中添加日志记录功能,以便跟踪任务执行情况和调试问题。
  • 数据管理: 合理管理输入和输出数据,避免数据丢失或损坏。
  • 模块化: 将代码模块化,方便重用和维护。
  • 版本控制: 使用版本控制系统(例如,Git)来管理代码。
  • 性能分析: 使用性能分析工具来识别瓶颈,并进行优化。

6. 总结SLURM/PBS与Python结合的优势

本文介绍了如何使用Python与SLURM/PBS集成,实现高通量计算的分布式调度。通过subprocess直接调用命令行工具,或者使用PySLURM/pbs等Python库可以方便地提交和管理作业。对于更复杂的HTC场景,可以使用dask等高级框架提供更高级的抽象和功能。合理设置资源请求、处理错误、记录日志等最佳实践可以提高HTC效率和可靠性。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注