这项工作得到了 ETH Zurich、AnacondaInc 和 Berkeley Institute for DataScience 的支持。
在最近于 BIDS 举行的 Scikit-learn/Scikit-image/Dask sprint 活动中,Fabian Pedregosa(一位机器学习研究员和 Scikit-learn 开发者)与 Matthew Rocklin(Dask 核心开发者)坐在一起,共同开发了在并行 Dask 数据集上实现增量优化算法SAGA 的方法。其结果是一个可以在任何 Dask 数组上运行的顺序算法,因此允许将数据存储在磁盘上,甚至分布在不同的机器上。
了解该算法的性能以及在 Dask 分布式数据集上运行研究算法的易用性和挑战都很有趣。
我们从 Fabian 使用 Numba 为 Numpy 数组编写的初始实现开始。以下代码解决了形式如下的优化问题:
\[min_x \sum_{i=1}^n f(a_i^t x, b_i)\]
import numpy as np
from numba import njit
from sklearn.linear_model.sag import get_auto_step_size
from sklearn.utils.extmath import row_norms
@njit
def deriv_logistic(p, y)
# derivative of logistic loss
# same as in lightning (with minus sign)
p *= y
if p > 0
phi = 1. / (1 + np.exp(-p))
else
exp_t = np.exp(p)
phi = exp_t / (1. + exp_t)
return (phi - 1) * y
@njit
def SAGA(A, b, step_size, max_iter=100)
"""
SAGA algorithm
A : n_samples x n_features numpy array
b : n_samples numpy array with values -1 or 1
"""
n_samples, n_features = A.shape
memory_gradient = np.zeros(n_samples)
gradient_average = np.zeros(n_features)
x = np.zeros(n_features) # vector of coefficients
step_size = 0.3 * get_auto_step_size(row_norms(A, squared=True).max(), 0, 'log', False)
for _ in range(max_iter)
# sample randomly
idx = np.arange(memory_gradient.size)
np.random.shuffle(idx)
# .. inner iteration ..
for i in idx
grad_i = deriv_logistic(np.dot(x, A[i]), b[i])
# .. update coefficients ..
delta = (grad_i - memory_gradient[i]) * A[i]
x -= step_size * (delta + gradient_average)
# .. update memory terms ..
gradient_average += (grad_i - memory_gradient[i]) * A[i] / n_samples
memory_gradient[i] = grad_i
# monitor convergence
print('gradient norm:', np.linalg.norm(gradient_average))
return x
此实现是 Fabian 作为研究一部分经常使用的 SAGA 实现 的简化版本,它假设 $f$ 是 logistic 损失函数,即 $f(z) = \log(1 + \exp(-z))$。通过覆盖 deriv_logistic 函数,可以使用它来解决具有其他 $f$ 值的问题。
我们希望通过将其一次应用于 Dask 数组的每个块(一个较小的 Numpy 数组),并在过程中带入一组参数,从而在并行 Dask 数组上应用它。
为了更好地理解编写 Dask 算法的挑战,Fabian 一开始做了大部分实际编码工作。Fabian 是一个很好的例子,他是一位既懂编程又会设计机器学习算法的研究员,但没有直接接触过 Dask 库。这对 Fabian 和 Matt 来说都是一次学习机会。Fabian 学会了如何使用 Dask,而 Matt 学会了如何向像 Fabian 这样的研究员介绍 Dask。
一开始我们实际上根本没有使用 Dask,而是 Fabian 通过几种方式修改了他的实现:
这些修改要求对性能产生了一点影响,我们最终会复制更多的参数和中间状态。就编程难度而言,这花了一些时间(大约几个小时),但这是一项直接的任务,Fabian 似乎并不觉得具有挑战性或陌生。
这些更改产生了以下代码:
from numba import njit
from sklearn.utils.extmath import row_norms
from sklearn.linear_model.sag import get_auto_step_size
@njit
def _chunk_saga(A, b, n_samples, f_deriv, x, memory_gradient, gradient_average, step_size)
# Make explicit copies of inputs
x = x.copy()
gradient_average = gradient_average.copy()
memory_gradient = memory_gradient.copy()
# Sample randomly
idx = np.arange(memory_gradient.size)
np.random.shuffle(idx)
# .. inner iteration ..
for i in idx
grad_i = f_deriv(np.dot(x, A[i]), b[i])
# .. update coefficients ..
delta = (grad_i - memory_gradient[i]) * A[i]
x -= step_size * (delta + gradient_average)
# .. update memory terms ..
gradient_average += (grad_i - memory_gradient[i]) * A[i] / n_samples
memory_gradient[i] = grad_i
return x, memory_gradient, gradient_average
def full_saga(data, max_iter=100, callback=None)
"""
data: list of (A, b), where A is a n_samples x n_features
numpy array and b is a n_samples numpy array
"""
n_samples = 0
for A, b in data
n_samples += A.shape[0]
n_features = data[0][0].shape[1]
memory_gradients = [np.zeros(A.shape[0]) for (A, b) in data]
gradient_average = np.zeros(n_features)
x = np.zeros(n_features)
steps = [get_auto_step_size(row_norms(A, squared=True).max(), 0, 'log', False) for (A, b) in data]
step_size = 0.3 * np.min(steps)
for _ in range(max_iter)
for i, (A, b) in enumerate(data)
x, memory_gradients[i], gradient_average = _chunk_saga(
A, b, n_samples, deriv_logistic, x, memory_gradients[i],
gradient_average, step_size)
if callback is not None
print(callback(x, data))
return x
一旦函数既不修改其输入也不依赖于全局状态,我们查看了一个 dask.delayed 示例,然后将 @dask.delayed 装饰器应用到 Fabian 编写的函数上。Fabian 第一次做这个只花了大约五分钟,令我们都感到惊讶的是,它竟然奏效了
@dask.delayed(nout=3) # <<<---- New
@njit
def _chunk_saga(A, b, n_samples, f_deriv, x, memory_gradient, gradient_average, step_size)
...
def full_saga(data, max_iter=100, callback=None)
n_samples = 0
for A, b in data
n_samples += A.shape[0]
data = dask.persist(*data) # <<<---- New
...
for _ in range(max_iter)
for i, (A, b) in enumerate(data)
x, memory_gradients[i], gradient_average = _chunk_saga(
A, b, n_samples, deriv_logistic, x, memory_gradients[i],
gradient_average, step_size)
cb = dask.delayed(callback)(x, data) # <<<---- Changed
x, cb = dask.persist(x, cb) # <<<---- New
print(cb.compute()
然而,它们工作得并不是那么好。当我们查看 Dask 面板时,发现有很多空白区域,这表明我们仍然在客户端进行了大量计算。
虽然功能正常,但速度相当慢。如果您注意到上面的面板图,您会看到彩色矩形之间有大量空白区域。这表明在很长一段时间内,没有任何 worker 正在工作。
这是 worker(显示在面板上)和客户端之间混合工作的常见迹象。解决这个问题的方法通常是更有针对性地使用 dask.delayed。Dask delayed 很容易开始使用,但要用好它需要一些经验。区分哪些操作和变量是延迟的,哪些不是,这一点很重要。在它们之间混合使用会产生一些开销。
此时,Matt 介入并在更多位置添加了 delayed,面板图开始看起来更整洁了。
@dask.delayed(nout=3) # <<<---- New
@njit
def _chunk_saga(A, b, n_samples, f_deriv, x, memory_gradient, gradient_average, step_size)
...
def full_saga(data, max_iter=100, callback=None)
n_samples = 0
for A, b in data
n_samples += A.shape[0]
n_features = data[0][0].shape[1]
data = dask.persist(*data) # <<<---- New
memory_gradients = [dask.delayed(np.zeros)(A.shape[0])
for (A, b) in data] # <<<---- Changed
gradient_average = dask.delayed(np.zeros)(n_features) # Changed
x = dask.delayed(np.zeros)(n_features) # <<<---- Changed
steps = [dask.delayed(get_auto_step_size)(
dask.delayed(row_norms)(A, squared=True).max(),
0, 'log', False)
for (A, b) in data] # <<<---- Changed
step_size = 0.3 * dask.delayed(np.min)(steps) # <<<---- Changed
for _ in range(max_iter)
for i, (A, b) in enumerate(data)
x, memory_gradients[i], gradient_average = _chunk_saga(
A, b, n_samples, deriv_logistic, x, memory_gradients[i],
gradient_average, step_size)
cb = dask.delayed(callback)(x, data) # <<<---- Changed
x, memory_gradients, gradient_average, step_size, cb = \
dask.persist(x, memory_gradients, gradient_average, step_size, cb) # New
print(cb.compute()) # <<<---- changed
return x
从 Dask 的角度来看,现在看起来不错。我们看到任何给定时间只有一个 partial_fit 调用处于活动状态,partial_fit 调用之间没有大的水平间隔。我们没有获得任何并行性(这只是一个顺序算法),但也没有太多空白区域。模型似乎在不同的 worker 之间跳转,处理一个数据块后再转到新的数据。
上面的面板图像证明我们的算法运行正常。该算法的块顺序特性清晰呈现,任务之间的间隔非常短。
然而,当我们查看计算在所有核心上的性能分析图(Dask 在所有 worker 的所有线程上持续运行 profiler 以获取此信息)时,我们看到大部分时间都花费在编译 Numba 代码上。
我们为此在 Numba issue tracker 上发起了一个讨论,该问题现已解决。同一计算在相同时间内的性能分析图现在看起来像这样:
以前需要数秒的任务现在只需要数十毫秒,因此我们可以在相同时间内处理更多的数据块。
这是一次构建有趣算法的有益经验。上述大部分工作都是在一个下午完成的。通过这次活动,我们总结了以下几项自己的任务: