这篇文章是关于实验性且快速变化的软件的。本文中的代码示例不应被依赖于将来仍然有效。
这篇文章讨论了如何将高性能网络库 UCX 连接到并行 Python 库 Dask,以加速通信密集型工作负载,特别是在使用 GPU 时。
此外,我们在 DGX 上完成了这项工作,DGX 是一种高端多 CPU 多 GPU 机器,具有复杂的内部网络。在这种环境中工作有助于改进 Dask 在异构环境中的设置,使其能够针对不同的网卡、CPU 插槽、GPU 等进行优化。
许多分布式计算工作负载受限于通信。这在以下情况下很常见:
随着我们加速计算,例如使用 GPU 进行计算时,通信成为更大的瓶颈。
历史上,高性能通信只能使用 MPI 或定制解决方案实现。本文描述了一项努力,旨在在保持 Dask 这种动态系统的易编程性和可访问性的同时,接近 MPI 的通信带宽。
为了在 Dask 中获得高性能网络,我们使用 Python 对 UCX 进行了封装,然后将其连接到 Dask。
的OpenUCX 项目为各种高性能网络库(如 InfiniBand、传统网络协议如 TCP/共享内存以及 GPU 特定协议如 NVLink)提供了统一的 API。它是位于 OpenMPI(目前 OpenUCX 的主要用户)等系统之下的一层,用于确定要使用的网络系统。
今天的 Python 用户除了通过有时不理想的 MPI 之外,很难访问这些网络库。(尝试在 PyPI 上搜索“infiniband”)。
这促使我们创建了UCX-Py。UCX-Py 是 UCX C 库的 Python 封装,提供了一种 Pythonic API,既有适合传统 HPC 程序的阻塞语法,也有适合更多并发程序(如 Dask)的非阻塞 async/await 语法。有关 UCX 的更多信息,我建议观看 Akshay 在 2019 年 GPU Technology Conference 上的UCX 演讲。
注意:UCX-Py 主要由 Akshay Venkatesh (UCX, NVIDIA)、Tom Augspurger (Dask, Pandas, Anaconda) 和 Ben Zaitlen (NVIDIA, RAPIDS, Dask) 开发。
然后,我们扩展了 Dask 的通信以可选地使用 UCX。如果您安装了 UCX 和 UCX-Py,那么您可以在地址中使用 ucx:// 协议,或者在启动时使用 --protocol ucx 标志,如下所示。
$ dask-scheduler --protocol ucx
Scheduler started at ucx://127.0.0.1:8786
$ dask-worker ucx://127.0.0.1:8786
>>> from dask.distributed import Client
>>> client = Client('ucx://127.0.0.1:8786')
我们修改了使用 Dask 和 CuPy 的SVD 基准测试,使其使用 UCX 协议进行进程间通信,并在半个 DGX 机器上运行,使用了四个 GPU。以下是启用 UCX 的代码的最小实现:
import cupy
import dask
import dask.array
from dask.distributed import Client, wait
from dask_cuda import DGX
# 定义 DGX 集群和客户端
cluster = DGX(CUDA_VISIBLE_DEVICES=[0, 1, 2, 3])
client = Client(cluster)
# 创建随机数据
rs = dask.array.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random((1000000, 1000), chunks=(10000, 1000))
x = x.persist()
# 执行分布式 SVD
u, s, v = dask.array.linalg.svd(x)
u, s, v = dask.persist(u, s, v)
_ = wait([u, s, v])
通过使用 UCX,整体通信时间减少了一个数量级。为了生成下面的任务流图,基准测试在 CUDA_VISIBLE_DEVICES=[0,1,2,3] 的 DGX-1 上运行。很明显,代表进程间通信的红色任务条被显著压缩了。以前需要 500 毫秒到 1 秒的通信现在只需要大约 20 毫秒。
使用 UCX 之前:
使用 UCX 之后:
在使用 NVLink 的 GPU 上,一对 GPU 之间的吞吐量可以达到 5-10 GB/秒。在 CPU 上,这会下降到 1-2 GB/秒(这似乎远低于最优)。这些速度会影响所有 Dask 工作负载(数组、dataframe、xarray、ML 等),但当存在适当的硬件时,可能会出现其他瓶颈,例如处理文本或类似 JSON 数据时的序列化。
当然,这取决于是否存在这种高级网络硬件。在上面的 GPU 示例中,我们主要依赖 NVLink,但我们在 HPC InfiniBand 网络上,甚至在单台笔记本电脑上使用共享内存传输时,也能获得改进的性能。
上面的示例在 DGX 机器上运行,该机器包括所有这些传输方式以及更多(以及众多 GPU)。
上述使用的测试机器是DGX-1,它有八个 GPU、两个 CPU 插槽、四个 Infiniband 网卡以及复杂的 NVLink 配置。这是非均匀硬件的一个很好的例子。某些 CPU 更接近某些 GPU 和网卡,了解这种接近性对性能有着数量级的影响。这种情况并非 DGX 机器独有。当出现以下情况时,也会发生同样的情况:
使用 DGX 进行工作很有趣,因为它迫使我们开始思考异构性,并使使用 Dask 指定复杂的部署场景变得更容易。
这是显示 DGX-1 中 GPU、CPU 和 Infiniband 卡之间如何相互连接的图示:
这是 nvidia-smi 的输出,显示了 NVLink、网络和 CPU 亲和性结构(这与上面显示的结构基本正交)。
$ nvidia-smi topo -m
GPU0 GPU1 GPU2 GPU3 GPU4 GPU5 GPU6 GPU7 ib0 ib1 ib2 ib3
GPU0 X NV1 NV1 NV2 NV2 SYS SYS SYS PIX SYS PHB SYS
GPU1 NV1 X NV2 NV1 SYS NV2 SYS SYS PIX SYS PHB SYS
GPU2 NV1 NV2 X NV2 SYS SYS NV1 SYS PHB SYS PIX SYS
GPU3 NV2 NV1 NV2 X SYS SYS SYS NV1 PHB SYS PIX SYS
GPU4 NV2 SYS SYS SYS X NV1 NV1 NV2 SYS PIX SYS PHB
GPU5 SYS NV2 SYS SYS NV1 X NV2 NV1 SYS PIX SYS PHB
GPU6 SYS SYS NV1 SYS NV1 NV2 X NV2 SYS PHB SYS PIX
GPU7 SYS SYS SYS NV1 NV2 NV1 NV2 X SYS PHB SYS PIX
ib0 PIX PIX PHB PHB SYS SYS SYS SYS X SYS PHB SYS
ib1 SYS SYS SYS SYS PIX PIX PHB PHB SYS X SYS PHB
ib2 PHB PHB PIX PIX SYS SYS SYS SYS PHB SYS X SYS
ib3 SYS SYS SYS SYS PHB PHB PIX PIX SYS PHB SYS X
CPU 亲和性
GPU0 0-19,40-59
GPU1 0-19,40-59
GPU2 0-19,40-59
GPU3 0-19,40-59
GPU4 20-39,60-79
GPU5 20-39,60-79
GPU6 20-39,60-79
GPU7 20-39,60-79
图例
X = 自身
SYS = 遍历 PCIe 以及 NUMA 节点之间的 SMP 互连
NODE = 遍历 PCIe 以及 PCIe 主桥之间的互连
PHB = 遍历 PCIe 以及 PCIe 主桥(通常是 CPU)
PXB = 遍历多个 PCIe 交换机(不经过 PCIe 主桥)
PIX = 遍历单个 PCIe 交换机
NV# = 遍历 # 个绑定的 NVLink 集合
DGX 最初是为深度学习应用设计的。上述复杂的网络基础设施可以被像NCCL 这样专业的 NVIDIA 网络库很好地利用,NCCL 知道如何正确路由数据,但这对于像 Dask 这样更通用的系统来说是适应的挑战。
幸运的是,在应对这一挑战时,我们能够清理 Dask 中的许多相关问题。特别是现在我们可以:
通过这些更改,我们现在可以在下面的 Python 函数中将大部分 DGX 结构描述为配置:
import os
from dask.distributed import Nanny, SpecCluster, Scheduler
from distributed.worker import TOTAL_MEMORY
from dask_cuda.local_cuda_cluster import cuda_visible_devices
class CPUAffinity
""" 用于固定 CPU 亲和性的 Worker 插件 """
def __init__(self, cores)
self.cores = cores
def setup(self, worker=None)
os.sched_setaffinity(0, self.cores)
affinity = { # 参见 nvidia-smi topo -m
0: list(range(0, 20)) + list(range(40, 60)),
1: list(range(0, 20)) + list(range(40, 60)),
2: list(range(0, 20)) + list(range(40, 60)),
3: list(range(0, 20)) + list(range(40, 60)),
4: list(range(20, 40)) + list(range(60, 79)),
5: list(range(20, 40)) + list(range(60, 79)),
6: list(range(20, 40)) + list(range(60, 79)),
7: list(range(20, 40)) + list(range(60, 79)),
}
def DGX(
interface="ib",
dashboard_address=":8787",
threads_per_worker=1,
silence_logs=True,
CUDA_VISIBLE_DEVICES=None,
**kwargs
):
""" 用于 DGX 1 机器的本地集群
NVIDIA 的 DGX-1 机器具有复杂的 CPU 映射架构,
GPU 和网络硬件。此函数创建一个本地集群,
尽量尊重此硬件配置。
它为每个 GPU 创建一个 Dask worker 进程,并为每个 worker
进程分配正确的 CPU 核心和网卡,
以最大限度地提高性能。
话虽如此,事情并非完美。今天的 DGX 在某些 GPU 集合之间具有非常高的
性能,而在其他 GPU 集合之间则不是。一个 Dask DGX
集群,如果仅使用计算机中某些紧密耦合的部分,
其带宽将显著高于在整个机器上的部署。
"""
参数
----------
interface: str
Infiniband 网卡的接口前缀。这通常是
"ib" 或 "bond"。我们将根据情况添加数字后缀 0,1,2,3。
默认值为 "ib"。
dashboard_address: str
调度器仪表盘的地址。默认值为 ":8787"。
CUDA_VISIBLE_DEVICES: str
字符串,例如 ``"0,1,2,3"`` 或 ``[0, 1, 2, 3]``,用于限制
活动到不同的 GPU。
示例
--------
>>> from dask_cuda import DGX
>>> from dask.distributed import Client
>>> cluster = DGX(interface='ib')
>>> client = Client(cluster)
"""
if CUDA_VISIBLE_DEVICES is None
CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1,2,3,4,5,6,7")
if isinstance(CUDA_VISIBLE_DEVICES, str)
CUDA_VISIBLE_DEVICES = CUDA_VISIBLE_DEVICES.split(",")
CUDA_VISIBLE_DEVICES = list(map(int, CUDA_VISIBLE_DEVICES))
memory_limit = TOTAL_MEMORY / 8
spec = {
i: {
"cls": Nanny,
"options": {
"env": {
"CUDA_VISIBLE_DEVICES": cuda_visible_devices(
ii, CUDA_VISIBLE_DEVICES
),
"UCX_TLS": "rc,cuda_copy,cuda_ipc",
},
"interface": interface + str(i // 2),
"protocol": "ucx",
"ncores": threads_per_worker,
"data": dict,
"preload": ["dask_cuda.initialize_context"],
"dashboard_address": ":0",
"plugins": [CPUAffinity(affinity[i])],
"silence_logs": silence_logs,
"memory_limit": memory_limit,
},
}
for ii, i in enumerate(CUDA_VISIBLE_DEVICES)
}
scheduler = {
"cls": Scheduler,
"options": {
"interface": interface + str(CUDA_VISIBLE_DEVICES[0] // 2),
"protocol": "ucx",
"dashboard_address": dashboard_address,
},
}
return SpecCluster(
workers=spec,
scheduler=scheduler,
silence_logs=silence_logs,
**kwargs
)
然而,我们还没有完全解决 NVLink 结构的问题。Dask 调度器目前仍然假定 worker 之间具有均匀的带宽。我们已经开始朝着改变这一点迈出小步,但尚未完成(这对于考虑机架内或跨数据中心部署的人员也很有用)。
像往常一样,在解决一个高度特定的问题时,我们能够解决许多悬而未决的通用特性,这使得我们的特定问题变得易于实现。
在过去的几个月里,我们付出了巨大的努力来使上述所有内容正常工作。特别是我们……
结果相当不错,特别是对于通信密集型工作负载。然而,还有很多工作要做。本节详细介绍了我们目前正在考虑如何继续这项工作。
执行这些实验目前依赖于几个仓库的开发分支。本节包括我当前的设置。
conda create -n ucx python=3.7 libtool cmake automake autoconf cython bokeh pytest pkg-config ipython dask numba -y
注意:由于某种原因,使用 conda-forge 会导致下面的 autogen 步骤失败。
# 克隆 UCX 仓库并获取分支
git clone https://github.com/openucx/ucx
cd ucx
git remote add Akshay-Venkatesh git@github.com:Akshay-Venkatesh/ucx.git
git remote update Akshay-Venkatesh
git checkout ucx-cuda
# 构建
git clean -xfd
export CUDA_HOME=/usr/local/cuda-9.2/
./autogen.sh
mkdir build
cd build
../configure --prefix=$CONDA_PREFIX --enable-debug --with-cuda=$CUDA_HOME --enable-mt --disable-cma CPPFLAGS="-I//usr/local/cuda-9.2/include"
make -j install
# 验证
ucx_info -d
which ucx_info # 验证这是否在 conda 环境中
# 验证是否看到 NVLink 速度
ucx_perftest -t tag_bw -m cuda -s 1048576 -n 1000 & ucx_perftest dgx15 -t tag_bw -m cuda -s 1048576 -n 1000
git clone git@github.com:rapidsai/ucx-py
cd ucx-py
export UCX_PATH=$CONDA_PREFIX
make install
git clone git@github.com:dask/dask.git
cd dask
pip install -e .
cd ..
git clone git@github.com:dask/distributed.git
cd distributed
pip install -e .
cd ..
pip install cupy-cuda92==6
conda install -c rapidsai-nightly -c conda-forge -c numba cudf dask-cudf cudatoolkit=9.2
conda install ipykernel jupyterlab nb_conda_kernels nodejs
对于 Dask 仪表盘
pip install dask_labextension
jupyter labextension install dask-labextension
我一直在使用以下基准测试来测试通信。它分配一个分块的 Dask 数组,然后将其与其转置相加,这会强制进行大量通信,但计算量不大。
from collections import defaultdict
import asyncio
import time
import numpy as np
from pprint import pprint
import cupy
import dask.array as da
from dask.distributed import Client, wait
from distributed.utils import format_time, format_bytes
async def f()
# 在本地机器上设置 worker
async with DGX(asynchronous=True, silence_logs=True) as cluster
async with Client(cluster, asynchronous=True) as client
# 创建一个简单的随机数组
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random((40000, 40000), chunks='128 MiB').persist()
print(x.npartitions, '块')
await wait(x)
# 将 X 添加到其转置,强制计算
y = (x + x.T).sum()
result = await client.compute(y)
# 收集、聚合并打印点对点带宽
incoming_logs = await client.run(lambda dask_worker: dask_worker.incoming_transfer_log)
bandwidths = defaultdict(list)
for k, L in incoming_logs.items()
for d in L
if d['total'] > 1_000_000
bandwidths[k, d['who']].append(d['bandwidth'])
bandwidths = {
(cluster.scheduler.workers[w1].name,
cluster.scheduler.workers[w2].name): [format_bytes(x) + '/s' for x in np.quantile(v, [0.25, 0.50, 0.75])]
for (w1, w2), v in bandwidths.items()
}
pprint(bandwidths)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(f())
注意:此示例的大部分只是获取诊断信息,可以轻松忽略。另外,如果您愿意,可以删除 async/await 代码。我认为世界上应该有更多使用 async/await 语法的 Dask 示例,所以我决定保留它。