提交新活动

谢谢!您的提交已收到!
糟糕!提交表单时出错了。

提交新闻专题

谢谢!您的提交已收到!
糟糕!提交表单时出错了。

订阅新闻通讯

谢谢!您的提交已收到!
糟糕!提交表单时出错了。
2019年6月9日

使用 UCX 和 DGX 进行高性能网络实验

作者

这篇文章是关于实验性且快速变化的软件的。本文中的代码示例不应被依赖于将来仍然有效。

执行摘要

这篇文章讨论了如何将高性能网络库 UCX 连接到并行 Python 库 Dask,以加速通信密集型工作负载,特别是在使用 GPU 时。

此外,我们在 DGX 上完成了这项工作,DGX 是一种高端多 CPU 多 GPU 机器,具有复杂的内部网络。在这种环境中工作有助于改进 Dask 在异构环境中的设置,使其能够针对不同的网卡、CPU 插槽、GPU 等进行优化。

动机

许多分布式计算工作负载受限于通信。这在以下情况下很常见:

  1. Dataframe 连接
  2. 机器学习算法
  3. 复杂的数组计算

随着我们加速计算,例如使用 GPU 进行计算时,通信成为更大的瓶颈。

历史上,高性能通信只能使用 MPI 或定制解决方案实现。本文描述了一项努力,旨在在保持 Dask 这种动态系统的易编程性和可访问性的同时,接近 MPI 的通信带宽。

UCX、Python 和 Dask

为了在 Dask 中获得高性能网络,我们使用 Python 对 UCX 进行了封装,然后将其连接到 Dask。

OpenUCX 项目为各种高性能网络库(如 InfiniBand、传统网络协议如 TCP/共享内存以及 GPU 特定协议如 NVLink)提供了统一的 API。它是位于 OpenMPI(目前 OpenUCX 的主要用户)等系统之下的一层,用于确定要使用的网络系统。

今天的 Python 用户除了通过有时不理想的 MPI 之外,很难访问这些网络库。(尝试在 PyPI 上搜索“infiniband”)。

这促使我们创建了UCX-Py。UCX-Py 是 UCX C 库的 Python 封装,提供了一种 Pythonic API,既有适合传统 HPC 程序的阻塞语法,也有适合更多并发程序(如 Dask)的非阻塞 async/await 语法。有关 UCX 的更多信息,我建议观看 Akshay 在 2019 年 GPU Technology Conference 上的UCX 演讲

注意:UCX-Py 主要由 Akshay Venkatesh (UCX, NVIDIA)、Tom Augspurger (Dask, Pandas, Anaconda) 和 Ben Zaitlen (NVIDIA, RAPIDS, Dask) 开发。

然后,我们扩展了 Dask 的通信以可选地使用 UCX。如果您安装了 UCX 和 UCX-Py,那么您可以在地址中使用 ucx:// 协议,或者在启动时使用 --protocol ucx 标志,如下所示。

$ dask-scheduler --protocol ucx
Scheduler started at ucx://127.0.0.1:8786

$ dask-worker ucx://127.0.0.1:8786

>>> from dask.distributed import Client
>>> client = Client('ucx://127.0.0.1:8786')

实验

我们修改了使用 Dask 和 CuPy 的SVD 基准测试,使其使用 UCX 协议进行进程间通信,并在半个 DGX 机器上运行,使用了四个 GPU。以下是启用 UCX 的代码的最小实现:

import cupy
import dask
import dask.array
from dask.distributed import Client, wait
from dask_cuda import DGX

# 定义 DGX 集群和客户端
cluster = DGX(CUDA_VISIBLE_DEVICES=[0, 1, 2, 3])
client = Client(cluster)

# 创建随机数据
rs = dask.array.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random((1000000, 1000), chunks=(10000, 1000))
x = x.persist()

# 执行分布式 SVD
u, s, v = dask.array.linalg.svd(x)
u, s, v = dask.persist(u, s, v)
_ = wait([u, s, v])

通过使用 UCX,整体通信时间减少了一个数量级。为了生成下面的任务流图,基准测试在 CUDA_VISIBLE_DEVICES=[0,1,2,3] 的 DGX-1 上运行。很明显,代表进程间通信的红色任务条被显著压缩了。以前需要 500 毫秒到 1 秒的通信现在只需要大约 20 毫秒。

使用 UCX 之前:

使用 UCX 之后:

深入细节

在使用 NVLink 的 GPU 上,一对 GPU 之间的吞吐量可以达到 5-10 GB/秒。在 CPU 上,这会下降到 1-2 GB/秒(这似乎远低于最优)。这些速度会影响所有 Dask 工作负载(数组、dataframe、xarray、ML 等),但当存在适当的硬件时,可能会出现其他瓶颈,例如处理文本或类似 JSON 数据时的序列化。

当然,这取决于是否存在这种高级网络硬件。在上面的 GPU 示例中,我们主要依赖 NVLink,但我们在 HPC InfiniBand 网络上,甚至在单台笔记本电脑上使用共享内存传输时,也能获得改进的性能。

上面的示例在 DGX 机器上运行,该机器包括所有这些传输方式以及更多(以及众多 GPU)。

DGX

上述使用的测试机器是DGX-1,它有八个 GPU、两个 CPU 插槽、四个 Infiniband 网卡以及复杂的 NVLink 配置。这是非均匀硬件的一个很好的例子。某些 CPU 更接近某些 GPU 和网卡,了解这种接近性对性能有着数量级的影响。这种情况并非 DGX 机器独有。当出现以下情况时,也会发生同样的情况:

  • 一个节点中有多个 worker,一个集群中有多个节点
  • 一个机架中有多个节点,一个数据中心中有多个机架
  • 多个数据中心,例如混合云的情况

使用 DGX 进行工作很有趣,因为它迫使我们开始思考异构性,并使使用 Dask 指定复杂的部署场景变得更容易。

这是显示 DGX-1 中 GPU、CPU 和 Infiniband 卡之间如何相互连接的图示:

这是 nvidia-smi 的输出,显示了 NVLink、网络和 CPU 亲和性结构(这与上面显示的结构基本正交)。

$ nvidia-smi topo -m
GPU0 GPU1 GPU2 GPU3 GPU4 GPU5 GPU6 GPU7 ib0 ib1 ib2 ib3
GPU0 X NV1 NV1 NV2 NV2 SYS SYS SYS PIX SYS PHB SYS
GPU1 NV1 X NV2 NV1 SYS NV2 SYS SYS PIX SYS PHB SYS
GPU2 NV1 NV2 X NV2 SYS SYS NV1 SYS PHB SYS PIX SYS
GPU3 NV2 NV1 NV2 X SYS SYS SYS NV1 PHB SYS PIX SYS
GPU4 NV2 SYS SYS SYS X NV1 NV1 NV2 SYS PIX SYS PHB
GPU5 SYS NV2 SYS SYS NV1 X NV2 NV1 SYS PIX SYS PHB
GPU6 SYS SYS NV1 SYS NV1 NV2 X NV2 SYS PHB SYS PIX
GPU7 SYS SYS SYS NV1 NV2 NV1 NV2 X SYS PHB SYS PIX
ib0 PIX PIX PHB PHB SYS SYS SYS SYS X SYS PHB SYS
ib1 SYS SYS SYS SYS PIX PIX PHB PHB SYS X SYS PHB
ib2 PHB PHB PIX PIX SYS SYS SYS SYS PHB SYS X SYS
ib3 SYS SYS SYS SYS PHB PHB PIX PIX SYS PHB SYS X

CPU 亲和性
GPU0 0-19,40-59
GPU1 0-19,40-59
GPU2 0-19,40-59
GPU3 0-19,40-59
GPU4 20-39,60-79
GPU5 20-39,60-79
GPU6 20-39,60-79
GPU7 20-39,60-79

图例

X = 自身
SYS = 遍历 PCIe 以及 NUMA 节点之间的 SMP 互连
NODE = 遍历 PCIe 以及 PCIe 主桥之间的互连
PHB = 遍历 PCIe 以及 PCIe 主桥(通常是 CPU)
PXB = 遍历多个 PCIe 交换机(不经过 PCIe 主桥)
PIX = 遍历单个 PCIe 交换机
NV# = 遍历 # 个绑定的 NVLink 集合

DGX 最初是为深度学习应用设计的。上述复杂的网络基础设施可以被像NCCL 这样专业的 NVIDIA 网络库很好地利用,NCCL 知道如何正确路由数据,但这对于像 Dask 这样更通用的系统来说是适应的挑战。

幸运的是,在应对这一挑战时,我们能够清理 Dask 中的许多相关问题。特别是现在我们可以:

  1. 在启动本地集群时指定更异构的 worker 配置dask/distributed #2675
  2. 随时间学习带宽dask/distributed #2658
  3. 添加 Worker 插件以帮助处理 CPU 亲和性等问题(尽管这很通用)dask/distributed #2453

通过这些更改,我们现在可以在下面的 Python 函数中将大部分 DGX 结构描述为配置:

import os

from dask.distributed import Nanny, SpecCluster, Scheduler
from distributed.worker import TOTAL_MEMORY

from dask_cuda.local_cuda_cluster import cuda_visible_devices


class CPUAffinity
""" 用于固定 CPU 亲和性的 Worker 插件 """
def __init__(self, cores)
self.cores = cores

def setup(self, worker=None)
os.sched_setaffinity(0, self.cores)


affinity = { # 参见 nvidia-smi topo -m
0: list(range(0, 20)) + list(range(40, 60)),
1: list(range(0, 20)) + list(range(40, 60)),
2: list(range(0, 20)) + list(range(40, 60)),
3: list(range(0, 20)) + list(range(40, 60)),
4: list(range(20, 40)) + list(range(60, 79)),
5: list(range(20, 40)) + list(range(60, 79)),
6: list(range(20, 40)) + list(range(60, 79)),
7: list(range(20, 40)) + list(range(60, 79)),
}

def DGX(
interface="ib",
dashboard_address=":8787",
threads_per_worker=1,
silence_logs=True,
CUDA_VISIBLE_DEVICES=None,
**kwargs
):
""" 用于 DGX 1 机器的本地集群

NVIDIA 的 DGX-1 机器具有复杂的 CPU 映射架构,
GPU 和网络硬件。此函数创建一个本地集群,
尽量尊重此硬件配置。

它为每个 GPU 创建一个 Dask worker 进程,并为每个 worker
进程分配正确的 CPU 核心和网卡,
以最大限度地提高性能。

话虽如此,事情并非完美。今天的 DGX 在某些 GPU 集合之间具有非常高的
性能,而在其他 GPU 集合之间则不是。一个 Dask DGX
集群,如果仅使用计算机中某些紧密耦合的部分,
其带宽将显著高于在整个机器上的部署。
"""

参数
----------
interface: str
Infiniband 网卡的接口前缀。这通常是
"ib" 或 "bond"。我们将根据情况添加数字后缀 0,1,2,3。
默认值为 "ib"。
dashboard_address: str
调度器仪表盘的地址。默认值为 ":8787"。
CUDA_VISIBLE_DEVICES: str
字符串,例如 ``"0,1,2,3"`` 或 ``[0, 1, 2, 3]``,用于限制
活动到不同的 GPU。

示例
--------
>>> from dask_cuda import DGX
>>> from dask.distributed import Client
>>> cluster = DGX(interface='ib')
>>> client = Client(cluster)
"""
if CUDA_VISIBLE_DEVICES is None
CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1,2,3,4,5,6,7")
if isinstance(CUDA_VISIBLE_DEVICES, str)
CUDA_VISIBLE_DEVICES = CUDA_VISIBLE_DEVICES.split(",")
CUDA_VISIBLE_DEVICES = list(map(int, CUDA_VISIBLE_DEVICES))
memory_limit = TOTAL_MEMORY / 8

spec = {
i: {
"cls": Nanny,
"options": {
"env": {
"CUDA_VISIBLE_DEVICES": cuda_visible_devices(
ii, CUDA_VISIBLE_DEVICES
),
"UCX_TLS": "rc,cuda_copy,cuda_ipc",
},
"interface": interface + str(i // 2),
"protocol": "ucx",
"ncores": threads_per_worker,
"data": dict,
"preload": ["dask_cuda.initialize_context"],
"dashboard_address": ":0",
"plugins": [CPUAffinity(affinity[i])],
"silence_logs": silence_logs,
"memory_limit": memory_limit,
},
}
for ii, i in enumerate(CUDA_VISIBLE_DEVICES)
}

scheduler = {
"cls": Scheduler,
"options": {
"interface": interface + str(CUDA_VISIBLE_DEVICES[0] // 2),
"protocol": "ucx",
"dashboard_address": dashboard_address,
},
}

return SpecCluster(
workers=spec,
scheduler=scheduler,
silence_logs=silence_logs,
**kwargs
)

然而,我们还没有完全解决 NVLink 结构的问题。Dask 调度器目前仍然假定 worker 之间具有均匀的带宽。我们已经开始朝着改变这一点迈出小步,但尚未完成(这对于考虑机架内或跨数据中心部署的人员也很有用)。

像往常一样,在解决一个高度特定的问题时,我们能够解决许多悬而未决的通用特性,这使得我们的特定问题变得易于实现。

未来工作

在过去的几个月里,我们付出了巨大的努力来使上述所有内容正常工作。特别是我们……

  • 修改 UCX 以支持客户端-服务器工作负载
  • 使用 UCX-Py 包装 UCX,并设计了一个 Python async-await 友好的接口
  • 使用 Dask 包装 UCX-Py
  • 将所有东西连接在一起,使通用工作负载能够良好运行

结果相当不错,特别是对于通信密集型工作负载。然而,还有很多工作要做。本节详细介绍了我们目前正在考虑如何继续这项工作。

  • 复杂网络中的路由:如果您将 DGX 的八个 GPU 限制为四个,则一对 GPU 之间可以获得 5-12 GB/秒的带宽。对于某些工作负载来说,这可能意义重大。它让系统感觉更像一个整体,而不是一堆独立的机器。
  • 然而,我们仍然无法在整个 DGX 上获得出色的性能,因为有许多 GPU 对没有通过 NVLink 连接,因此速度会慢 10 倍。如果您天真地尝试使用整个 DGX,这些通信成本会占据主导地位。
  • 这可以通过以下方式解决:
  • 教导 Dask 避免这些通信
  • 教导 UCX 通过多个 NVLink 连接链路由此类通信
  • 完全避免复杂的网络。像 DGX-2 这样的新系统使用 NVSwitch,它提供了统一的连接性,每个 GPU 都连接到所有其他 GPU。
  • 编辑:我后来了解到 UCX 应该能够处理这个问题。一旦上游 bug 得到修复,即使没有 NVLink,我们也应该能够获得 PCIe 速度(大约 4-7 GB/秒)。太棒了!
  • CPU:我们可以在 InfiniBand 上获得 1-2 GB/秒的带宽,这还不错,但也不是我们希望的 5-8 GB/秒。这值得更认真的分析,以确定出了什么问题。目前的猜测是这与内存分配有关。
  • In [1]: %time _ = b'0' * 1000000000 # 1 GB
    CPU times: user 248 ms, sys: 223 ms, total: 472 ms
    Wall time: 470 ms # <<----- 大约 2 GB/秒。比我预期的要慢
  • 可能我们在这里只是做了一些蠢事。
  • 打包 UCX:目前我正在从源代码构建 UCX 和 UCX-Py 库(参见下面的附录获取说明)。理想情况下,这些会成为 conda 包。John Kirkham (Conda Forge, NVIDIA, Dask) 正在与来自 Mellanox 的 UCX 开发人员一起研究这个问题。
  • 更多信息请参见ucx-py #65
  • 学习异构带宽:为了做出好的调度决策,Dask 需要估计在机器之间移动数据需要多长时间。这个问题现在变得更加复杂,并且取决于源机器和目标机器(网络拓扑)、数据类型(NumPy 数组、GPU 数组、带有文本的 Pandas Dataframe)等等。在复杂情况下,我们的带宽范围可能跨越 100 倍(100 MB/秒到 10 GB/秒)。
  • Dask 将必须开发更复杂的带宽模型,并随着时间学习这些模型。
  • 更多信息请参见dask/distributed#2743
  • 支持其他 GPU 库:为了在 GPU 之间发送数据,我们需要教导 Dask 如何将 Python 对象序列化到 GPU 缓冲区中。在 dask/distributed 仓库中有为 Numba、CuPy 和 RAPIDS cuDF 对象实现此功能的代码,但我们只对 CuPy 进行了认真的测试。我们应该通过以下一些步骤来扩展此功能:
  • 尝试分布式 Dask cuDF 连接计算
  • 有关初步工作,请参见dask/distributed #2746
  • 教导 Dask 序列化数组 GPU 库,例如 PyTorch 和 TensorFlow,或者任何支持 __cuda_array_interface__ 协议的对象。
  • 追查通信故障:我们仍然偶尔会遇到无法解释的通信故障。我们应该对该系统进行压力测试,以发现问题。
  • TCP:拥有高性能 TCP 网络的团队尚无法利用 UCX+Dask(尽管他们可以单独使用其中一个)。
  • 目前,像我们使用 Dask 那样在客户端-服务器模式下使用 UCX 需要访问 RDMA 库,而没有 InfiniBand 等网络系统的计算机通常没有这些库。这意味着拥有高性能 TCP 网络的团队无法利用 UCX+Dask。
  • 这项工作正在进行中,请参见openucx/ucx #3570
  • 商用硬件:目前,此代码仅在具有 InfiniBand 或 NVLink 的高性能 Linux 系统上真正有用。然而,如果在更普通的系统上使用它也会很好,包括使用 TCP 和共享内存的个人笔记本电脑。
  • 目前,Dask 在单台机器上使用 TCP 进行进程间通信。在个人计算机上使用 UCX 将使我们能够访问共享内存速度,其速度往往快一个数量级。
  • 更多信息请参见openucx/ucx #3663
  • 调优性能:我们目前使用 NVLink 看到的 5-10 GB/秒带宽并非最优。仅使用 UCX-Py,在大消息传输上我们可以获得大约 15 GB/秒的带宽。我们应该对我们的实现进行基准测试和调优,以查看额外时间花费在哪里。然而,在系统运行更稳定之前,这属于次要优先级。

附录:设置

执行这些实验目前依赖于几个仓库的开发分支。本节包括我当前的设置。

创建 Conda 环境

conda create -n ucx python=3.7 libtool cmake automake autoconf cython bokeh pytest pkg-config ipython dask numba -y

注意:由于某种原因,使用 conda-forge 会导致下面的 autogen 步骤失败。

设置 UCX

# 克隆 UCX 仓库并获取分支
git clone https://github.com/openucx/ucx
cd ucx
git remote add Akshay-Venkatesh git@github.com:Akshay-Venkatesh/ucx.git
git remote update Akshay-Venkatesh
git checkout ucx-cuda

# 构建
git clean -xfd
export CUDA_HOME=/usr/local/cuda-9.2/
./autogen.sh
mkdir build
cd build
../configure --prefix=$CONDA_PREFIX --enable-debug --with-cuda=$CUDA_HOME --enable-mt --disable-cma CPPFLAGS="-I//usr/local/cuda-9.2/include"
make -j install

# 验证
ucx_info -d
which ucx_info # 验证这是否在 conda 环境中

# 验证是否看到 NVLink 速度
ucx_perftest -t tag_bw -m cuda -s 1048576 -n 1000 & ucx_perftest dgx15 -t tag_bw -m cuda -s 1048576 -n 1000

设置 UCX-Py

git clone git@github.com:rapidsai/ucx-py
cd ucx-py

export UCX_PATH=$CONDA_PREFIX
make install

设置 Dask

git clone git@github.com:dask/dask.git
cd dask
pip install -e .
cd ..

git clone git@github.com:dask/distributed.git
cd distributed
pip install -e .
cd ..

可选地设置 cupy

pip install cupy-cuda92==6

可选地设置 cudf

conda install -c rapidsai-nightly -c conda-forge -c numba cudf dask-cudf cudatoolkit=9.2

可选地设置 JupyterLab

conda install ipykernel jupyterlab nb_conda_kernels nodejs

对于 Dask 仪表盘

pip install dask_labextension
jupyter labextension install dask-labextension

我的基准测试

我一直在使用以下基准测试来测试通信。它分配一个分块的 Dask 数组,然后将其与其转置相加,这会强制进行大量通信,但计算量不大。

from collections import defaultdict
import asyncio
import time
import numpy as np
from pprint import pprint
import cupy

import dask.array as da
from dask.distributed import Client, wait
from distributed.utils import format_time, format_bytes

async def f()

# 在本地机器上设置 worker
async with DGX(asynchronous=True, silence_logs=True) as cluster
async with Client(cluster, asynchronous=True) as client

# 创建一个简单的随机数组
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random((40000, 40000), chunks='128 MiB').persist()
print(x.npartitions, '块')
await wait(x)

# 将 X 添加到其转置,强制计算
y = (x + x.T).sum()
result = await client.compute(y)

# 收集、聚合并打印点对点带宽
incoming_logs = await client.run(lambda dask_worker: dask_worker.incoming_transfer_log)
bandwidths = defaultdict(list)
for k, L in incoming_logs.items()
for d in L
if d['total'] > 1_000_000
bandwidths[k, d['who']].append(d['bandwidth'])
bandwidths = {
(cluster.scheduler.workers[w1].name,
cluster.scheduler.workers[w2].name): [format_bytes(x) + '/s' for x in np.quantile(v, [0.25, 0.50, 0.75])]
for (w1, w2), v in bandwidths.items()
}
pprint(bandwidths)


if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(f())

注意:此示例的大部分只是获取诊断信息,可以轻松忽略。另外,如果您愿意,可以删除 async/await 代码。我认为世界上应该有更多使用 async/await 语法的 Dask 示例,所以我决定保留它。