Dask 在分析并行计算方面非常通用,但要将其应用于更广泛的领域,仍有一个问题需要解决:允许它透明地与类似 NumPy 的库协同工作。我们之前讨论过如何使用GPU Dask 数组,但这仅限于数组成员方法共享类似 NumPy 的接口的范围,例如 .sum() 方法,因此,仍然无法调用 NumPy 库中的通用功能。NumPy 最近在NEP-18 中通过引入 __array_function__ 协议解决了这个问题。简而言之,该协议允许 NumPy 函数调用根据作为输入给定的数组类型,分派合适的类似 NumPy 的库实现,从而允许 Dask 保持对这些库的无感知,内部只调用 NumPy 函数,NumPy 函数会自动处理合适库实现的调用,例如 CuPy 或 Sparse。
为了理解这一改变的最终目标是什么,请考虑以下示例
import numpy as np
import dask.array as da
x = np.random.random((5000, 1000))
d = da.from_array(x, chunks=(1000, 1000))
u, s, v = np.linalg.svd(d)
现在,考虑我们想加速 Dask 数组的 SVD 计算并将其工作卸载到支持 CUDA 的 GPU 上,我们最终希望简单地用 CuPy 数组替换 NumPy 数组 x,并通过 __array_function__ 协议让 NumPy 发挥其魔力,在底层分派合适的 CuPy 线性代数操作
import numpy as np
import cupy
import dask.array as da
x = cupy.random.random((5000, 1000))
d = da.from_array(x, chunks=(1000, 1000))
u, s, v = np.linalg.svd(d)
我们可以对 Sparse 数组或任何其他支持 __array_function__ 协议和我们尝试执行的计算的类似 NumPy 的数组进行同样的操作。在下一节中,我们将探讨该协议有助于利用的潜在性能优势。
请注意,本文描述的功能仍在实验阶段,部分仍在开发和评审中。有关 __array_function__ 实际进展的更详细讨论,请参阅问题部分。
在进一步讨论之前,假设本文描述的所有性能结果都使用了以下硬件
现在我们来看一个示例,了解在使用 CuPy 作为后端时,__array_function__ 协议与 Dask 配合使用的潜在性能优势。首先创建所有我们稍后将用于计算 SVD 的数组。请注意,我此处关注的是 Dask 如何利用计算性能,因此在此示例中忽略了在 CPU 和 GPU 之间创建或复制数组所花费的时间。
import numpy as np
import cupy
import dask.array as da
x = np.random.random((10000, 1000))
y = cupy.array(x)
dx = da.from_array(x, chunks=(5000, 1000))
dy = da.from_array(y, chunks=(5000, 1000), asarray=False)
如上所示,我们有四个数组
然后我们可以开始使用 NumPy 计算 x 的 SVD,因此它在 CPU 上以单线程处理
u, s, v = np.linalg.svd(x)
我之后获得的计时信息如下所示
CPU times: user 3min 10s, sys: 347 ms, total: 3min 11s
Wall time: 3min 11s
超过 3 分钟似乎有点太慢了,所以现在问题是:我们能做得更好吗,更重要的是,无需更改我们整个代码的情况下?
这个问题的答案是:是的,可以。
现在我们看看其他结果。
首先,这是在引入 __array_function__ 协议之前您必须做的事情
u, s, v = da.linalg.svd(dx)
u, s, v = dask.compute(u, s, v)
上述代码对于一些项目来说可能非常难以接受,因为除了传递正确的数组外,还需要调用合适的库分派器。换句话说,需要找到代码中所有的 NumPy 调用,并根据输入数组类型,将其替换为正确的库函数调用。在有了 __array_function__ 之后,可以使用 Dask 数组 dx 作为输入,调用同一个 NumPy 函数
u, s, v = np.linalg.svd(dx)
u, s, v = dask.compute(u, s, v)
注意:Dask 将计算结果推迟到使用时才执行,因此我们需要对结果数组调用 dask.compute() 函数来实际计算它们。
现在我们来看看计时信息
CPU times: user 1min 23s, sys: 460 ms, total: 1min 23s
Wall time: 1min 13s
现在,除了将 NumPy 数组封装为 Dask 数组外,无需更改任何代码,我们就可以看到 2 倍的加速。还不错。但让我们回到之前的问题:我们能做得更好吗?
我们现在可以像对 Dask 数组一样,简单地在 CuPy 数组 y 上调用 NumPy 的 SVD 函数
u, s, v = np.linalg.svd(y)
我们现在获得的计时信息如下
CPU times: user 17.3 s, sys: 1.81 s, total: 19.1 s
Wall time: 19.1 s
现在我们看到了 4-5 倍的加速,而内部调用没有任何变化!这正是我们期望利用 __array_function__ 协议带来的好处,免费加速现有代码!
让我们最后一次回到最初的问题:我们能做得更好吗?
我们现在可以利用 Dask 数据分块处理的优势以及 CuPy 的 GPU 实现,尽量保持 GPU 繁忙,这仍然像以下代码一样简单
u, s, v = np.linalg.svd(dy)
u, s, v = dask.compute(u, s, v)
由此我们获得了以下计时
CPU times: user 8.97 s, sys: 653 ms, total: 9.62 s
Wall time: 9.45 s
这比单线程 CuPy SVD 计算又带来了 2 倍的加速。
总而言之,我们从超过 3 分钟开始,现在只需在不同的数组上分派工作,就能缩短到不到 10 秒。
现在我们将讨论一下 __array_function__ 协议的潜在应用。为此,我们将讨论用于在大数据集上拟合广义线性模型的 Dask-GLM 库。它构建在 Dask 之上,并提供与 scikit-learn 兼容的 API。
在引入 __array_function__ 协议之前,对于每个希望用作后端的类似 NumPy 的库,我们都需要重写其大部分内部实现,因此,我们需要为 Dask 专门实现一套,为 CuPy 再实现一套,为 Sparse 又实现一套。现在,由于这些库通过兼容接口共享功能,我们根本无需更改实现,只需将不同的数组类型作为输入传递即可,就是这么简单。
为了演示我们获得的能力,请考虑以下 scikit-learn 示例(基于此处的原始示例)
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
N = 1000
# x 从 0 到 N
x = N * np.random.random((40000, 1))
# y = a*x + b 加噪声
y = 0.5 * x + 1.0 + np.random.normal(size=x.shape)
# 创建一个线性回归模型
est = LinearRegression()
然后我们可以拟合模型,
est.fit(x, y)
并获取其时间测量结果
CPU times: user 3.4 ms, sys: 0 ns, total: 3.4 ms
Wall time: 2.3 ms
然后我们可以用它对一些测试数据进行预测,
# 从数据预测 y
x_new = np.linspace(0, N, 100)
y_new = est.predict(x_new[:, np.newaxis])
并检查其时间测量结果
CPU times: user 1.16 ms, sys: 680 µs, total: 1.84 ms
Wall time: 1.58 ms
最后绘制结果
# 绘制结果
plt.figure(figsize=(4, 3))
ax = plt.axes()
ax.scatter(x, y, linewidth=3)
ax.plot(x_new, y_new, color='black')
ax.set_facecolor((1.0, 1.0, 0.42))
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.axis('tight')
plt.show()
我们只需更改之前的代码的第一个块,即导入库和创建数组的部分
import numpy as np
from dask_glm.estimators import LinearRegression
import matplotlib.pyplot as plt
N = 1000
# x 从 0 到 N
x = N * np.random.random((40000, 1))
# y = a*x + b 加噪声
y = 0.5 * x + 1.0 + np.random.normal(size=x.shape)
# 创建一个线性回归模型
est = LinearRegression(solver='lbfgs')
其余代码以及绘图与之前的 scikit-learn 示例相似,因此此处省略以求简洁。另请注意,我们可以调用 LinearRegression() 而不带任何参数,但在此示例中,我们选择了收敛速度相当快的 lbfgs 求解器。
我们还可以看看使用 Dask-GLM 进行拟合的计时结果,然后是预测的计时结果
# 拟合
CPU times: user 9.66 ms, sys: 116 µs, total: 9.78 ms
Wall time: 8.94 ms
# 预测
CPU times: user 130 µs, sys: 327 µs, total: 457 µs
Wall time: 1.06 ms
如果我们要改用 CuPy 进行计算,只需更改 3 行代码:导入 cupy 而不是 numpy,以及创建随机数组的两行代码,将它们替换为 cupy.random 而不是 np.random。该代码块应如下所示
import cupy
from dask_glm.estimators import LinearRegression
import matplotlib.pyplot as plt
N = 1000
# x 从 0 到 N
x = N * cupy.random.random((40000, 1))
# y = a*x + b 加噪声
y = 0.5 * x + 1.0 + cupy.random.normal(size=x.shape)
# 创建一个线性回归模型
est = LinearRegression(solver='lbfgs')
在此情景下我们获得的计时结果是
# 拟合
CPU times: user 151 ms, sys: 40.7 ms, total: 191 ms
Wall time: 190 ms
# 预测
CPU times: user 1.91 ms, sys: 778 µs, total: 2.69 ms
Wall time: 1.37 ms
对于本文选择的简单示例,scikit-learn 在使用 NumPy 和 CuPy 数组时都优于 Dask-GLM。造成这种情况的原因可能有很多,虽然我们没有深入探究具体原因及其程度,但可以列举一些可能的因素
为了验证使用 NumPy 数组的 Dask-GLM 与 CuPy 数组的对比情况,我们还对 Dask-GLM 求解器进行了一些逻辑回归基准测试。以下结果来自包含 100 维的 102、103、...、106 个特征的训练数据集,以及数量匹配的测试特征。
注意:我们有意省略了 Dask 数组的结果,因为我们发现了一个可能导致 Dask 数组无法收敛的潜在错误。
从上面图表中观察到的结果可以看出,对于使用任何 Dask-GLM 求解器的拟合,CuPy 可以比 NumPy 快一个数量级。另请注意,两个轴都采用对数刻度以便于可视化。
另一个有趣的现象是,对于少量样本,收敛可能需要更长时间。然而,正如我们通常所希望的,收敛所需的计算时间与样本数量呈线性关系。
如上所示,使用 CuPy 进行预测可以比 NumPy 快很多倍,对于所有求解器基本保持不变,并且快约 3-4 个数量级。
本节介绍自 2019 年 2 月以来为实现前面部分描述的功能所做的工作以及仍在进行的工作。如果您对所有细节不感兴趣,可以完全跳过本节。
自 2019 年 2 月初以来,在不同项目中对 __array_function__ 协议提供更深入的支持方面取得了实质性进展,这一趋势仍在继续并将持续到 3 月。下面列出了已修复或正在评审中的问题
当前,我们正在解决的最大问题之一与在 CuPy 数组上调用 Dask 的 diag() 时首次发现的Dask issue #4490 有关。这需要修改 Dask 数组类,并随后修改 Dask 代码库的大部分。我在此不会深入介绍细节,但我们处理此问题的方式是向 Dask 数组添加一个新的属性 _meta,替换当前简单的 dtype。这个新属性不仅包含 dtype 信息,还包含用于最初创建数组的后端类型的空数组,从而使我们能够在内部重建后端类型的数组,而无需明确知道它是 NumPy、CuPy、Sparse 还是任何其他类似 NumPy 的数组。更多详情,请参阅 Dask Issue #2977。
我们还发现了一些正在讨论中的问题
有几种可能性可以丰富 Dask 的使用体验,其中一些在短期和中期可能非常有趣,包括