提交新活动

谢谢!您的提交已收到!
糟糕!提交表单时出现问题。

提交新闻报道

谢谢!您的提交已收到!
糟糕!提交表单时出现问题。

订阅新闻通讯

谢谢!您的提交已收到!
糟糕!提交表单时出现问题。
2019年8月28日

Dask 在 HPC 上的应用:案例研究

作者:

Dask 在传统 HPC 机器上的部署越来越频繁。过去一周,我亲自帮助了四个不同的团队进行安装设置。这是一个出乎意料的个性化过程,因为每台 HPC 机器都有其特殊性。每台机器都使用 SLURM/PBS/SGE/LSF/… 等作业调度器、网络文件系统和高速互连,但这些子系统在不同的机器上策略略有不同,这就是事情变得棘手的地方。

通常,如果我们同时具备以下条件,可以在大约 30 分钟内解决这些问题:

  • 熟悉机器的人,例如高级用户或 IT 管理员
  • 熟悉设置 Dask 的人

这些系统的规模范围很大。在本周的不同规模端,我同时看到了

  • 一个用于生物成像实验室内部研究工作的、小型内部 24 节点 SLURM 集群
  • Summit,世界上最强大的超级计算机

在这篇文章中,我将分享一些我处理 Summit 时遇到的一些特别麻烦的注意事项。希望这能让你了解可能会出现的情况。这些技巧可能不适用于你的特定系统,但希望能让你了解可能出错的地方以及我们如何追查问题的过程。

Power 架构

首先,Summit 是一台 IBM PowerPC 机器,这意味着在普通 Intel 芯片上编译的软件包无法工作。幸运的是,Anaconda 提供了一个与其 Power 架构兼容的发行版下载,这给了我一个很好的起点。

https://anaconda.net.cn/distribution/#linux

软件包似乎比普通发行版要旧几个月,但这我可以接受。

安装 Dask-Jobqueue 并配置基本信息

我们需要告诉 Dask 每台机器有多少核心和多少内存。这个过程相当简单,在 jobqueue.dask.org 上有详细的文档和信息丰富的屏幕录像,甚至可以通过错误消息进行自我引导。

In [1]: from dask_jobqueue import PBSCluster
In [2]: cluster = PBSCluster()
ValueError: You must specify how many cores to use per job like ``cores=8``

现在我将跳过这一部分,因为一般来说,新手用户都能处理这个问题。欲了解更多信息,可以观看此 YouTube 视频(30 分钟)。

作业脚本中的无效操作

因此,我们创建了一个包含所有信息的集群对象,调用 .scale,然后从作业调度器那里得到一些错误消息。

from dask_jobqueue import LSFCluster
cluster = LSFCluster(
cores=128,
memory="600 GB",
project="GEN119",
walltime="00:30",
)
cluster.scale(3) # 请求三个节点

命令
bsub /tmp/tmp4874eufw.sh
标准输出

典型用法
bsub [LSF arguments] jobscript
bsub [LSF arguments] -Is $SHELL
bsub -h[elp] [options]
bsub -V

注意事项
* 所有作业必须指定 walltime (-W) 和项目 ID (-P)
* 标准作业必须指定节点数 (-nnodes) 或 -ln_slots。这些作业不能指定资源字符串 (-R)。
* 专家模式作业 (-csm y) 必须指定资源字符串,并且不能指定 -nnodes 或 -ln_slots。

标准错误
ERROR: Resource strings (-R) are not supported in easy mode. Please resubmit without a resource string.
ERROR: -n is no longer supported. Please request nodes with -nnodes.
ERROR: No nodes requested. Please request nodes with -nnodes.

Dask-Jobqueue 试图根据您提供的输入生成一个合理的作业脚本,但您使用的资源管理器可能具有该集群独有的额外策略。我们通过查看生成的脚本并将其与已知可在 HPC 机器上工作的脚本进行比较来调试此问题。

print(cluster.job_script())

#!/usr/bin/env bash

#BSUB -J dask-worker
#BSUB -P GEN119
#BSUB -n 128
#BSUB -R "span[hosts=1]"
#BSUB -M 600000
#BSUB -W 00:30
JOB_ID=${LSB_JOBID%.*}

/ccs/home/mrocklin/anaconda/bin/python -m distributed.cli.dask_worker tcp://scheduler:8786 --nthreads 16 --nprocs 8 --memory-limit 75.00GB --name name --nanny --death-timeout 60 --interface ib0 --interface ib0

在与我们已知可在 Summit 上工作的现有脚本进行比较后,我们修改关键字以在头部添加和删除某些行。

cluster = LSFCluster(
cores=128,
memory="500 GB",
project="GEN119",
walltime="00:30",
job_extra=["-nnodes 1"], # <--- 新增!
header_skip=["-R", "-n ", "-M"], # <--- 新增!
)

调用 scale 后,LSF 似乎满意了。它不再输出大量的错误消息。

>>> cluster.scale(3) # 似乎通过了
>>>

工作节点未连接到调度器

从 LSF 的角度来看,一切似乎正常,但是当我们连接客户端到我们的集群时,却看不到任何东西到达。

>>> from dask.distributed import Client
>>> client = Client(cluster)
>>> client
<Client: scheduler='tcp://10.41.0.34:41107' processes=0 cores=0>

需要检查两件事,作业是否真的通过了队列?通常我们使用资源管理器操作,如 qstat、squeue 或 bjobs 来检查。也许我们的作业被困在队列里了?

$ bash
JOBID USER STAT SLOTS QUEUE START_TIME FINISH_TIME JOB_NAME
600785 mrocklin RUN 43 batch Aug 26 13:11 Aug 26 13:41 dask-worker
600786 mrocklin RUN 43 batch Aug 26 13:11 Aug 26 13:41 dask-worker
600784 mrocklin RUN 43 batch Aug 26 13:11 Aug 26 13:41 dask-worker

不,看起来它们处于运行状态。现在我们去查看它们的日志。有时很难找到作业的日志文件,但您的 IT 管理员应该知道它们在哪里。通常它们位于您运行作业的目录中,并且文件名中包含作业 ID。

$ cat dask-worker.600784.err
distributed.worker - INFO - Start worker at: tcp://128.219.134.81:44053
distributed.worker - INFO - Listening to: tcp://128.219.134.81:44053
distributed.worker - INFO - dashboard at: 128.219.134.81:34583
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 16
distributed.worker - INFO - Memory: 75.00 GB
distributed.worker - INFO - Local Directory: /autofs/nccs-svm1_home1/mrocklin/worker-ybnhk4ib
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
distributed.worker - INFO - Waiting to connect to: tcp://128.219.134.74:34153
...

因此,工作节点进程已启动,但连接调度器时遇到困难。当我们询问 IT 管理员时,他们指出这里的地址位于错误的网络接口上

128.219.134.74 <--- 不可访问的网络地址

因此,我们运行 ifconfig,找到 InfiniBand 网络接口 ib0,它的可访问范围更广。

cluster = LSFCluster(
cores=128,
memory="500 GB",
project="GEN119",
walltime="00:30",
job_extra=["-nnodes 1"],
header_skip=["-R", "-n ", "-M"],
interface="ib0", # <--- 新增!
)

我们试了一下,但还是没成功 :(

交互节点

专家用户然后说:“哦,我们的登录节点很严格,我们从交互式计算节点试一下。在那里通常会运行得更好。” 我们运行了一些神秘的 bash 命令(我从未见过两个看起来一样的命令,所以这里省略了),然后奇迹般地开始工作了。万岁!

我们运行一个小型 Dask 计算,只是为了证明我们可以做一些工作。

>>> client = Client(cluster)
>>> client.submit(lambda x: x + 1, 10).result()
11

实际上,事实证明我们最终能够使用 LSF 中稍有不同的 bsub 命令在 Summit 的登录节点上运行起来,但这里我将省略细节,因为我们正在 Dask 中修复这个问题,未来不太可能影响用户(希望如此?)。锁定的登录节点仍然是各种系统中无法建立连接的常见原因。与我交互的系统中,大约有 30% 属于这种情况。

SSH 隧道

启动并运行 Dashboard 以便查看正在发生的事情非常重要。通常我们使用 SSH 隧道来实现。大多数 HPC 用户都知道如何做,并且上面的 YouTube 屏幕录像中已经涵盖了,所以我这里跳过不提。

Jupyter Lab

今天,许多 HPC 上的交互式 Dask 用户正转向使用 JupyterLab。这个选择为他们提供了笔记本、终端、文件浏览器和 Dask 的 Dashboard,所有这些都在一个浏览器选项卡中。这大大减少了他们需要通过 SSH 连接的次数,而且借助于 Web 代理的魔力,这意味着他们只需要建立一次隧道。

我使用 conda 安装了 JupyterLab 和一个代理库,然后尝试设置 Dask JupyterLab 扩展

conda install jupyterlab
pip install jupyter-server-proxy # 将 dashboard 通过 Jupyter 的端口路由

接下来,我们将安装 Dask Labextension 到 JupyterLab 中,以便将 Dask Dashboard 直接集成到我们的 Jupyter 会话中。为此,我们需要 nodejs 来安装 JupyterLab 的扩展。考虑到 Power 架构,我以为这会很麻烦,但令人惊讶的是,它似乎也在 Anaconda 的默认 Power 通道中。

mrocklin@login2.summit $ conda install nodejs # 感谢 conda 打包开发者!

然后我安装 Dask-Labextension,它既是 Python 包也是 JavaScript 包

pip install dask_labextension
jupyter labextension install dask-labextension

然后我为我的 Jupyter 会话设置了密码

jupyter notebook password

并以网络友好的方式运行 JupyterLab

mrocklin@login2.summit $ jupyter lab --no-browser --ip="login2"

并在我的本地机器与登录节点之间建立一个 SSH 隧道

# 确保匹配登录节点的主机名和下面的 Jupyter 端口

mrocklin@my-laptop $ ssh -L 8888:login2:8888 summit.olcf.ornl.gov

我现在可以通过导航到 http://localhost:8888 从我的笔记本电脑连接到 Jupyter,在笔记本中运行上面的集群命令,一切运行良好。此外,多亏了 jupyter-server-proxy,Dask 的 Dashboard 也可在 http://localhost:8888/proxy/####/status 访问,其中 #### 是当前托管 Dask Dashboard 的端口。您可以通过查看 cluster.dashboard_link 来找到这个端口。它默认为 8787,但如果您最近在系统上启动了许多 Dask 调度器,该端口可能被占用,因此 Dask 不得不使用随机端口。

配置文件

我不想一直输入所有这些命令,所以现在我将它们放入一个单独的配置文件中,并将该文件放在 ~/.config/dask/summit.yaml(任何以 .yaml 结尾的文件都可以)。

jobqueue
lsf
cores: 128
processes: 8
memory: 500 GB
job-extra
- "-nnodes 1"
interface: ib0
header-skip
- "-R"
- "-n "
- "-M"

labextension
factory
module: "dask_jobqueue"
class: "LSFCluster"
args: []
kwargs
project: your-project-id

工作节点启动缓慢

现在事情更容易使用了,我发现自己使用该系统的频率更高了,也出现了一些其他问题。

我注意到启动一个工作节点需要很长时间。启动过程中似乎会间歇性地挂起,因此我在 distributed/__init__.py 中添加了几行代码,每秒打印出主 Python 线程的状态,以查看问题发生在哪里

import threading, sys, time
from . import profile

main_thread = threading.get_ident()

def f()
while True
time.sleep(1)
frame = sys._current_frames()[main_thread]
print("".join(profile.call_stack(frame)

thread = threading.Thread(target=f, daemon=True)
thraed.start()

这会打印出一个回溯,将我们带到 Dask 中的这段代码

if is_locking_enabled()
try
self._lock_path = os.path.join(self.dir_path + DIR_LOCK_EXT)
assert not os.path.exists(self._lock_path)
logger.debug("Locking %r...", self._lock_path)
# 在锁定文件之前避免竞态条件
# 通过获取全局锁
try
with workspace._global_lock()
self._lock_file = locket.lock_file(self._lock_path)
self._lock_file.acquire()

看起来 Dask 正在尝试使用基于文件的锁。不幸的是,一些 NFS 系统不喜欢基于文件的锁,或者处理它们非常慢。在 Summit 的情况下,计算节点实际上以只读方式挂载了 home 目录,因此基于文件的锁将直接失败。查找 is_locking_enabled 函数,我们看到它检查了一个配置值。

def is_locking_enabled()
return dask.config.get("distributed.worker.use-file-locking")

所以我们将其添加到我们的配置文件中。同时,我将 multiprocessing 方法从 forkserver 切换到 spawn(我以为这可能有所帮助,但没有),这相对无害。

distributed
worker
multiprocessing-method: spawn
use-file-locking: False

jobqueue
lsf
cores: 128
processes: 8
memory: 500 GB
job-extra
- "-nnodes 1"
interface: ib0
header-skip
- "-R"
- "-n "
- "-M"

labextension
factory
module: 'dask_jobqueue'
class: 'LSFCluster'
args: []
kwargs
project: your-project-id

结论

这篇文章概述了我在使 Dask 在一个特定的 HPC 系统上运行时遇到的许多问题。这些问题并非普遍存在,所以您可能不会遇到它们,但它们也并非非常罕见。我写这篇文章的主要目的是让人们了解 Dask 与 HPC 系统交互时可能出现的各种问题。

上述问题都不算特别严重。它们以前都发生过,并且都有可以写入配置文件的解决方案。然而,找到问题所在可能具有挑战性,并且通常需要对 Dask 和特定 HPC 系统都有经验的人员的综合专业知识。

jobqueue.dask.org/en/latest/configurations.html 上发布了一些配置文件,可能会提供一些信息。 Dask Jobqueue issue tracker 也是一个非常友好的地方,里面既有 IT 专业人士,也有 Dask 专家。

此外,提醒一下,您不需要有 HPC 机器即可使用 Dask。Dask 可以方便地部署在其他云、Hadoop 和本地系统上。请参阅 Dask 设置文档以获取更多信息。

未来工作:GPU

Summit 之所以快,是因为它有很多 GPU。我接下来将着手处理这个问题,但这可能会涵盖足够多的内容,写成一篇完整的另一篇博客文章 :)

分支

对于在家(或在 Summit 上)跟着尝试的各位。我正在使用以下开发分支:

不过希望在写这篇文章的一个月内,所有内容都能处于一个良好发布的版本状态。