本文讨论了 Dask 在任务调度方面的开销成本,然后提出了一个大致的加速计划。
本文写给其他维护者,经常提及内部细节。不适合广泛阅读。
当我们提交大型图时,在调用 .compute() 和实际开始在工作节点上执行任务之间会有一点延迟。在某些情况下,这种延迟会影响可用性和性能。
此外,在极少数情况下,任务之间的间隔也可能是一个问题,尤其是当这些任务非常短且由于某种原因无法延长时。
首先,这个问题影响大约 1-5% 的 Dask 用户。这些人希望相对快速地处理数百万个任务。让我们列举一些用例:
它不影响日常用户,这些用户处理 100GB 到几 TB 的数据,并且不介意等待 10 秒让事情开始运行。
当你调用 x.sum().compute() 时,会发生一些事情:
通常,现在大多数人关注步骤 1-6。一旦事情到达工作节点并且进度条开始移动,人们往往就不那么关心了(但也不是完全不关心)。
让我们看看其他项目做的一些事情,并看看是否有我们可以学习的地方。这些都是常见的建议,但大多数都存在挑战。
因为我们的流水线有许多阶段,每个阶段都可能由于不同的原因而变慢,所以我们必须做很多事情。此外,这是一个难题,因为在此级别更改项目的一个部分会对许多其他部分产生影响。本文的其余部分试图提出一套一致的更改方案。让我们从摘要开始:
我们将在下面更深入地讨论这些内容。
一两年前,我们通过高级图将图生成成本从用户代码编写时间转移到图优化时间。
y = x + 1 # 以前在这里进行图生成
(y,) = dask.optimize(y,) # 现在在这里进行
这确实提高了可用性,也让我们能够进行一些高级优化,有时可以跳过一些低级优化成本。
我们流水线的前四个阶段发生在客户端。
如果我们在这些阶段一直保持高级图表示,直到图通信,那么我们可以向调度器通信一个更紧凑的表示。我们可以减少很多这些成本,至少对于高级集合 API(delayed 和 client.submit 仍然会很慢,尽管 client.map 可能还好)。
这还有另外几个不错的好处:
(相关讨论在这里:https://github.com/dask/distributed/issues/3872)
原则上,修改分布式调度器以接受各种图层类型是一个繁琐但直接的问题。我不担心。
更大的担忧是如何处理低级图优化。目前有三个非常重要的优化:
为了将抽象图层传输到调度器,我们需要移除对这些低级图优化的需求。我认为可以通过结合两种方法来实现这一点:
我们已经在 blockwise 中做了一些,它有自己的融合,并通常消除了大部分对融合的需求。但其他类似 blockwise 的操作,比如 read_* 可能必须加入 Blockwise 系列。
让裁剪(culling)正常工作可能需要我们教导每个独立的图层如何跟踪每种图层类型中的依赖关系并自行裁剪。这可能会很棘手。
切片是可行的,我们只需要有人深入研究,理解所有当前的切片优化,并为这些计算创建高级图层。这对于一位优秀的硕士生来说将是一个很棒的项目。
高级 Blockwise 融合处理了许多低级融合的用例,但不是全部。例如,像 dd.read_parquet 或 da.from_zarr 这样的 I/O 层并未在高层进行融合。
我们可以通过将它们制成 blockwise 层(这需要扩展 blockwise 抽象,可能很难)来解决这个问题,或者替代地,如果我们高度确信知道它们的去向,我们可以在所有依赖项完成之前开始将尚未就绪的任务发送给工作节点。这将给我们带来一些与融合相同的结果,但会将所有任务类型分开(这对于诊断很有好处),并且可能仍然给我们带来与融合相同的性能优势。
因此,在我们移除了对低级优化的需求,并且直接将抽象图层发送到调度器之后,我们需要教导调度器如何解包这些图层。
这有点棘手,因为调度器不能运行用户 Python 代码(出于安全原因)。我们将不得不注册调度器预先知道并信任的层类型(如 blockwise、rechunk、dataframe shuffle)。我们将始终支持自定义层,并且这些层的速度将与以往相同,但如果我们全面采用高级层,希望对它们的需求会少得多。
一旦大多数棘手的细节被移到调度器中,我们将有一个地方可以专注于低级图状态操作。
Dask 的分布式调度器有两部分:
Jim 在这里有一个很有前景的有趣项目:https://github.com/jcrist/ery 减少工作节点和调度器之间的延迟会很好,并有助于加速本文开头列出的流水线中的阶段 7-8。
用一些更低级的语言重写状态机是可行的。理想情况下,这将使用当前维护者社区易于维护的语言(Cython?),但我们也可能考虑在这里建立一个更稳固的接口,允许其他团队安全地进行实验。
这样做有一些优点(不同团队可以进行更多实验),但也有一些成本(核心工作的分散以及用户的不匹配)。此外,我怀疑拆分也可能意味着我们可能会失去仪表板,除非那些其他团队非常小心地将相同的状态暴露给 Bokeh。
这里还有更多需要探索的地方。无论如何,我认为尝试将状态机与网络系统隔离开来可能是有意义的。也许这也使人们更容易在隔离环境下进行性能分析。
在与一些不同的团队交流时,大多数人表示对拥有多个不同的状态机代码持保留意见。MapReduce 和 Spark 就是这样做的,结果导致难以维护的社区动态。
一旦我们将所有内容都放入更智能的高级图层中,我们将更容易进行优化。
我们需要一种更好的方式来记录这些优化,使用一个独立的遍历系统和一套规则。我们中的一些人以前写过这些东西,也许是时候重新审视它们了。
这需要一些努力,但我认为它可以同时解决几个引人注目的问题。有一些棘手的地方需要做好:
为此,我认为我们需要对 Dask 相当熟悉的人才能做好。
并且还有相当多的后续工作:
我一直在思考如何正确地实施这一改变。从历史上看,由于维护者负担沉重,Dask 在过去几年的大部分变化都是增量的或外围的。然而,这个问题可能存在足够的压力,我们可以从一些组织获得一些专注的工程投入,这可能会改变其可行性。我们已经从一些团队获得了 25% 的时间投入。我很好奇我们是否能让一些人在几个月内投入 100% 的时间。