好的,下面开始正文。
Python实现高通量计算(HTC)的分布式调度:利用SLURM/PBS管理ML任务
大家好!今天我们来探讨如何利用Python实现高通量计算(HTC)的分布式调度,并重点介绍如何使用SLURM和PBS这类作业调度系统来管理机器学习(ML)任务。HTC旨在通过大量计算资源并行处理大量独立任务,非常适合参数扫描、模型训练等ML场景。
1. 高通量计算(HTC)与机器学习
高通量计算的核心思想是并行处理大量相对独立的任务。在机器学习领域,HTC有诸多应用场景:
- 超参数优化: 尝试不同的超参数组合来训练模型,每组超参数对应一个独立的训练任务。
- 模型集成: 训练多个不同的模型(例如,使用不同的算法或数据集子集),然后将它们的预测结果进行集成。
- 交叉验证: 将数据集分割成多个子集,并使用不同的子集进行训练和验证。
- 数据预处理: 对大量数据进行并行处理,例如图像处理、文本清洗等。
2. 分布式调度系统:SLURM和PBS
为了有效地利用集群资源进行HTC,我们需要使用作业调度系统。SLURM (Simple Linux Utility for Resource Management) 和PBS (Portable Batch System) 是两种常见的开源作业调度系统。它们负责资源分配、任务调度和监控,使得我们可以方便地提交、管理和监控大量的计算任务。
- SLURM: 一个功能强大的开源集群管理和作业调度系统,广泛应用于高性能计算(HPC)环境中。
- PBS: 另一个流行的作业调度系统,也常用于HPC集群。虽然原始的PBS是商业软件,但现在也有开源实现,例如OpenPBS。
这两种系统的大致工作流程如下:
- 用户编写作业脚本,描述任务需求(例如,CPU核心数、内存、运行时间等)。
- 用户将作业脚本提交给调度系统。
- 调度系统根据集群资源情况和作业优先级,将作业分配到合适的计算节点。
- 计算节点执行作业脚本,完成计算任务。
- 调度系统监控作业执行情况,并提供日志和报告。
3. Python与SLURM/PBS的集成
Python可以很好地与SLURM和PBS集成,方便我们编写和提交作业。主要有两种方法:
- 直接调用命令行工具: 使用Python的
subprocess模块调用SLURM或PBS的命令行工具(例如,sbatch、qsub)。 - 使用Python库: 有一些Python库提供了更高级的接口来与SLURM或PBS交互,例如
PySLURM和pbs.
接下来,我们将分别介绍这两种方法,并提供示例代码。
3.1 使用subprocess调用命令行工具
这是最直接的方法,适用于简单的作业提交。
import subprocess
import os
def submit_job(script_path, job_name, cpus, memory):
"""
提交SLURM作业。
Args:
script_path (str): 作业脚本的路径。
job_name (str): 作业名称。
cpus (int): 需要的CPU核心数。
memory (str): 需要的内存 (例如, "4G")。
"""
try:
# 构建sbatch命令
cmd = [
"sbatch",
"--job-name", job_name,
"--cpus-per-task", str(cpus),
"--mem", memory,
script_path
]
# 执行命令并获取输出
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"Job submitted successfully. Output:n{result.stdout}")
except subprocess.CalledProcessError as e:
print(f"Error submitting job: {e.stderr}")
def create_job_script(script_content, script_path):
"""
创建SLURM作业脚本。
Args:
script_content (str): 脚本内容。
script_path (str): 脚本保存路径。
"""
with open(script_path, "w") as f:
f.write(script_content)
if __name__ == "__main__":
# 创建一个简单的作业脚本
script_content = """#!/bin/bash
#SBATCH --output=output.txt
#SBATCH --error=error.txt
echo "Hello from SLURM!"
date
hostname
"""
script_path = "my_job.sh"
create_job_script(script_content, script_path)
# 提交作业
submit_job(script_path, "my_test_job", 4, "4G")
这个例子首先创建了一个简单的SLURM作业脚本my_job.sh,然后使用submit_job函数将其提交给SLURM。submit_job函数使用subprocess.run执行sbatch命令,并将作业脚本的路径作为参数传递给sbatch。capture_output=True 捕获输出和错误信息,text=True确保输出以文本形式返回,check=True 使得如果命令执行失败,会抛出异常。
对于PBS,可以使用类似的方法:
import subprocess
import os
def submit_job_pbs(script_path, job_name, cpus, memory):
"""
提交PBS作业。
Args:
script_path (str): 作业脚本的路径。
job_name (str): 作业名称。
cpus (int): 需要的CPU核心数。
memory (str): 需要的内存 (例如, "4G")。
"""
try:
# 构建qsub命令
cmd = [
"qsub",
"-N", job_name,
"-l", f"select=1:ncpus={cpus}:mem={memory}",
script_path
]
# 执行命令并获取输出
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
print(f"Job submitted successfully. Output:n{result.stdout}")
except subprocess.CalledProcessError as e:
print(f"Error submitting job: {e.stderr}")
def create_job_script_pbs(script_content, script_path):
"""
创建PBS作业脚本。
Args:
script_content (str): 脚本内容。
script_path (str): 脚本保存路径。
"""
with open(script_path, "w") as f:
f.write(script_content)
if __name__ == "__main__":
# 创建一个简单的作业脚本
script_content = """#!/bin/bash
#PBS -o output.txt
#PBS -e error.txt
echo "Hello from PBS!"
date
hostname
"""
script_path = "my_job_pbs.sh"
create_job_script_pbs(script_content, script_path)
# 提交作业
submit_job_pbs(script_path, "my_test_job_pbs", 4, "4GB")
这个例子与SLURM的例子类似,只是使用了qsub命令和不同的作业脚本格式。
3.2 使用Python库
使用Python库可以提供更方便的接口来管理作业。
- PySLURM: 一个用于与SLURM交互的Python库。
- pbs: 一个用于与PBS交互的Python库。 (注意: 可能需要安装
pbs_python)
使用PySLURM示例:
# 需要安装 pyslurm: pip install pyslurm
try:
import pyslurm
except ImportError:
print("PySLURM is not installed. Please install it using 'pip install pyslurm'")
exit()
def submit_job_pyslurm(job_name, script_path, cpus, memory):
"""
使用PySLURM提交作业。
Args:
job_name (str): 作业名称。
script_path (str): 作业脚本的路径。
cpus (int): 需要的CPU核心数。
memory (str): 需要的内存 (例如, "4G")。
"""
try:
job = pyslurm.job()
job.create(
job_name=job_name,
script=script_path,
cpus_per_task=cpus,
memory=memory,
output="output.txt",
error="error.txt"
)
print(f"Job submitted successfully. Job ID: {job.job_id}")
except Exception as e:
print(f"Error submitting job: {e}")
if __name__ == "__main__":
# 创建一个简单的作业脚本
script_content = """#!/bin/bash
echo "Hello from SLURM using PySLURM!"
date
hostname
"""
script_path = "my_job_pyslurm.sh"
with open(script_path, "w") as f:
f.write(script_content)
# 提交作业
submit_job_pyslurm("my_test_job_pyslurm", script_path, 4, "4G")
使用pbs库示例:
# 需要安装 pbs_python: pip install pbs_python
try:
import pbs
except ImportError:
print("pbs_python is not installed. Please install it using 'pip install pbs_python'")
exit()
def submit_job_pbs_lib(job_name, script_path, cpus, memory):
"""
使用 pbs 库提交作业。
Args:
job_name (str): 作业名称。
script_path (str): 作业脚本的路径。
cpus (int): 需要的CPU核心数。
memory (str): 需要的内存 (例如, "4G")。
"""
try:
conn = pbs.pbs()
job = pbs.Job(conn, script_path)
job.N = job_name
job.l = f"select=1:ncpus={cpus}:mem={memory}"
job.o = "output.txt"
job.e = "error.txt"
job.submit()
print(f"Job submitted successfully. Job ID: {job.id}")
conn.disconnect()
except Exception as e:
print(f"Error submitting job: {e}")
if __name__ == "__main__":
# 创建一个简单的作业脚本
script_content = """#!/bin/bash
echo "Hello from PBS using pbs_python!"
date
hostname
"""
script_path = "my_job_pbs_lib.sh"
with open(script_path, "w") as f:
f.write(script_content)
# 提交作业
submit_job_pbs_lib("my_test_job_pbs_lib", script_path, 4, "4GB")
这些例子展示了如何使用Python库来提交作业。这些库通常提供了更高级的接口,可以更方便地设置作业参数和管理作业。
4. 高通量计算的Python实现
现在,我们来讨论如何使用Python实现高通量计算。核心思想是将大量的独立任务分解成小的作业,并将它们提交给SLURM或PBS。
4.1 参数扫描示例
假设我们要训练一个机器学习模型,并对学习率进行参数扫描。我们可以将不同的学习率组合分配到不同的作业中。
import subprocess
import os
import itertools
def train_model(learning_rate, model_id):
"""
模拟训练模型的函数。
Args:
learning_rate (float): 学习率。
model_id (int): 模型ID,用于区分不同的模型。
Returns:
str: 训练结果。
"""
# 模拟训练过程
print(f"Training model {model_id} with learning rate {learning_rate}")
# 实际的训练代码应该放在这里
# ...
result = f"Model {model_id} trained with learning rate {learning_rate} successfully."
return result
def create_training_script(learning_rate, model_id, script_path):
"""
创建训练脚本。
Args:
learning_rate (float): 学习率。
model_id (int): 模型ID。
script_path (str): 脚本保存路径。
"""
script_content = f"""#!/bin/bash
#SBATCH --output=output_{model_id}.txt
#SBATCH --error=error_{model_id}.txt
python -c "import train; print(train.train_model({learning_rate}, {model_id}))"
"""
with open(script_path, "w") as f:
f.write(script_content)
def submit_training_jobs(learning_rates, cpus, memory):
"""
提交训练作业。
Args:
learning_rates (list): 学习率列表。
cpus (int): 需要的CPU核心数。
memory (str): 需要的内存 (例如, "4G")。
"""
for i, learning_rate in enumerate(learning_rates):
model_id = i + 1
script_path = f"train_model_{model_id}.sh"
create_training_script(learning_rate, model_id, script_path)
submit_job(script_path, f"train_model_{model_id}", cpus, memory)
if __name__ == "__main__":
# 学习率列表
learning_rates = [0.001, 0.01, 0.1]
# 提交训练作业
submit_training_jobs(learning_rates, 4, "4G")
# 创建一个名为train.py的文件,包含train_model函数
with open("train.py", "w") as f:
f.write("""
def train_model(learning_rate, model_id):
print(f"Training model {model_id} with learning rate {learning_rate}")
result = f"Model {model_id} trained with learning rate {learning_rate} successfully."
return result
""")
在这个例子中,我们首先定义了一个train_model函数,用于模拟训练模型的过程。然后,我们使用create_training_script函数为每个学习率创建一个独立的作业脚本。最后,我们使用submit_training_jobs函数将所有的作业提交给SLURM。
4.2 更高级的HTC框架:dask
虽然我们可以使用上述方法手动提交作业,但对于更复杂的HTC场景,使用更高级的框架可能更方便。dask是一个流行的Python并行计算库,它可以与SLURM和PBS集成,提供更高级的抽象和功能。
# 需要安装 dask 和 dask-jobqueue: pip install dask dask-jobqueue
try:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import dask
except ImportError:
print("Dask and dask-jobqueue are not installed. Please install them using 'pip install dask dask-jobqueue'")
exit()
def train_model_dask(learning_rate, model_id):
"""
模拟训练模型的函数 (Dask版本)。
Args:
learning_rate (float): 学习率。
model_id (int): 模型ID,用于区分不同的模型。
Returns:
str: 训练结果。
"""
# 模拟训练过程
print(f"Training model {model_id} with learning rate {learning_rate}")
# 实际的训练代码应该放在这里
# ...
result = f"Model {model_id} trained with learning rate {learning_rate} successfully."
return result
if __name__ == "__main__":
# 创建SLURM集群
cluster = SLURMCluster(
queue='your_queue', # 替换为你的队列名称
cores=4, # 每个worker的CPU核心数
memory='4GB', # 每个worker的内存
walltime='00:30:00', # 最大运行时间
log_directory='dask_logs', # 日志目录
local_directory='dask_tmp' # 临时文件目录
)
# 启动多个worker
cluster.scale(jobs=3) #启动3个worker
# 连接到Dask集群
client = Client(cluster)
# 学习率列表
learning_rates = [0.001, 0.01, 0.1]
# 使用Dask并行执行训练任务
futures = []
for i, learning_rate in enumerate(learning_rates):
model_id = i + 1
future = client.submit(train_model_dask, learning_rate, model_id)
futures.append(future)
# 获取训练结果
results = [future.result() for future in futures]
print("Training results:", results)
# 关闭Dask集群
client.close()
cluster.close()
在这个例子中,我们首先创建了一个SLURMCluster对象,用于配置Dask集群。然后,我们使用client.submit函数将训练任务提交给Dask集群。Dask会自动将任务分配到集群中的worker节点上,并并行执行。
5. 注意事项与最佳实践
- 资源请求: 准确估计每个任务所需的资源(例如,CPU、内存、运行时间),并合理设置资源请求。如果资源请求过高,可能会导致资源浪费;如果资源请求过低,可能会导致任务失败。
- 作业依赖: 如果任务之间存在依赖关系,可以使用作业依赖来确保任务按正确的顺序执行。SLURM和PBS都提供了作业依赖的机制。
- 错误处理: 在作业脚本中添加错误处理机制,以便在任务失败时能够及时发现并进行处理。
- 日志记录: 在作业脚本中添加日志记录功能,以便跟踪任务执行情况和调试问题。
- 数据管理: 合理管理输入和输出数据,避免数据丢失或损坏。
- 模块化: 将代码模块化,方便重用和维护。
- 版本控制: 使用版本控制系统(例如,Git)来管理代码。
- 性能分析: 使用性能分析工具来识别瓶颈,并进行优化。
6. 总结SLURM/PBS与Python结合的优势
本文介绍了如何使用Python与SLURM/PBS集成,实现高通量计算的分布式调度。通过subprocess直接调用命令行工具,或者使用PySLURM/pbs等Python库可以方便地提交和管理作业。对于更复杂的HTC场景,可以使用dask等高级框架提供更高级的抽象和功能。合理设置资源请求、处理错误、记录日志等最佳实践可以提高HTC效率和可靠性。
更多IT精英技术系列讲座,到智猿学院