这项工作由 Anaconda Inc.和 Moore 基金会的数据驱动发现计划支持。
我很高兴宣布 Dask 0.16.0 版本发布。这是一个主要版本,包含新功能、重大更改和稳定性改进。这篇博文概述了自 9 月 24 日发布 0.15.3 版本以来值得注意的更改。
您可以通过 conda 安装 Dask
conda install dask
或通过 pip 从 PyPI 安装
pip install dask[complete] --upgrade
Conda 包可在 conda-forge 和 default 两个通道上获取。
完整的更改日志可在此处查看
以下是一些值得注意的更改。
现在使用 Dask collection 接口实现自定义集合更容易了。
Dask 集合(数组、数据框、包、延迟对象)通过一些内部方法与 Dask 调度器(单机、分布式)进行交互。我们将此接口规范化为 .__dask_graph__() 和 .__dask_keys__() 等协议,并发布了该接口。该接口。任何实现该文档中描述方法的对象都将作为一流的 Dask 对象与所有 Dask 调度器功能进行交互。
class MyDaskCollection(object)
def __dask_graph__(self)
...
def __dask_keys__(self)
...
def __dask_optimize__(self, ...)
...
...
该接口已在 XArray 项目中实现,用于带标签和索引的数组。现在,所有 XArray 类(DataSet、DataArray、Variable)都能被所有 Dask 调度器完全理解。它们与 dask.arrays 或 dask.dataframes 一样是一流的对象。
import xarray as xa
from dask.distributed import Client
client = Client()
ds = xa.open_mfdataset('*.nc', ...)
ds = client.persist(ds) # XArray 对象与 Dask 调度器无缝集成
Dask collection 接口的工作主要由 Jim Crist 完成。
Dask 构建于用于并发网络编程的 Tornado 库之上。为了改进在特殊硬件(Infiniband)上的工作节点间带宽,Dask 开发者正在提议修改 Tornado 的网络基础设施。
然而,为了使用这些更改,Dask 本身需要在开发中的下一版本 Tornado,即 Tornado 5.0.0 上运行,而 Tornado 5.0.0 打破了 Dask 依赖的许多接口。Dask 开发者一直在解决这些问题,我们鼓励其他 PyData 开发者也这样做。例如,Bokeh 和 Jupyter 都无法在 Tornado 5.0.0-dev 上运行。
Dask 工作节点间带宽在理论上可达到 3GB/s 的网络上峰值约为 1.5-2GB/s。GitHub issue: pangeo #6
网络性能和 Tornado 兼容性主要由 Antoine Pitrou 负责处理。
Dask.dataframe 可以使用 Python 中两个常见的 Parquet 库:Apache Arrow 和 Fastparquet。每个库都有其优势以及偏好它的用户群。我们显著扩展了 Dask 的 parquet 测试套件,以覆盖每个库,扩展了往返兼容性。值得注意的是,您现在可以使用 PyArrow 进行读写。
df.to_parquet('...', engine='fastparquet')
df = dd.read_parquet('...', engine='pyarrow')
这里仍有工作要做。市面上各种 Parquet 读写器和约定使完全解决这个问题变得困难。很高兴看到各个项目正逐渐趋向于通用功能。
这项工作由 Uwe Korn、Jim Crist 和 Martin Durant 共同完成。
Dask.distributed 调度器最受期待的功能之一是重试失败任务的能力。这对于将 Dask 用作任务队列而非大型数据框或数组的用户来说尤为有用。
future = client.submit(func, *args, retries=5)
任务重试功能主要由 Antoine Pitrou 构建。
Dask.distributed 任务调度器通过工作窃取进行负载均衡。以前这有时会导致同一个任务在两个位置同时运行。现在窃取是事务性的,这意味着它将避免意外地将同一个任务运行两次。这种行为对于使用 Dask 任务产生副作用的用户来说尤为重要。
同一个任务仍然可能运行两次,但现在这只发生在更极端的情况下,例如工作节点宕机或 TCP 连接断开时,这在标准硬件上都不常见。
事务性工作窃取主要由 Matthew Rocklin 实现。
仪表板的“信息”选项卡中提供了一组新的诊断网页。这些页面提供了关于每个工作节点和任务的更深入信息,但没有任何动态内容。它们使用 Tornado 模板而非 Bokeh 图形,这意味着它们的响应性较低,但构建起来容易得多。这是一种简单且廉价的方式来暴露更多的调度器状态。
现在在任务内部调用 .compute() 会调用同一个分布式调度器。这使得编写更复杂的工作负载时无需过多考虑启动工作节点客户端。
import dask
from dask.distributed import Client
client = Client() # 仅适用于较新的调度器
@dask.delayed
def f(x)
...
return dask.compute(...) # 可以在延迟任务中调用 dask.compute
dask.compute([f(i) for ...])
嵌套的 compute 调用主要由 Matthew Rocklin 和 Olivier Grisel 开发。
工作节点现在在内存压力大或释放数据时会明确调用 gc.collect()。这有助于避免一些内存泄漏,尤其是在使用 Pandas 数据框时。事实证明,仔细地做这一点需要惊人程度的细节。
改进的垃圾回收主要由 Fabian Keller 和 Olivier Grisel 实现和测试,并得到了 Antoine Pitrou 的建议。
各种 Dask 机器学习项目现在正在一个统一的仓库 dask-ml 下组装。我们鼓励用户和研究人员都阅读该项目。我们相信其中包含许多有用且有趣的方法。
组装和整理这些算法的工作主要由 Tom Augspurger 负责处理。
用于索引和带标签数组的 XArray 项目也于本周发布了其重要的 0.10.0 版本,其中包含许多性能改进,特别是在大型数据集上使用 Dask 时。
以下人员自 9 月 24 日发布 0.15.3 版本以来为 dask/dask 仓库做出了贡献
以下人员自 9 月 24 日发布 1.19.1 版本以来为 dask/distributed 仓库做出了贡献
以下人员为 dask/dask-ml 仓库做出了贡献
此外,我们自豪地宣布 Olivier Grisel 已接受 Dask 项目的提交权限。Olivier 在分布式调度器以及 Joblib、SKLearn 和 Cloudpickle 等相关项目上尤为活跃。