提交新活动

谢谢您!您的提交已收到!
糟糕!提交表格时出错了。

提交新闻稿

谢谢您!您的提交已收到!
糟糕!提交表格时出错了。

订阅新闻通讯

谢谢您!您的提交已收到!
糟糕!提交表格时出错了。
2020年7月30日

配置分布式Dask集群

作者

配置Dask集群起初可能看起来令人望而生畏,但好消息是Dask项目内置了许多启发式算法,它们会根据部署机器和接收到的工作量,尽力预测并适应您的工作负载。可能在很长一段时间内,您根本不需要进行任何特殊配置。话虽如此,如果您正在寻找从本地使用Dask转向部署的技巧,或者您的Dask集群已准备好通过更深入的配置进行优化,那么这些提示和技巧将帮助您入门,并引导您找到关于该主题的最佳Dask文档!

如何托管分布式Dask集群

对我来说,最大的飞跃是从开发期间每次只运行一小时左右的本地版Dask,到建立一个生产就绪的Dask版本。大致有两种风格:

  1. 静态Dask集群——始终开启、始终唤醒、随时准备接受工作
  2. 瞬时Dask集群——可以通过Python API轻松启动或关闭,启动后会启动一个最小化的Dask主节点,该节点仅在实际提交工作时才会启动Dask工作节点。

尽管这是两大主要类别,但实际实现的方式有很多选择。这取决于许多因素,包括您想使用哪些云服务提供商的产品,这些资源是否已为您预配置,以及您是想使用Python API还是其他部署工具来实际启动Dask进程。Dask文档的Setup部分详细列出了所有可以配置Dask集群的不同方法。作为这些文档中描述内容的一小部分,您可以:

  • 在您配置的云实例上(例如AWS EC2或GCP GCE),从命令行手动安装并启动Dask进程
  • 使用流行的部署界面,例如用于Kubernetes的Helm,将Dask部署到您配置的云容器集群中,例如AWS Fargate或GCP GKE
  • 使用Dask开发者提供的“原生”部署Python API,在他们支持的部署基础设施上创建(并交互式配置)Dask,可以通过支持多种后端的通用Dask Gateway,或直接对接集群管理器,例如使用dask-kubernetes对接Kubernetes或使用dask-yarn对接YARN,前提是您已经预配置好了Kubernetes集群或Hadoop集群。
  • 使用一个近乎全服务的部署Python API,称为Dask Cloud Provider,它会更进一步,为您配置集群,前提是您提供AWS凭证(截至撰写本文时,它仅支持AWS)。

如您所见,有很多选择。除了所有这些之外,您还可以聘请托管服务提供商根据您的规范为您配置Dask集群,例如Saturn Cloud免责声明:其中一位作者(Julia Signell)为Saturn Cloud工作)。

无论您选择哪种方式,核心目标都是以尽可能可扩展的方式释放Dask提供的Python并行计算能力,而这正是将其运行在分布式基础设施上的意义所在。一旦您确定了部署Dask集群的位置和使用的API,Dask集群及其工作负载的实际配置过程就开始了。

如何为您的集群选择实例类型

当您准备好设置生产环境的Dask集群时,您需要对调度器和工作节点运行的基础设施做出一些决定,特别是如果您使用的是如何托管分布式Dask集群中需要预先配置基础设施的选项。无论您的基础设施是本地的还是在云端,都需要做出经典的决策:

  • 内存需求
  • CPU需求
  • 存储需求

如果您已经在本地测试过您的工作负载,一个简单的启发式方法是,将您工作消耗的CPU、存储和内存乘以一个乘数,该乘数与您的本地实验相比预期生产使用量的缩小比例有关。例如,如果您使用10%的数据样本在本地测试工作负载,将观察到的资源使用量乘以至少10,可能会接近您的最小实例大小。尽管实际上Dask的许多底层优化意味着处理更多数据通常不需要资源线性增长,但这种简单的启发式方法可以作为初步评估的良好起点。

同样地,选择最小的实例,并使用预定的数据子集运行,然后逐步扩展直到有效运行,这为您提供了最小实例大小的线索。如果您的本地环境资源不足,无法使用10%或更多的源数据在本地运行您的流程,如果环境差异很大(例如不同的操作系统,或后台运行着许多竞争应用程序),或者如果使用本地机器难以或不便监控流程执行时的CPU、内存和存储,那么在最小可用的节点上隔离测试用例是更好的选择。

另一方面,选择您能负担得起的最大实例,观察最大CPU/内存/存储指标与实际使用的差异,并根据未使用的资源比例进行缩减,这可能是找到理想大小的更快方法。

节点大小的选择可能很大程度上取决于您的预算,但只要您的节点大小足够大,可以避免严格的内存不足错误,那么在接近您最低运行规格的节点上支付的成本,其另一面就是时间。由于您的Dask集群的目的是运行分布式并行计算,如果您扩展实例以允许更多的并行性,您可以节省大量时间。如果您有需要几个小时训练的长时间运行模型,但可以通过扩展将其缩短到几分钟,从而节省您或您员工的时间,快速看到反馈循环,那么超出最低规格进行扩展是值得的。

您的调度器节点和工作节点应该使用相同的大小吗?使用不同的实例大小来配置它们以优化资源当然很有吸引力。快速了解一下它们各自的一般资源需求是值得的,以便有个概念。

对于调度器,每个提交给它的任务的序列化版本会保留在内存中,直到它决定哪个工作节点应该接收该工作。这不一定与实际执行任务所需的内存量相同,但在内存方面过于吝啬可能会阻止工作被调度。从CPU的角度看,调度器的需求可能远低于您的工作节点,但如果调度器CPU不足会导致死锁,当调度器卡住或崩溃时,您的工作节点也无法获得任何工作。在存储方面,Dask调度器不会将太多内容持久化到磁盘,即使是临时文件,所以其存储需求非常低。

对于工作节点,您的任务代码的具体资源需求可能会超出我们能做出的任何概括。至少,它们需要足够的内存和CPU来反序列化每个任务负载,并将其再次序列化作为Future返回给Dask调度器。Dask工作节点可能会将计算结果保存在内存中,包括分布在集群内存中,您可以在此处阅读更多相关信息。关于存储需求,提交给Dask工作节点的任务基本上不应该写入本地存储——调度器不保证工作会在给定工作节点上运行——因此存储成本应直接与工作节点依赖项的安装 footprint 以及Dask工作节点的任何临时存储有关。工作节点创建的临时文件可能包括在内存不足时将内存数据溢出到本地磁盘,前提是该行为未被禁用,这意味着减少内存可能会影响您的临时存储需求。

通常我们建议简化您的操作,将调度器和工作节点设置为相同的节点大小,但如果您想优化它们,可以使用上述CPU、内存和存储模式作为单独配置它们的起点。

如何选择工作节点的数量

每个Dask集群有一个调度器和任意数量的工作节点。调度器跟踪需要完成的工作和已经完成的工作。工作节点执行工作,在它们之间共享结果,并向调度器报告。有关这方面的更多背景信息,请参阅dask.distributed文档

设置Dask集群时,您必须决定使用多少工作节点。使用许多工作节点可能很有诱惑力,但这并非总是好事。如果您使用太多工作节点,有些可能没有足够的工作可做,大部分时间处于空闲状态。即使它们有足够的工作,它们之间可能需要共享数据,这会很慢。此外,如果您的机器资源有限(而不是每个工作节点一个节点),那么每个工作节点会更弱——它们可能会耗尽内存,或者完成任务需要很长时间。

另一方面,如果您使用的工人节点太少,就无法充分利用Dask的并行性,您的工作总体完成时间可能会更长。

在决定使用多少工作节点之前,请尝试使用默认设置。在许多情况下,Dask可以根据您机器的大小和形状选择一个默认设置。如果这不起作用,那么您需要了解一些关于您的工作大小和形状的信息。特别是,您会想知道:

  1. 您的计算机有多大,或者您能访问哪些类型的计算节点?
  2. 您的数据有多大?
  3. 您尝试进行的计算的结构是什么?

如果您在本地机器上工作,那么计算机的大小是固定的且可知。如果您在HPC或云实例上工作,则可以选择分配给每个工作节点的资源。您根据我们在如何为您的集群选择实例类型中讨论的因素来决定集群的大小。

Dask通常用于数据太大无法完全载入内存的情况。在这种情况下,数据被分割成块(chunks)或分区(partitions)。每个任务都在块上计算,然后结果被聚合。您将在下方了解如何改变数据的形状。

计算的结构可能是最难理解的。如果可能,尝试在非常小的数据子集上进行计算可能会有所帮助。您可以通过调用 .visualize() 来查看特定计算的任务图。如果任务图太大,无法舒适地在线查看,那么可以查看Dask仪表板的图形标签页。它在任务图运行时显示并点亮每个部分。为了使Dask最有效率,您需要一个不太大也不太相互关联的任务图。Dask文档讨论了几种优化任务图的技术。

要选择使用多少工作节点,请考虑在图的任何给定部分有多少并发任务。如果每个任务都包含相当数量的工作,那么运行Dask的最快方法是为每个并发任务分配一个工作节点。对于分块数据,如果每个工作节点都能轻松地在内存中容纳一个数据块并对其进行一些计算,那么块的数量应该是工作节点数量的倍数。这确保了工作节点总是有足够的工作可做。

如果您的任务数量变化很大,那么您也可以考虑使用自适应集群。在自适应集群中,您设置最小和最大工作节点数量,并让集群根据需要添加和移除工作节点。当调度器确定某些工作节点不再需要时,它会要求集群关闭它们;当需要更多工作节点时,调度器会要求集群启动更多。这对于任务图很有用,例如开始时输入任务较少,中间任务较多,最后有一些聚合或归约操作。

一旦您启动了一些工作节点,您可以在Dask仪表板中监控它们的进度。在那里您可以查看它们的内存消耗,观察它们在任务图中的进展,以及访问工作节点级别的日志。通过这种方式观察您的计算,可以深入了解潜在的加速方法,并建立未来使用工作节点数量的直觉。

选择使用多少工作节点的难点在于,实际上您的机器、数据和任务图的大小和形状可能会发生变化。弄清楚要使用多少工作节点最终可能感觉像是在不停地调整旋钮。如果这让您开始抓狂,请记住,即使集群正在运行,您也可以随时更改工作节点的数量。

如何选择nthreads以利用多线程

当启动Dask工作节点时,有两个非常重要的配置选项可以互相配合:工作节点的数量和每个工作节点的线程数(nthreads)。您实际上可以使用标志在同一个工作节点进程上同时操作这两个选项,例如使用 dask-worker --nprocs 2 --nthreads 2 的形式,不过 --nprocs 只是在后台启动另一个工作节点,因此更清晰的配置是避免设置 --nprocs,而是使用您用于指定总工作节点数量的方式来操作该配置。我们已经讨论了如何选择工作节点数量,但如果您更改工作节点的 --nthreads 以增加单个工作节点可以完成的工作量,您可能会修改关于工作节点数量的决定。

在决定工作节点的最佳线程数(nthreads)时,关键在于您期望这些工作节点完成的工作类型。基本原则是,多线程最适合在任务之间共享数据,但如果运行的代码不释放Python的GIL(“全局解释器锁”),则效果会更差。增加不释放Python GIL的工作的线程数没有任何效果;如果GIL被锁定,工作节点无法使用线程来优化计算速度。这对于想要增加并行性但提高工作节点线程限制却看不到任何好处的Dask新用户来说,可能是困惑点。

Dask关于工作节点的文档中所述,有一些经验法则可以判断何时需要担心GIL锁定,并因此优先选择更多的工作节点,而不是使用高线程数(nthreads)的更“重”的单个工作节点:

  • 如果您的代码主要是在非数值数据上使用纯Python(非优化Python库)
  • 如果您的代码导致Python外部长时间运行的计算,并且没有显式释放GIL

方便的是,很多Dask用户完全使用针对多线程优化的Python库(即PyData堆栈中的NumPy、Pandas、SciPy等)运行数值计算。如果您主要使用这些或类似优化的库进行数值计算,您应该强调使用更高的线程数。如果您确实主要进行数值计算,您可以指定的总线程数可以与您拥有的核心数一样多;如果您进行的任何工作会导致线程暂停,例如任何I/O操作(比如将结果写入磁盘,或许),您可以指定比核心数更多的线程,因为有些线程偶尔会处于空闲状态。在这种情况下,设置比核心数多多少个线程的理想数量很难估计,并且取决于您的工作负载,但参考concurrent.futures的一些建议,机器处理器数量的5倍是一个历史悠久的上限,可用于限制高度依赖I/O的工作负载的总线程数。

如何对数组进行分块和对DataFrame进行分区

在Dask中有许多不同的触发工作的方法。例如:您可以使用delayed包装函数或直接将工作提交给客户端(选项比较请参阅用户界面)。如果您将结构化数据加载到Dask对象中,那么您很可能在使用dask.arraydask.dataframe。这些模块分别模仿了numpy和pandas——使与大型数组和大型表格数据集的交互更加容易。

当使用dask.dataframe和dask.array时,计算通过将数据分割成小块在工作节点之间分配。在dask.dataframe中,这些小块称为分区(partitions),在dask.array中称为块(chunks),但原理是相同的。在dask.array的情况下,每个块包含一个numpy数组;在dask.dataframe的情况下,每个分区包含一个pandas dataframe。无论哪种情况,每个小块都包含数据的一小部分,但代表整体,并且必须足够小以便轻松地装入工作节点内存。

通常在加载数据时,分区/块会自动确定。例如,从包含许多csv文件的目录中读取时,每个文件将成为一个分区。如果您的数据默认没有分割,可以使用df.set_index或array.rechunk手动完成。如果数据默认已经分割,并且您想更改块的形状,文件级别的块应该是Dask级别块的倍数(在此处阅读更多)。

作为用户,您知道数据将如何使用,因此通常可以以导致更高效计算的方式对其进行分区。例如,如果您要按月聚合数据,沿时间轴进行分块可能是有意义的。如果您改为查看不同海拔高度的特定特征,沿海拔高度进行分块可能是有意义的。在最佳实践中描述了更多关于dask.arrays分块的技巧。另一种可能有助于重新分区的情况是,如果您已将数据过滤到原始数据的一个子集。在这种情况下,您的分区可能会太小。有关如何处理这种情况的更多详细信息,请参阅dask.dataframe的最佳实践

选择块大小时,最好既不要太小也不要太大(通常100MB左右是合理的)。每个块都需要能够装入工作节点内存,并且对该块的操作应该花费一些非零时间(超过100毫秒)。有关更多建议,请查看关于分区的文档。

我们希望这些信息能帮助您决定是否以不同的方式配置您的Dask部署,并给您尝试的信心。我们在Dask文档中找到了所有这些很棒的信息,所以如果您感到受到启发,请点击我们散布在文中的链接,了解更多关于Dask的信息!