提交新活动

谢谢!您的提交已收到!
糟糕!提交表单时出现问题。

提交新闻稿

谢谢!您的提交已收到!
糟糕!提交表单时出现问题。

订阅新闻通讯

谢谢!您的提交已收到!
糟糕!提交表单时出现问题。
2019年1月3日

GPU Dask 数组,初步探索

作者

以下代码创建并操作了 2 TB 的随机生成数据。

import dask.array as da

rs = da.random.RandomState()
x = rs.normal(10, 1, size=(500000, 500000), chunks=(10000, 10000))
(x + 1)[::2, ::2].sum().compute(scheduler='threads')

在单个 CPU 上,此计算耗时两小时。

在一个八块 GPU 的单节点系统上,此计算耗时十九秒。

将 Dask 数组与 CuPy 结合

实际上,这个计算并不算令人印象深刻。这是一个简单的工作负载,大部分时间都花费在创建和销毁随机数据上。计算和通信模式也很简单,反映了数据处理工作负载中常见的简单性。

真正令人印象深刻的是,我们通过组合这四个现有的库,快速创建了一个分布式并行 GPU 数组:

  1. CuPy 在 GPU 上提供了 Numpy 的部分实现。
  2. Dask Array 在 Numpy 和 CuPy 等类 Numpy 库之上提供了分块算法。
  3. 这使得我们能够通过对数据进行分块操作来处理比内存容量更大的数据。
  4. Dask distributed 任务调度程序并行运行这些算法,轻松协调多个 CPU 核心上的工作。
  5. Dask CUDA 用于扩展 Dask distributed,增加 GPU 支持。

这些工具已经存在。我们只需要用少量“胶水代码”和微小修改将它们连接起来。通过将这些工具组合使用,我们可以快速构建并切换不同的架构,以探索最适合我们应用的方案。

对于这个例子,我们依赖于上游的以下更改:

单/多 CPU/GPU 性能对比

现在我们可以轻松地在不同架构上运行一些实验。这很容易,因为……

  • 我们可以通过切换使用 Numpy 和 CuPy 来在 CPU 和 GPU 之间切换。
  • 我们可以通过切换 Dask 的不同任务调度程序来在单/多 CPU 核心和单/多 GPU 之间切换。

这些库使我们能够快速判断以下硬件选择下的计算成本:

  1. 单线程 CPU
  2. 40 核多线程 CPU (80 超线程)
  3. 单块 GPU
  4. 单机多块 GPU (8 块 GPU)

下面我们将给出这四种选择的代码,但首先,我们先展示一个结果表。

结果

架构 时间 单核 CPU 2小时 39分钟 四十核 CPU 11分钟 30秒 单块 GPU 1分钟 37秒 八块 GPU 19秒

设置

import cupy
import dask.array as da

# 生成由大量 numpy 随机数组组成的分块 dask 数组
rs = da.random.RandomState()
x = rs.normal(10, 1, size=(500000, 500000), chunks=(10000, 10000))

print(x.nbytes / 1e9) # 2 TB
# 2000.0

CPU 计时

(x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')
(x + 1)[::2, ::2].sum().compute(scheduler='threads')

单块 GPU 计时

我们通过更改数据源以生成 CuPy 数组而非 NumPy 数组来从 CPU 切换到 GPU。其他一切基本都能正常工作,无需为 CuPy 进行特殊处理。

(实际上这还不完全正确,dask.array 中的许多功能对于非 NumPy 数组会中断,但我们正在 Dask、NumPy 和 GPU 数组库内部积极解决这个问题。不过,这个例子中的所有内容都能正常工作。)

# 生成由大量 cupy 随机数组组成的分块 dask 数组
rs = da.random.RandomState(RandomState=cupy.random.RandomState) # <-- 这里我们指定了 cupy
x = rs.normal(10, 1, size=(500000, 500000), chunks=(10000, 10000))

(x + 1)[::2, ::2].sum().compute(scheduler='single-threaded')

多块 GPU 计时

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

(x + 1)[::2, ::2].sum().compute()

再次,这是结果:

架构 时间 单核 CPU 2小时 39分钟 四十核 CPU 11分钟 30秒 单块 GPU 1分钟 37秒 八块 GPU 19秒

首先,这是我第一次接触 40 核系统。我很惊讶能看到这么多核心。我也很高兴看到 Dask 的普通线程调度程序能够愉快地充分利用这么多核心。

虽然之后它确实下降到 5000-6000% 左右,如果你计算一下,你会发现我们并没有获得 40 倍的加速。我的猜测是,如果我们尝试线程和进程的混合使用,例如使用十个进程,每个进程八个线程,性能可能会有所提升。

然而,从最大的多核 CPU 到单个 GPU 的飞跃仍然是一个数量级的提升。而跃升到多 GPU 又是一个数量级的提升,将计算时间缩短到 19 秒,这已经足够短,我可以愿意等待它完成而不用离开我的电脑。

实际上,在仪表板上查看进程非常有趣(尤其是在你已经为顺序解决方案等待了三个小时之后)

结论

这个计算很简单,但我们探索的架构范围却很广泛。我们切换了底层架构,从 CPU 到 GPU(后者有完全不同的代码库),并尝试了多核 CPU 并行和多 GPU 多核并行。

我们在不到二十行代码内完成了这一切,使得这个实验成为本科生或其他新手在家也能进行的尝试。我们正接近这样一个节点:非专家也能轻松地进行多 GPU 系统的实验(至少对于数组计算而言)。

上述实验的 Notebook 在这里:

改进空间

我们可以朝着各种方向扩展上述计算。要使其稳定可靠,我们还有大量工作要做。

  1. 使用更复杂的数组计算工作负载
  2. Dask Array 算法最初是围绕 Numpy 设计的。我们最近才开始使其更通用,以支持其他类型的数组(如 GPU 数组、稀疏数组等)。因此,在探索这些非 Numpy 工作负载时,仍然存在许多 Bug。
  3. 例如,如果你在上面的计算中将 sum 换成 mean,你会得到一个错误,因为我们的 mean 计算包含一个容易修复的错误,它严格假设使用 Numpy 数组。
  4. 使用 Pandas 和 cuDF 替代 Numpy 和 CuPy
  5. cuDF 库旨在在 GPU 上重新实现 Pandas API,就像 CuPy 重新实现 NumPy API 一样。将 Dask DataFrame 与 cuDF 结合使用需要双方做一些工作,但这是完全可行的。
  6. 我相信这里有很多“低垂的果实”(容易解决的问题)。
  7. 改进并迁移 LocalCUDACluster
  8. 上面使用的 LocalCUDACluster 类是一种实验性的集群类型,它在本地创建与您拥有的 GPU 数量相同的工作进程,并让每个工作进程优先使用不同的 GPU。这使得用户无需过多考虑即可轻松在单节点系统上的 GPU 之间进行负载均衡。这似乎是当前生态系统中一个常见的痛点。
  9. 然而,LocalCUDACluster 可能不应该位于 dask/distributed 仓库中(它看起来过于特定于 CUDA),因此可能会迁移到某个 dask-cuda 仓库。此外,关于如何在 GPU 之上处理并发、平衡 CPU 核心和 GPU 核心等问题仍有很多。
  10. 多节点计算
  11. 没有理由我们不能通过使用多个多 GPU 节点来进一步加速此类计算。手动设置今天就可以实现,但我们也应该改进现有的部署解决方案,例如 dask-kubernetesdask-yarndask-jobqueue,以便非专业人士更容易使用多 GPU 资源集群。
  12. 成本
  13. 我运行这个实验的机器很昂贵。当然,与获得这些结果所需的传统集群相比,它的拥有和运营成本远没有那么高,但对于业余爱好者或学生来说,它的价格仍然远远超出。
  14. 在更经济实惠的系统上运行此实验,以了解价格更合理的系统上的权衡取舍将会很有帮助。我可能还需要学习更多关于在云端配置 GPU 的知识。

来帮忙吧!

如果上面的工作听起来让你感兴趣,那就来帮忙吧!有很多容易入手且影响深远的工作可以做。

如果您有兴趣通过专注于这些主题获得报酬,那么可以考虑申请一份工作。NVIDIA 公司正在招聘与 Dask 在 GPU 上的使用相关的职位。

那是一个相当通用的招聘信息。如果您感兴趣,即使招聘信息看起来不太符合,也请申请,我们会进行调整。