这项工作是与 Matthew Rocklin (Anaconda)、Jim Edwards (NCAR)、Guillaume Eynard-Bontemps (CNES) 和 Loïc Estève (INRIA) 合作完成的,并得到了美国国家科学基金会 Earth Cube 项目的部分支持。dask-jobqueue 包是 Pangeo Project 的衍生项目。这篇博文先前发布在 这里
一句话概括: Dask-jobqueue 使您能够无缝地在各种作业队列系统(如 PBS、Slurm、SGE 或 LSF)中使用 dask 部署到 HPC 集群。Dask-jobqueue 提供了一种 Pythonic 用户界面,通过在 HPC 系统上提交、执行和删除单个作业来管理 dask workers/clusters。它让用户能够在大型 HPC 系统上交互式地扩展工作负载;将一个交互式的 Jupyter Notebook 变成一个在超大型数据集上进行可扩展计算的强大工具。
安装方式
conda install -c conda-forge dask-jobqueue
或者
pip install dask-jobqueue
并查看 dask-jobqueue 文档:https://jobqueue.dask.org.cn
大型高性能计算机 (HPC) 集群在整个计算科学领域无处不在。这些 HPC 系统包含强大的硬件,包括许多大型计算节点、高速互连和并行文件系统。我们在 NCAR 使用的一个此类系统名为 Cheyenne。Cheyenne 是一台相当大的机器,拥有约 15 万个核心和超过 300 TB 的总内存。
Cheyenne 是一台由 NCAR 运营的 5.34 千万亿次浮点运算高性能计算机。
这些系统经常使用作业队列系统,例如 PBS、Slurm 或 SGE,来管理众多用户提交的许多并发作业的排队和执行。“作业”是指程序在用户 HPC 系统上的某些资源集上的一次单独执行。这些作业通常通过命令行提交
qsub do_thing_a.sh
其中 do_thing_a.sh 是一个 shell 脚本,可能看起来像这样
#!/bin/bash
#PBS -N thing_a
#PBS -q premium
#PBS -A 123456789
#PBS -l select=1:ncpus=36:mem=109G
echo “doing thing A”
在这个例子中,“-N”指定了作业名称,“-q”指定了作业应该运行的队列,“-A”指定了用于作业运行时 CPU 时间计费的项目代码,“-l”指定了作业的硬件规格。每个作业队列系统配置和提交这些作业的语法略有不同。
这种接口导致了一些常见工作流模式的发展
上面列出的工作流模式都不允许在超大型数据分析上进行交互式分析。当我原型化新的处理方法时,我经常想进行交互式工作,例如在 Jupyter Notebook 中。临时编写 MPI 代码既困难又昂贵,批量作业本质上不是交互式的,而当我开始处理许多 TB 的数据时,串行方式根本无法胜任。我们的经验是,这些工作流往往相当笨拙,并且难以在应用程序之间移植,从而导致沿途重复大量的努力。
... Pangeo 项目的目标之一是促进在超大型数据集上的交互式数据分析。Pangeo 利用 Jupyter 和 dask,以及许多更具领域特定性的包,如 xarray,来实现这一点。问题在于我们没有一个特别令人满意的方法来在我们的 HPC 集群上部署 dask。
from dask_jobqueue import PBSCluster
from dask.distributed import Client
cluster = PBSCluster(cores=36,
memory="108GB",
queue="premium")
cluster.scale(10)
client = Client(cluster)
Dask-jobqueue 易于定制,可帮助用户利用先进的 HPC 功能。一个更复杂的示例(可在 NCAR 的 Cheyenne 超级计算机上运行)是
cluster = PBSCluster(cores=36,
processes=18,
memory="108GB",
project='P48500028',
queue='premium',
resource_spec='select=1:ncpus=36:mem=109G',
walltime='02:00:00',
interface='ib0',
local_directory='$TMPDIR')
在此示例中,我们指示 PBSCluster:1) 每个作业最多使用 36 个核心,2) 每个作业使用 18 个 worker 进程,3) 使用每台 109 GB 的大内存节点,4) 使用比标准更长的 walltime,5) 使用 InfiniBand 网络接口 (ib0),以及 6) 使用快速 SSD 磁盘作为其本地目录空间。
最后,Dask 提供了基于一组启发式方法“自动扩展”集群的能力。当集群需要更多 CPU 或内存时,它会扩展;当集群有未使用的资源时,它会缩减。Dask-jobqueue 通过一个简单的接口支持此功能
cluster.adapt(minimum=18, maximum=360)
在此示例中,我们告诉集群在 18 到 360 个 workers(或 1 到 20 个作业)之间自动扩展。
我们整理了一个相当全面的截屏视频,引导用户完成在 HPC 集群上设置 Jupyter 和 Dask(以及 dask-jobqueue)的所有步骤。
Dask-jobqueue 使在 HPC 集群上部署 Dask 变得容易得多。该包为常见的作业队列系统提供了一个 Pythonic 接口。它也易于定制。
自动扩展功能提供了一种在 HPC 集群上进行科学研究的全新方式。启动您的 Jupyter Notebook,实例化您的 dask 集群,然后进行科学研究——让 dask 根据计算需求决定何时扩展和缩减。我们认为这种爆发式的交互式并行计算方法带来了许多好处。
最后,在开发 dask-jobqueue 的过程中,我们遇到了一些值得一提的挑战。