提交新活动

谢谢!您的提交已收到!
哎呀!提交表单时出现问题。

提交新闻报道

谢谢!您的提交已收到!
哎呀!提交表单时出现问题。

订阅新闻邮件

谢谢!您的提交已收到!
哎呀!提交表单时出现问题。
Mar 18, 2019

Dask 与 __array_function__ 协议

作者

摘要

Dask 在分析并行计算方面非常通用,但要将其应用于更广泛的领域,仍有一个问题需要解决:允许它透明地与类似 NumPy 的库协同工作。我们之前讨论过如何使用GPU Dask 数组,但这仅限于数组成员方法共享类似 NumPy 的接口的范围,例如 .sum() 方法,因此,仍然无法调用 NumPy 库中的通用功能。NumPy 最近在NEP-18 中通过引入 __array_function__ 协议解决了这个问题。简而言之,该协议允许 NumPy 函数调用根据作为输入给定的数组类型,分派合适的类似 NumPy 的库实现,从而允许 Dask 保持对这些库的无感知,内部只调用 NumPy 函数,NumPy 函数会自动处理合适库实现的调用,例如 CuPySparse

为了理解这一改变的最终目标是什么,请考虑以下示例

import numpy as np
import dask.array as da

x = np.random.random((5000, 1000))

d = da.from_array(x, chunks=(1000, 1000))

u, s, v = np.linalg.svd(d)

现在,考虑我们想加速 Dask 数组的 SVD 计算并将其工作卸载到支持 CUDA 的 GPU 上,我们最终希望简单地用 CuPy 数组替换 NumPy 数组 x,并通过 __array_function__ 协议让 NumPy 发挥其魔力,在底层分派合适的 CuPy 线性代数操作

import numpy as np
import cupy
import dask.array as da

x = cupy.random.random((5000, 1000))

d = da.from_array(x, chunks=(1000, 1000))

u, s, v = np.linalg.svd(d)

我们可以对 Sparse 数组或任何其他支持 __array_function__ 协议和我们尝试执行的计算的类似 NumPy 的数组进行同样的操作。在下一节中,我们将探讨该协议有助于利用的潜在性能优势。

请注意,本文描述的功能仍在实验阶段,部分仍在开发和评审中。有关 __array_function__ 实际进展的更详细讨论,请参阅问题部分

性能

在进一步讨论之前,假设本文描述的所有性能结果都使用了以下硬件

  • CPU: 6 核 (12 线程) Intel Core i7-7800X @ 3.50GHz
  • 主内存: 16 GB
  • GPU: NVIDIA Quadro GV100
  • OpenBLAS 0.2.18
  • cuBLAS 9.2.174
  • cuSOLVER 9.2.148

现在我们来看一个示例,了解在使用 CuPy 作为后端时,__array_function__ 协议与 Dask 配合使用的潜在性能优势。首先创建所有我们稍后将用于计算 SVD 的数组。请注意,我此处关注的是 Dask 如何利用计算性能,因此在此示例中忽略了在 CPU 和 GPU 之间创建或复制数组所花费的时间。

import numpy as np
import cupy
import dask.array as da

x = np.random.random((10000, 1000))
y = cupy.array(x)

dx = da.from_array(x, chunks=(5000, 1000))
dy = da.from_array(y, chunks=(5000, 1000), asarray=False)

如上所示,我们有四个数组

  • x:主内存中的 NumPy 数组;
  • y:GPU 内存中的 CuPy 数组;
  • dx:封装在 Dask 数组中的 NumPy 数组;
  • dy:封装在 Dask 数组中的 CuPy 数组的副本;目前尚不支持将 CuPy 数组作为视图(asarray=True)封装在 Dask 数组中。

在 NumPy 数组上计算 SVD

然后我们可以开始使用 NumPy 计算 x 的 SVD,因此它在 CPU 上以单线程处理

u, s, v = np.linalg.svd(x)

我之后获得的计时信息如下所示

CPU times: user 3min 10s, sys: 347 ms, total: 3min 11s
Wall time: 3min 11s

超过 3 分钟似乎有点太慢了,所以现在问题是:我们能做得更好吗,更重要的是,无需更改我们整个代码的情况下?

这个问题的答案是:是的,可以。

现在我们看看其他结果。

在封装在 Dask 数组中的 NumPy 数组上计算 SVD

首先,这是在引入 __array_function__ 协议之前您必须做的事情

u, s, v = da.linalg.svd(dx)
u, s, v = dask.compute(u, s, v)

上述代码对于一些项目来说可能非常难以接受,因为除了传递正确的数组外,还需要调用合适的库分派器。换句话说,需要找到代码中所有的 NumPy 调用,并根据输入数组类型,将其替换为正确的库函数调用。在有了 __array_function__ 之后,可以使用 Dask 数组 dx 作为输入,调用同一个 NumPy 函数

u, s, v = np.linalg.svd(dx)
u, s, v = dask.compute(u, s, v)

注意:Dask 将计算结果推迟到使用时才执行,因此我们需要对结果数组调用 dask.compute() 函数来实际计算它们。

现在我们来看看计时信息

CPU times: user 1min 23s, sys: 460 ms, total: 1min 23s
Wall time: 1min 13s

现在,除了将 NumPy 数组封装为 Dask 数组外,无需更改任何代码,我们就可以看到 2 倍的加速。还不错。但让我们回到之前的问题:我们能做得更好吗?

在 CuPy 数组上计算 SVD

我们现在可以像对 Dask 数组一样,简单地在 CuPy 数组 y 上调用 NumPy 的 SVD 函数

u, s, v = np.linalg.svd(y)

我们现在获得的计时信息如下

CPU times: user 17.3 s, sys: 1.81 s, total: 19.1 s
Wall time: 19.1 s

现在我们看到了 4-5 倍的加速,而内部调用没有任何变化!这正是我们期望利用 __array_function__ 协议带来的好处,免费加速现有代码!

让我们最后一次回到最初的问题:我们能做得更好吗?

在封装在 Dask 数组中的 CuPy 数组上计算 SVD

我们现在可以利用 Dask 数据分块处理的优势以及 CuPy 的 GPU 实现,尽量保持 GPU 繁忙,这仍然像以下代码一样简单

u, s, v = np.linalg.svd(dy)
u, s, v = dask.compute(u, s, v)

由此我们获得了以下计时

CPU times: user 8.97 s, sys: 653 ms, total: 9.62 s
Wall time: 9.45 s

这比单线程 CuPy SVD 计算又带来了 2 倍的加速。

总而言之,我们从超过 3 分钟开始,现在只需在不同的数组上分派工作,就能缩短到不到 10 秒。

应用

现在我们将讨论一下 __array_function__ 协议的潜在应用。为此,我们将讨论用于在大数据集上拟合广义线性模型的 Dask-GLM 库。它构建在 Dask 之上,并提供与 scikit-learn 兼容的 API。

在引入 __array_function__ 协议之前,对于每个希望用作后端的类似 NumPy 的库,我们都需要重写其大部分内部实现,因此,我们需要为 Dask 专门实现一套,为 CuPy 再实现一套,为 Sparse 又实现一套。现在,由于这些库通过兼容接口共享功能,我们根本无需更改实现,只需将不同的数组类型作为输入传递即可,就是这么简单。

scikit-learn 示例

为了演示我们获得的能力,请考虑以下 scikit-learn 示例(基于此处的原始示例)

import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression

N = 1000

# x 从 0 到 N
x = N * np.random.random((40000, 1))

# y = a*x + b 加噪声
y = 0.5 * x + 1.0 + np.random.normal(size=x.shape)

# 创建一个线性回归模型
est = LinearRegression()

然后我们可以拟合模型,

est.fit(x, y)

并获取其时间测量结果

CPU times: user 3.4 ms, sys: 0 ns, total: 3.4 ms
Wall time: 2.3 ms

然后我们可以用它对一些测试数据进行预测,

# 从数据预测 y
x_new = np.linspace(0, N, 100)
y_new = est.predict(x_new[:, np.newaxis])

并检查其时间测量结果

CPU times: user 1.16 ms, sys: 680 µs, total: 1.84 ms
Wall time: 1.58 ms

最后绘制结果

# 绘制结果
plt.figure(figsize=(4, 3))
ax = plt.axes()
ax.scatter(x, y, linewidth=3)
ax.plot(x_new, y_new, color='black')

ax.set_facecolor((1.0, 1.0, 0.42))

ax.set_xlabel('x')
ax.set_ylabel('y')

ax.axis('tight')

plt.show()

Dask-GLM 示例

我们只需更改之前的代码的第一个块,即导入库和创建数组的部分

import numpy as np
from dask_glm.estimators import LinearRegression
import matplotlib.pyplot as plt

N = 1000

# x 从 0 到 N
x = N * np.random.random((40000, 1))

# y = a*x + b 加噪声
y = 0.5 * x + 1.0 + np.random.normal(size=x.shape)

# 创建一个线性回归模型
est = LinearRegression(solver='lbfgs')

其余代码以及绘图与之前的 scikit-learn 示例相似,因此此处省略以求简洁。另请注意,我们可以调用 LinearRegression() 而不带任何参数,但在此示例中,我们选择了收敛速度相当快的 lbfgs 求解器。

我们还可以看看使用 Dask-GLM 进行拟合的计时结果,然后是预测的计时结果

# 拟合
CPU times: user 9.66 ms, sys: 116 µs, total: 9.78 ms
Wall time: 8.94 ms

# 预测
CPU times: user 130 µs, sys: 327 µs, total: 457 µs
Wall time: 1.06 ms

如果我们要改用 CuPy 进行计算,只需更改 3 行代码:导入 cupy 而不是 numpy,以及创建随机数组的两行代码,将它们替换为 cupy.random 而不是 np.random。该代码块应如下所示

import cupy
from dask_glm.estimators import LinearRegression
import matplotlib.pyplot as plt

N = 1000

# x 从 0 到 N
x = N * cupy.random.random((40000, 1))

# y = a*x + b 加噪声
y = 0.5 * x + 1.0 + cupy.random.normal(size=x.shape)

# 创建一个线性回归模型
est = LinearRegression(solver='lbfgs')

在此情景下我们获得的计时结果是

# 拟合
CPU times: user 151 ms, sys: 40.7 ms, total: 191 ms
Wall time: 190 ms

# 预测
CPU times: user 1.91 ms, sys: 778 µs, total: 2.69 ms
Wall time: 1.37 ms

对于本文选择的简单示例,scikit-learn 在使用 NumPy 和 CuPy 数组时都优于 Dask-GLM。造成这种情况的原因可能有很多,虽然我们没有深入探究具体原因及其程度,但可以列举一些可能的因素

  • scikit-learn 可能使用了收敛速度更快的求解器;
  • Dask-GLM 完全构建在 Dask 之上,而 scikit-learn 可能内部进行了大量优化;
  • 对于使用 CuPy 的小型数据集,可能会发生过多的同步步骤和数据传输。

不同 Dask-GLM 求解器的性能

为了验证使用 NumPy 数组的 Dask-GLM 与 CuPy 数组的对比情况,我们还对 Dask-GLM 求解器进行了一些逻辑回归基准测试。以下结果来自包含 100 维的 102、103、...、106 个特征的训练数据集,以及数量匹配的测试特征。

注意:我们有意省略了 Dask 数组的结果,因为我们发现了一个可能导致 Dask 数组无法收敛的潜在错误

从上面图表中观察到的结果可以看出,对于使用任何 Dask-GLM 求解器的拟合,CuPy 可以比 NumPy 快一个数量级。另请注意,两个轴都采用对数刻度以便于可视化。

另一个有趣的现象是,对于少量样本,收敛可能需要更长时间。然而,正如我们通常所希望的,收敛所需的计算时间与样本数量呈线性关系。

如上所示,使用 CuPy 进行预测可以比 NumPy 快很多倍,对于所有求解器基本保持不变,并且快约 3-4 个数量级。

问题

本节介绍自 2019 年 2 月以来为实现前面部分描述的功能所做的工作以及仍在进行的工作。如果您对所有细节不感兴趣,可以完全跳过本节。

已解决的问题

自 2019 年 2 月初以来,在不同项目中对 __array_function__ 协议提供更深入的支持方面取得了实质性进展,这一趋势仍在继续并将持续到 3 月。下面列出了已修复或正在评审中的问题

已知问题

当前,我们正在解决的最大问题之一与在 CuPy 数组上调用 Dask 的 diag() 时首次发现的Dask issue #4490 有关。这需要修改 Dask 数组类,并随后修改 Dask 代码库的大部分。我在此不会深入介绍细节,但我们处理此问题的方式是向 Dask 数组添加一个新的属性 _meta,替换当前简单的 dtype。这个新属性不仅包含 dtype 信息,还包含用于最初创建数组的后端类型的空数组,从而使我们能够在内部重建后端类型的数组,而无需明确知道它是 NumPy、CuPy、Sparse 还是任何其他类似 NumPy 的数组。更多详情,请参阅 Dask Issue #2977

我们还发现了一些正在讨论中的问题

未来工作

有几种可能性可以丰富 Dask 的使用体验,其中一些在短期和中期可能非常有趣,包括

  1. Dask-cuDF 与 Dask-GLM 一起使用,展示整个生态系统的有趣实际应用;
  2. Dask-GLM 更全面的示例和基准测试;
  3. Dask-GLM 支持更多模型
  4. 深入研究 Dask-GLM 与 scikit-learn 的性能对比;
  5. 分析 CuPy 矩阵乘法操作 (GEMM) 的性能,并与分布式 Dask 操作的矩阵向量乘法操作 (GEMV) 进行比较。