提交新活动

谢谢!您的提交已收到!
哎呀!提交表单时出错了。

提交新闻报道

谢谢!您的提交已收到!
哎呀!提交表单时出错了。

订阅新闻邮件

谢谢!您的提交已收到!
哎呀!提交表单时出错了。
2018年10月8日

Dask-jobqueue

作者

这项工作是与 Matthew Rocklin (Anaconda)、Jim Edwards (NCAR)、Guillaume Eynard-Bontemps (CNES) 和 Loïc Estève (INRIA) 合作完成的,并得到了美国国家科学基金会 Earth Cube 项目的部分支持。dask-jobqueue 包是 Pangeo Project 的衍生项目。这篇博文先前发布在 这里

一句话概括: Dask-jobqueue 使您能够无缝地在各种作业队列系统(如 PBS、Slurm、SGE 或 LSF)中使用 dask 部署到 HPC 集群。Dask-jobqueue 提供了一种 Pythonic 用户界面,通过在 HPC 系统上提交、执行和删除单个作业来管理 dask workers/clusters。它让用户能够在大型 HPC 系统上交互式地扩展工作负载;将一个交互式的 Jupyter Notebook 变成一个在超大型数据集上进行可扩展计算的强大工具。

安装方式

conda install -c conda-forge dask-jobqueue

或者

pip install dask-jobqueue

并查看 dask-jobqueue 文档:https://jobqueue.dask.org.cn

引言

大型高性能计算机 (HPC) 集群在整个计算科学领域无处不在。这些 HPC 系统包含强大的硬件,包括许多大型计算节点、高速互连和并行文件系统。我们在 NCAR 使用的一个此类系统名为 Cheyenne。Cheyenne 是一台相当大的机器,拥有约 15 万个核心和超过 300 TB 的总内存。

Cheyenne is a 5.34-petaflops, high-performance computer operated by NCAR.

Cheyenne 是一台由 NCAR 运营的 5.34 千万亿次浮点运算高性能计算机。

这些系统经常使用作业队列系统,例如 PBS、Slurm 或 SGE,来管理众多用户提交的许多并发作业的排队和执行。“作业”是指程序在用户 HPC 系统上的某些资源集上的一次单独执行。这些作业通常通过命令行提交

qsub do_thing_a.sh

其中 do_thing_a.sh 是一个 shell 脚本,可能看起来像这样

#!/bin/bash
#PBS -N thing_a
#PBS -q premium
#PBS -A 123456789
#PBS -l select=1:ncpus=36:mem=109G

echo “doing thing A”

在这个例子中,“-N”指定了作业名称,“-q”指定了作业应该运行的队列,“-A”指定了用于作业运行时 CPU 时间计费的项目代码,“-l”指定了作业的硬件规格。每个作业队列系统配置和提交这些作业的语法略有不同。

这种接口导致了一些常见工作流模式的发展

  1. 如果要扩展,就用 MPI。MPI 是 Message Passing Interface 的缩写。它是一种广泛采用的接口,允许在传统 HPC 集群上进行并行计算。许多大型计算模型是用 C 和 Fortran 等语言编写的,并使用 MPI 来管理它们的并行执行。对于经验丰富的人来说,在扩展复杂计算时,这是首选的解决方案。
  2. 批量处理。科学处理流程中很常见包含一些步骤,可以通过并行提交多个作业轻松地进行并行化。也许您想用略微不同的输入运行“do_thing_a.sh”500次——这很简单,只需单独提交所有作业(或在某些队列系统中称为“array-job”)。
  3. 串行也行。如今计算机相当快,对吧?也许您根本不需要并行化您的编程。好吧,那就保持串行,在作业运行时去喝杯咖啡。

问题

上面列出的工作流模式都不允许在超大型数据分析上进行交互式分析。当我原型化新的处理方法时,我经常想进行交互式工作,例如在 Jupyter Notebook 中。临时编写 MPI 代码既困难又昂贵,批量作业本质上不是交互式的,而当我开始处理许多 TB 的数据时,串行方式根本无法胜任。我们的经验是,这些工作流往往相当笨拙,并且难以在应用程序之间移植,从而导致沿途重复大量的努力。

... Pangeo 项目的目标之一是促进在超大型数据集上的交互式数据分析。Pangeo 利用 Jupyter 和 dask,以及许多更具领域特定性的包,如 xarray,来实现这一点。问题在于我们没有一个特别令人满意的方法来在我们的 HPC 集群上部署 dask。

系统

  • Jupyter Notebooks 是支持交互式代码执行、图形和动画显示以及内联解释性文本和公式的 Web 应用程序。它们正迅速成为 Python 中交互式计算的标准开源格式。
  • Dask 是一个用于并行计算的库,它与 Python 现有的科学软件生态系统(包括 NumPyPandasScikit-Learn 和 xarray 等库)良好协作。在许多情况下,它为用户提供了将现有工作流快速扩展到更大规模应用程序的能力。*Dask-distributed* 是 dask 的一个扩展,可促进跨多台计算机的并行执行。
  • Dask-jobqueue 是我们构建的一个新的 Python 包,旨在促进 dask 在 HPC 集群上的部署,并与多种作业队列系统交互。其用法简洁且具有 Python 风格。

from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(cores=36,
memory="108GB",
queue="premium")
cluster.scale(10)
client = Client(cluster)

幕后发生了什么?

  1. 在调用 PBSCluster() 时,我们正在告诉 dask-jobqueue 我们希望如何配置每个作业。在这种情况下,我们将每个作业设置为拥有 1 个 Worker,每个 Worker 使用 36 个核心(线程)和 108 GB 内存。我们还告诉 PBS 队列系统,我们希望将此作业提交到“premium”队列。这一步还会启动一个 Scheduler 来管理我们稍后添加的 workers。
  2. 直到我们调用 cluster.scale() 方法时,我们才与 PBS 系统交互。在这里,我们启动 10 个 workers,或等效地启动 10 个 PBS 作业。对于每个作业,dask-jobqueue 会创建一个类似于上面所示的 shell 命令(只是调用 dask-worker 而不是 echo),并通过子进程调用提交作业。
  3. 最后,通过实例化 Client 类连接到集群。从这里开始,我们代码的其余部分看起来就像使用 dask 的本地调度器一样。

Dask-jobqueue 易于定制,可帮助用户利用先进的 HPC 功能。一个更复杂的示例(可在 NCAR 的 Cheyenne 超级计算机上运行)是

cluster = PBSCluster(cores=36,
processes=18,
memory="108GB",
project='P48500028',
queue='premium',
resource_spec='select=1:ncpus=36:mem=109G',
walltime='02:00:00',
interface='ib0',
local_directory='$TMPDIR')

在此示例中,我们指示 PBSCluster:1) 每个作业最多使用 36 个核心,2) 每个作业使用 18 个 worker 进程,3) 使用每台 109 GB 的大内存节点,4) 使用比标准更长的 walltime,5) 使用 InfiniBand 网络接口 (ib0),以及 6) 使用快速 SSD 磁盘作为其本地目录空间。

最后,Dask 提供了基于一组启发式方法“自动扩展”集群的能力。当集群需要更多 CPU 或内存时,它会扩展;当集群有未使用的资源时,它会缩减。Dask-jobqueue 通过一个简单的接口支持此功能

cluster.adapt(minimum=18, maximum=360)

在此示例中,我们告诉集群在 18 到 360 个 workers(或 1 到 20 个作业)之间自动扩展。

演示

我们整理了一个相当全面的截屏视频,引导用户完成在 HPC 集群上设置 Jupyter 和 Dask(以及 dask-jobqueue)的所有步骤。

结论

Dask-jobqueue 使在 HPC 集群上部署 Dask 变得容易得多。该包为常见的作业队列系统提供了一个 Pythonic 接口。它也易于定制。

自动扩展功能提供了一种在 HPC 集群上进行科学研究的全新方式。启动您的 Jupyter Notebook,实例化您的 dask 集群,然后进行科学研究——让 dask 根据计算需求决定何时扩展和缩减。我们认为这种爆发式的交互式并行计算方法带来了许多好处。

最后,在开发 dask-jobqueue 的过程中,我们遇到了一些值得一提的挑战。

  • 队列系统高度可定制。系统管理员似乎对其各自的队列系统实现有很大的控制权。实际上,这意味着通常很难同时涵盖特定队列系统的所有变体。我们通常发现事情似乎足够灵活,并在不够灵活的情况下欢迎反馈。
  • CI 测试需要相当多的设置工作。使用 dask-jobqueue 的目标环境是现有的 HPC 集群。为了方便 dask-jobqueue 的持续集成测试,我们不得不配置多个队列系统(PBS、Slurm、SGE)以使用 Travis CI 在 docker 中运行。这是一项艰巨的任务,我们仍在努力。
  • 我们构建 dask-jobqueue 以在 dask-deploy 框架中运行。如果您熟悉 dask-kubernetesdask-yarn,您也会在 dask-jobqueue 中认出基本语法。这些 dask 部署包的同时开发最近引发了一些重要的协调讨论(例如 https://github.com/dask/distributed/issues/2235)。