提交新活动

谢谢您!您的提交已收到!
糟了!提交表单时出了问题。

提交新闻报道

谢谢您!您的提交已收到!
糟了!提交表单时出了问题。

订阅新闻通讯

谢谢您!您的提交已收到!
糟了!提交表单时出了问题。
2020年7月21日

更快的调度

作者:

摘要

本文讨论了 Dask 在任务调度方面的开销成本,然后提出了一个大致的加速计划。

本文写给其他维护者,经常提及内部细节。不适合广泛阅读。

这个问题如何呈现?

当我们提交大型图时,在调用 .compute() 和实际开始在工作节点上执行任务之间会有一点延迟。在某些情况下,这种延迟会影响可用性和性能。

此外,在极少数情况下,任务之间的间隔也可能是一个问题,尤其是当这些任务非常短且由于某种原因无法延长时。

谁会关心?

首先,这个问题影响大约 1-5% 的 Dask 用户。这些人希望相对快速地处理数百万个任务。让我们列举一些用例:

  1. 10-100TB 规模的 Xarray/Pangeo 工作负载
  2. 大型表格数据上的 NVIDIA RAPIDS 工作负载(GPU 使计算变得快速,因此其他成本变得相对更大)
  3. 某些对冲基金内部的一些神秘用例

它不影响日常用户,这些用户处理 100GB 到几 TB 的数据,并且不介意等待 10 秒让事情开始运行。

成本的粗略分解

当你调用 x.sum().compute() 时,会发生一些事情:

  1. 图生成: Dask 集合库中的一些 Python 代码(例如 dask array)调用 sum 函数,这会在客户端生成一个任务图。
  2. 图优化: 然后我们在客户端优化该图,以移除不必要的工作,融合任务,应用重要的高级优化等等。
  3. 图序列化: 现在我们将该图打包成可以发送到调度器的形式。
  4. 图通信: 我们通过网络将这些字节发送到调度器。
  5. Scheduler.update_graph: 调度器接收到这些字节,解包它们,然后更新其内部数据结构。
  6. 调度: 调度器然后将就绪任务分配给工作节点。
  7. 与工作节点通信: 调度器向每个工作节点发送大量更小的消息,其中包含它们可以执行的任务。
  8. 工作节点工作: 工作节点然后执行这项工作,并开始与调度器来回通信以接收新的指令。

通常,现在大多数人关注步骤 1-6。一旦事情到达工作节点并且进度条开始移动,人们往往就不那么关心了(但也不是完全不关心)。

其他人做什么?

让我们看看其他项目做的一些事情,并看看是否有我们可以学习的地方。这些都是常见的建议,但大多数都存在挑战。

  1. 使用 C++/Rust/C/Cython 重写调度器
  2. 提案:Python 慢。想让它更快?别用 Python。参考学术项目。
  3. 挑战:这对于上述流水线中的某些部分有意义,但对其他部分没有。它也使得吸引维护者变得更困难。
  4. 我们应该考虑:调度器和优化算法的某些部分可以用更低级的语言编写,比如 Cython。我们需要注意可维护性。
  5. 分布式调度
  6. 提案:调度器慢,也许可以有很多调度器?参考 Ray。
  7. 挑战:如果调度状态分散在许多计算机上,Dask 必须做出的那种决策实际上非常困难。分布式调度在工作负载非常统一或高度解耦时效果更好。分布式调度对于喜欢解决有趣/困难问题的人来说非常有吸引力。
  8. 我们应该考虑:我们可以将一些简单的逻辑下移到工作节点。不过,我们已经对容易的部分这样做了。不清楚这里还有多少额外的好处。
  9. 围绕集合构建专用调度
  10. 提案:如果 Dask 只成为一个数据框库或一个数组计算库,那么它可以更有效地处理特殊情况。参考 Spark、Mars 和其他项目。
  11. 挑战:是的,但 Dask 不是数据框库或数组库。我们上面提到的三个用例都非常不同。
  12. 我们应该考虑:dask array 和 dask dataframe 等模块应该开发高级查询块,并且我们应该努力直接通过网络传输这些子图,以便它们更紧凑。

我们实际上应该做什么?

因为我们的流水线有许多阶段,每个阶段都可能由于不同的原因而变慢,所以我们必须做很多事情。此外,这是一个难题,因为在此级别更改项目的一个部分会对许多其他部分产生影响。本文的其余部分试图提出一套一致的更改方案。让我们从摘要开始:

  1. 对于 Dask array/dataframe,让我们更积极地使用高级图,以便我们可以在客户端和调度器之间仅通信抽象表示。
  2. 但这尤其会破坏低级图优化,例如融合、裁剪和切片融合。通过两项更改,我们可以使这些变得不必要:
  3. 我们可以让高级图变得更加智能,以处理裁剪和切片融合。
  4. 我们可以将更多的调度下移到工作节点,以在那里复制低级融合的优势。
  5. 然后,一旦所有的图操作都发生在调度器上,让我们尝试加速它,最好使用当前开发者社区能够理解的语言,比如 Cython。
  6. 与此同时并行地,让我们看看我们的网络栈。

我们将在下面更深入地讨论这些内容。

图生成

高级图历史

一两年前,我们通过高级图将图生成成本从用户代码编写时间转移到图优化时间。

y = x + 1 # 以前在这里进行图生成
(y,) = dask.optimize(y,) # 现在在这里进行

这确实提高了可用性,也让我们能够进行一些高级优化,有时可以跳过一些低级优化成本。

我们能进一步推进吗?

我们流水线的前四个阶段发生在客户端。

  1. 图生成: Dask 集合库中的一些 Python 代码(例如 dask array)调用 sum 函数,这会在客户端生成一个任务图。
  2. 图优化: 然后我们在客户端优化该图,以移除不必要的工作,融合任务,应用重要的高级优化等等。
  3. 图序列化: 现在我们将该图打包成可以发送到调度器的形式。
  4. 图通信: 我们通过网络将这些字节发送到调度器。

如果我们在这些阶段一直保持高级图表示,直到图通信,那么我们可以向调度器通信一个更紧凑的表示。我们可以减少很多这些成本,至少对于高级集合 API(delayed 和 client.submit 仍然会很慢,尽管 client.map 可能还好)。

这还有另外几个不错的好处:

  1. 用户的代码不会阻塞,我们可以立即通知用户我们正在处理。
  2. 我们已将成本集中在调度器中,所以现在只有一个地方需要考虑低级代码。

(相关讨论在这里:https://github.com/dask/distributed/issues/3872)

然而,低级图优化将是一个问题。

原则上,修改分布式调度器以接受各种图层类型是一个繁琐但直接的问题。我不担心。

更大的担忧是如何处理低级图优化。目前有三个非常重要的优化:

  1. 任务融合:这就是将你的 read_parquet 任务与后续的 blockwise 任务合并的原因。
  2. 裁剪(Culling):这就是让 df.head() 或 x[0] 快速的原因。
  3. 切片融合:这就是为什么 x[:100][5] 工作得很好的原因。

为了将抽象图层传输到调度器,我们需要移除对这些低级图优化的需求。我认为可以通过结合两种方法来实现这一点:

更聪明的高级图操作

我们已经在 blockwise 中做了一些,它有自己的融合,并通常消除了大部分对融合的需求。但其他类似 blockwise 的操作,比如 read_* 可能必须加入 Blockwise 系列。

让裁剪(culling)正常工作可能需要我们教导每个独立的图层如何跟踪每种图层类型中的依赖关系并自行裁剪。这可能会很棘手。

切片是可行的,我们只需要有人深入研究,理解所有当前的切片优化,并为这些计算创建高级图层。这对于一位优秀的硕士生来说将是一个很棒的项目。

将推测性任务发送给工作节点

高级 Blockwise 融合处理了许多低级融合的用例,但不是全部。例如,像 dd.read_parquet 或 da.from_zarr 这样的 I/O 层并未在高层进行融合。

我们可以通过将它们制成 blockwise 层(这需要扩展 blockwise 抽象,可能很难)来解决这个问题,或者替代地,如果我们高度确信知道它们的去向,我们可以在所有依赖项完成之前开始将尚未就绪的任务发送给工作节点。这将给我们带来一些与融合相同的结果,但会将所有任务类型分开(这对于诊断很有好处),并且可能仍然给我们带来与融合相同的性能优势。

在调度器上解包抽象图层

因此,在我们移除了对低级优化的需求,并且直接将抽象图层发送到调度器之后,我们需要教导调度器如何解包这些图层。

这有点棘手,因为调度器不能运行用户 Python 代码(出于安全原因)。我们将不得不注册调度器预先知道并信任的层类型(如 blockwise、rechunk、dataframe shuffle)。我们将始终支持自定义层,并且这些层的速度将与以往相同,但如果我们全面采用高级层,希望对它们的需求会少得多。

用低级语言重写调度器

一旦大多数棘手的细节被移到调度器中,我们将有一个地方可以专注于低级图状态操作。

Dask 的分布式调度器有两部分:

  1. 一个接收来自客户端和工作节点信号并向客户端和工作节点发送信号的 Tornado TCP 应用程序。
  2. 这是异步繁重的网络代码。
  3. 内部一个复杂的、响应这些状态变化的有限状态机。
  4. 这是复杂数据结构繁重的 Python 代码。

网络

Jim 在这里有一个很有前景的有趣项目:https://github.com/jcrist/ery 减少工作节点和调度器之间的延迟会很好,并有助于加速本文开头列出的流水线中的阶段 7-8。

状态机

用一些更低级的语言重写状态机是可行的。理想情况下,这将使用当前维护者社区易于维护的语言(Cython?),但我们也可能考虑在这里建立一个更稳固的接口,允许其他团队安全地进行实验。

这样做有一些优点(不同团队可以进行更多实验),但也有一些成本(核心工作的分散以及用户的不匹配)。此外,我怀疑拆分也可能意味着我们可能会失去仪表板,除非那些其他团队非常小心地将相同的状态暴露给 Bokeh。

这里还有更多需要探索的地方。无论如何,我认为尝试将状态机与网络系统隔离开来可能是有意义的。也许这也使人们更容易在隔离环境下进行性能分析。

在与一些不同的团队交流时,大多数人表示对拥有多个不同的状态机代码持保留意见。MapReduce 和 Spark 就是这样做的,结果导致难以维护的社区动态。

高级图优化

一旦我们将所有内容都放入更智能的高级图层中,我们将更容易进行优化。

我们需要一种更好的方式来记录这些优化,使用一个独立的遍历系统和一套规则。我们中的一些人以前写过这些东西,也许是时候重新审视它们了。

我们需要什么

这需要一些努力,但我认为它可以同时解决几个引人注目的问题。有一些棘手的地方需要做好:

  1. 高级图层的框架
  2. 高级图层的优化系统
  3. 将调度器分成两部分

为此,我认为我们需要对 Dask 相当熟悉的人才能做好。

并且还有相当多的后续工作:

  1. 为 dask dataframe 构建一个层级结构
  2. 为 dask array 构建一个层级结构
  3. 为这些构建优化,以消除对低级图优化的需求。
  4. 用 Cython 重写调度器的核心部分
  5. 试验网络层,也许使用新的 Comm。

我一直在思考如何正确地实施这一改变。从历史上看,由于维护者负担沉重,Dask 在过去几年的大部分变化都是增量的或外围的。然而,这个问题可能存在足够的压力,我们可以从一些组织获得一些专注的工程投入,这可能会改变其可行性。我们已经从一些团队获得了 25% 的时间投入。我很好奇我们是否能让一些人在几个月内投入 100% 的时间。