提交新活动

谢谢!您的提交已收到!
噢!提交表单时出错。

提交新闻报道

谢谢!您的提交已收到!
噢!提交表单时出错。

订阅新闻简报

谢谢!您的提交已收到!
噢!提交表单时出错。
Aug 2, 2019

Dask 2.2.0 版本发布

作者

我很高兴地宣布 Dask 2.2 版本发布。这是一个重要的版本,包含错误修复和新功能。上一篇博客发布的版本是 2019 年 6 月 22 日的 2.0 版本。本篇博客文章概述了自上一篇文章以来值得注意的变更。

您可以使用 conda 安装 Dask

conda install dask

或从 PyPI 使用 pip 安装

pip install dask[complete] --upgrade

完整的变更日志可在此处查看

值得注意的变更

和往常一样,这里列出的变更太多了,因此我们将重点介绍一些读者可能感兴趣或会破坏旧行为的变更。我们将特别讨论以下几点

  1. Parquet 重写
  2. 客户端和日志的 HTML 输出更美观
  3. 使用 Dask-ML 中的 Hyperband 进行超参数选择
  4. 将字节 I/O 处理从 Dask 移至 FSSpec
  5. 随处使用 async/await,并为开发者提供更简洁的设置
  6. 一种新的 SSH 部署解决方案

1 - Parquet 重写

现在,Dask DataFrame 可以使用fastparquetApache Arrow 读取和写入 Parquet 数据。

import dask.dataframe as dd

df = dd.read_parquet("/path/to/mydata.parquet", engine="arrow")
# 或
df = dd.read_parquet("/path/to/mydata.parquet", engine="fastparquet")

在 Dask 中支持这两个库对用户很有帮助,但带来了一些维护负担,特别是考虑到每个库多年来都与 Dask DataFrame 共同演进。Dask DataFrame 与这些库之间的契约关系复杂,使其难以快速发展。

为了解决这个问题,我们将 Dask 对 Parquet 读取/写入器的期望正式化为一个更正式的 Parquet 引擎契约。这降低了维护成本,使得每个项目都能独立开发,并允许新引擎的出现。

RAPIDS cuDF 库的一个 PR 中已经提供了 GPU 加速的 Parquet 读取器。

因此,我们也能够修复许多长期存在的 bug,并改进了这两个引擎的功能。

来自 Sarah Bird 在开发过程中的一些有趣引言

我正在测试这个。目前为止一切顺利。我可以在几秒钟内加载包含 1800 个分区的我的数据集。改变游戏规则!

以及

我现在成功地处理了一个包含 74,000 个分区且没有元数据的数据集。打开数据集并运行 df.head() 需要 7 到 30 秒。(大概取决于 s3fs 缓存是否冷)。这太棒了!以前这简直是不可能的。

API 保持不变,但功能应该更流畅。

感谢 Rick ZamoraMartinDurant 在此方面所做的大部分工作,以及Sarah BirdWesMcKinneyMikeMcCarty 提供的指导和评审。

2 - 客户端和日志的 HTML 输出更美观

from dask.distributed import Client
client = Client()

客户端

集群

  • 工作节点4
  • 内核数12
  • 内存: 17.18 GB

client.cluster.logs()

Schedulerdistributed.scheduler - INFO - 清除任务状态
distributed.scheduler - INFO - 调度器地址: tcp://127.0.0.1:60275
distributed.scheduler - INFO - 仪表盘地址: 127.0.0.1:8787
distributed.scheduler - INFO - 注册 tcp://127.0.0.1:60281
distributed.scheduler - INFO - 注册 tcp://127.0.0.1:60282
distributed.scheduler - INFO - 启动工作节点计算流, tcp://127.0.0.1:60281
distributed.scheduler - INFO - 启动工作节点计算流, tcp://127.0.0.1:60282
distributed.scheduler - INFO - 注册 tcp://127.0.0.1:60285
distributed.scheduler - INFO - 注册 tcp://127.0.0.1:60286
distributed.scheduler - INFO - 启动工作节点计算流, tcp://127.0.0.1:60285
distributed.scheduler - INFO - 启动工作节点计算流, tcp://127.0.0.1:60286
distributed.scheduler - INFO - 接收客户端连接: Client-6b6ba1d0-b3bd-11e9-9bd0-acde48001122tcp://127.0.0.1:60281distributed.worker - INFO - 在以下地址启动工作节点: tcp://127.0.0.1:60281
distributed.worker - INFO - 正在监听: tcp://127.0.0.1:60281
distributed.worker - INFO - 正在等待连接到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 线程数: 3
distributed.worker - INFO - 内存: 4.29 GB
distributed.worker - INFO - 本地目录: /Users/mrocklin/workspace/dask/dask-worker-space/worker-c4_44fym
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 已注册到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------tcp://127.0.0.1:60282distributed.worker - INFO - 在以下地址启动工作节点: tcp://127.0.0.1:60282
distributed.worker - INFO - 正在监听: tcp://127.0.0.1:60282
distributed.worker - INFO - 正在等待连接到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 线程数: 3
distributed.worker - INFO - 内存: 4.29 GB
distributed.worker - INFO - 本地目录: /Users/mrocklin/workspace/dask/dask-worker-space/worker-quu4taje
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 已注册到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------tcp://127.0.0.1:60285distributed.worker - INFO - 在以下地址启动工作节点: tcp://127.0.0.1:60285
distributed.worker - INFO - 正在监听: tcp://127.0.0.1:60285
distributed.worker - INFO - 正在等待连接到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 线程数: 3
distributed.worker - INFO - 内存: 4.29 GB
distributed.worker - INFO - 本地目录: /Users/mrocklin/workspace/dask/dask-worker-space/worker-ll4cozug
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 已注册到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------tcp://127.0.0.1:60286distributed.worker - INFO - 在以下地址启动工作节点: tcp://127.0.0.1:60286
distributed.worker - INFO - 正在监听: tcp://127.0.0.1:60286
distributed.worker - INFO - 正在等待连接到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 线程数: 3
distributed.worker - INFO - 内存: 4.29 GB
distributed.worker - INFO - 本地目录: /Users/mrocklin/workspace/dask/dask-worker-space/worker-lpbkkzj6
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - 已注册到: tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------

注意:在除 IE 和 Edge 之外的任何浏览器下,显示效果更佳

感谢 Jacob Tomlinson 完成这项工作。

3 - 使用 HyperBand 进行超参数选择

Dask-ML 1.0 已发布,其中包含一个新的 HyperbandSearchCV 元估计器用于超参数优化。它可以作为 RandomizedSearchCV 的替代方案,通过不浪费时间在没有前景的超参数上,从而在更短的时间内找到相似的超参数。

>>> import numpy as np
>>> from dask_ml.model_selection import HyperbandSearchCV
>>> from dask_ml.datasets import make_classification
>>> from sklearn.linear_model import SGDClassifier

>>> X, y = make_classification(chunks=20)
>>> est = SGDClassifier(tol=1e-3)
>>> param_dist = {'alpha': np.logspace(-4, 0, num=1000),
>>> 'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
>>> 'average': [True, False]}

>>> search = HyperbandSearchCV(est, param_dist)
>>> search.fit(X, y, classes=np.unique(y))
>>> search.best_params_
{'loss': 'log', 'average': False, 'alpha': 0.0080502}

感谢 Scott Sievert。您可以通过观看他在 SciPy 2019 上的演讲,更深入地了解这个话题。

4 - 将字节 I/O 处理从 Dask 移至 FSSpec

我们已将 Dask 用于读取和写入原始数据到不同存储系统的内部代码分离到一个独立的项目 fsspec 中。

下面是一个小例子

import fsspec

with fsspec.open("https://github.com/dask/dask/edit/master/README.rst") as f
print(f.read(1000))

with fsspec.open("s3://bucket/myfile.csv") as f
df = pd.read_csv(f)

with fsspec.open("hdfs:///path/to/myfile.csv") as f
df = pd.read_csv(f)

with fsspec.open("gcs://bucket/myfile.csv") as f
df = pd.read_csv(f)

Dask 用于从 HDFS、S3、GCS、Azure 和其他远程存储系统读取和写入字节的 I/O 基础设施可以说是目前 Python 中最统一和全面的。通过 s3fsgcsfs 和 hdfs3 pyarrow.hdfs 等工具,可以轻松地以 Pythonic 的方式读写各种远程存储系统中的数据。

很早我们就决定希望这部分代码位于 Dask 主代码库之外,这就是它们是独立项目的原因。这个选择使得 Pandas、Zarr 等其他库可以受益于这项工作,而无需严格依赖于 Dask。然而,Dask 内部仍然有一些代码有助于将它们统一起来。我们已经将这部分代码移至外部项目 fsspec,该项目包含了 Dask 过去提供的所有集中化代码,以及关于远程数据系统为实现兼容性应具备何种特性的正式规范。这也有助于与 Arrow 等其他项目统一努力。

特别感谢 Martin Durant 多年来对 Dask I/O 基础设施的引导,以及完成了将 fsspec 分离出来的更直接的工作。

您可以在此处阅读更多关于 FSSpec 及其从 Dask 中分离出来的信息。

5 - 随处使用 Async/Await,为开发者提供更简洁的设置

在 Dask 2.0 中,我们放弃了对 Python 2 的支持,现在仅支持 Python 3.5 及更高版本。这使我们能够采用 async 和 await 语法进行并发执行,而不是使用基于 yield 的旧协程方法。这些差异最初主要是外观上的,但在我们检查代码库并进行清理的过程中,触发了许多实质性的改进。现在启动和停止内部的 Scheduler、Worker、Nanny 和 Client 对象更加统一,减少了潜在 bug 的出现。

Python API 设置文档中有更详细的讨论,并在此文档的代码示例中体现

import asyncio

from dask.distributed import Scheduler, Worker, Client

async def f()
async with Scheduler() as s
async with Worker(s.address) as w1, Worker(s.address) as w2
async with Client(s.address, asynchronous=True) as client
future = client.submit(lambda x: x + 1, 10)
result = await future
print(result)

asyncio.get_event_loop().run_until_complete(f())

由于此项改进和其他内部清理工作,我们 CI 中偶发的测试失败现象已消失,开发者心情也很愉快 :)

6 - 新的 SSHCluster

我们添加了第二种 SSH 集群部署解决方案。它看起来像这样

from distributed.deploy.ssh2 import SSHCluster # 这将在未来版本中移动

cluster = SSHCluster(
hosts=["host1", "host2", "host3", "host4"],
# hosts=["localhost"] * 4 # 如果你想在本地尝试此方法,
worker_kwargs={"nthreads": 4},
scheduler_kwargs={},
connect_kwargs={"known_hosts": None}
)

请注意,此对象是实验性的,可能会在未通知的情况下更改

我们致力于此项工作有两个原因

  1. 我们的用户调查显示,令人惊讶的是许多人正在使用 SSH 部署 Dask。据传他们似乎只是通过 SSH 连接到机器,然后使用 Dask 的标准命令行界面 (CLI)
  2. 我们想要一个比这更容易的解决方案。
  3. 我们一直试图将各种部署解决方案(如 Kubernetes、SLURM、Yarn/Hadoop)的代码统一到一个中央代码库中,而将简单的 SSHCluster 作为测试用例已被证明对测试和实验很有价值。

另请注意,目前 Dask 已有一个更成熟的dask-ssh 解决方案

我们预计部署的统一将在未来几个月的开发中成为一个核心主题。

致谢

自上次发布博客文章以来,共有两个版本发布。以下人员自 6 月 30 日 2.0 版本发布以来,对以下代码库做出了贡献

  • dask/dask
  • Brett Naul
  • Daniel Saxton
  • David Brochart
  • Davis Bennett
  • Elliott Sales de Andrade
  • GALI PREM SAGAR
  • James Bourbeau
  • Jim Crist
  • Loïc Estève
  • Martin Durant
  • Matthew Rocklin
  • Matthias Bussonnier
  • Natalya Rapstine
  • Nick Becker
  • Peter Andreas Entschev
  • Ralf Gommers
  • Richard (Rick) Zamora
  • Sarah Bird
  • Sean McKenna
  • Tom Augspurger
  • Willi Rath
  • Xavier Holt
  • andrethrill
  • asmith26
  • msbrown47
  • tshatrov
  • dask/distributed
  • Christian Hudon
  • Gabriel Sailer
  • Jacob Tomlinson
  • James Bourbeau
  • Jim Crist
  • Martin Durant
  • Matthew Rocklin
  • Pierre Glaser
  • Russ Bubley
  • tjb900
  • dask/dask-jobqueue
  • Guillaume Eynard-Bontemps
  • Leo Singer
  • Loïc Estève
  • Matthew Rocklin
  • Stuart Berg
  • dask/dask-examples
  • Chris White
  • Ian Rose
  • Matthew Rocklin
  • dask/dask-mpi
  • Anderson Banihirwe
  • Kevin Paul
  • Matthew Rocklin
  • dask/dask-kubernetes
  • Matthew Rocklin
  • Tom Augspurger
  • dask/dask-ml
  • Roman Yurchak
  • Tom Augspurger
  • dask/dask-yarn
  • Al Johri
  • Jim Crist
  • dask/dask-examples