提交新活动

谢谢!您的提交已收到!
糟糕!提交表单时出错了。

提交新闻报道

谢谢!您的提交已收到!
糟糕!提交表单时出错了。

订阅通讯

谢谢!您的提交已收到!
糟糕!提交表单时出错了。
2020年7月23日

Dask 分布式集群的现状

作者

Dask 使您能够构建所需计算的图,然后为您并行执行。这对于充分利用计算机硬件非常有用。当您想突破单台机器的限制时,这也非常有用。

在这篇文章中,我们将介绍:

手动设置

让我们首先介绍设置分布式 Dask 集群最直接的方法。

设置调度器和工作节点

假设我们有三台计算机,分别称为 MachineA、MachineB 和 MachineC。每台机器都有一个正常的 Python 环境,并且我们已经使用 `conda install dask` 安装了 Dask。如果想将它们组合成一个 Dask 集群,首先要在 MachineA 上运行一个调度器。

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Local Directory: /tmp/scheduler-btqf8ve1
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://MachineA:8786
distributed.scheduler - INFO - dashboard at: :8787

接下来,我们需要在 MachineB 和 MachineC 上启动一个工作节点进程。

$ dask-worker tcp://MachineA:8786
distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:51224'
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:51225
distributed.worker - INFO - Listening to: tcp://127.0.0.1:51225
distributed.worker - INFO - dashboard at: 127.0.0.1:51226
distributed.worker - INFO - Waiting to connect to: tcp://MachineA:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 4
distributed.worker - INFO - Memory: 8.00 GB
distributed.worker - INFO - Local Directory: /tmp/worker-h3wfwg7j
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://MachineA:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

如果我们在另外两台机器上启动工作节点,Dask 将自动检测机器上的资源并提供给调度器。在上面的示例中,工作节点检测到了 4 个 CPU 核心和 8GB 内存。因此,我们的调度器总共可以访问 8 个核心和 16GB 内存,它将利用这些资源尽可能快地执行计算图。如果我们在更多机器上添加更多工作节点,那么调度器可用的资源量就会增加,计算时间应该会更快。

注意:虽然调度器机器可能与其他两台机器具有相同的资源,但这些资源不会用于计算。

最后,我们需要从我们的 Python 会话连接到调度器。

from dask.distributed import Client
client = Client("tcp://MachineA:8786")

在 Python 全局命名空间中创建这个 `Client` 对象意味着您执行的任何 Dask 代码都会检测到它,并将计算交给调度器,然后由调度器在工作节点上执行。

访问仪表板

Dask 分布式调度器还有一个可以在网页浏览器中打开的仪表板。正如您在上面的输出中看到的,它的默认位置是在调度器机器的 8787 端口上。因此,您应该能够导航到 http://MachineA:8787。

Dask dashboard

如果您使用 Jupyter Lab 作为 Python 环境,还可以使用 Dask Lab Extension 将仪表板中的单个图表作为窗口在 Jupyter Lab 中打开。

Dask Lab Extension

回顾

在这个最小的示例中,我们在一些机器上安装了 Dask,在一台机器上运行了分布式调度器,在其他机器上运行了工作节点。然后我们从 Python 会话连接到我们的集群,并打开仪表板来监控集群。

我们尚未介绍的是这些机器最初从何而来。在这篇文章的其余部分,我们将讨论人们在实际中运行集群的不同方式,并概述存在哪些工具可以帮助您在各种基础设施上设置 Dask 集群。

集群要求

为了运行 Dask 集群,您必须能够在机器上安装 Dask 并启动调度器和工作节点组件。这些机器需要能够通过网络通信,以便这些组件之间可以互相“交流”。

您还需要能够通过网络从您的 Python 会话访问调度器,以便连接 `Client` 并访问仪表板。

最后,您创建 `Client` 的 Python 会话中的 Python 环境必须与工作节点运行的 Python 环境匹配。这是因为 Dask 使用 cloudpickle 来序列化对象并将它们发送给工作节点并检索结果。因此,两个位置的软件包版本必须一致。

在讨论人们通常希望运行 Dask 的不同平台时,我们需要记住这些要求。

集群类型

我通常看到人们运行两种“类型”的集群:固定集群和临时集群。

固定集群

设置集群的一种常见方法是如上所述运行调度器和工作节点命令,但让它们无限期地运行。为了本文的目的,我将其称为“固定集群”。您可以使用诸如 systemdsupervisord 之类的工具来管理进程,并确保它们始终在机器上运行。然后可以将 Dask 集群视为一项服务。

在这种模式下,一旦集群设置好,人们可以启动他们的 Python 会话,将其客户端连接到这个现有集群,完成一些工作然后再次断开连接。他们稍后可能会回到该集群并运行更多工作。在此期间,集群将处于空闲状态。

在这种模式下,多个用户共享这一个集群也很常见,但不建议这样做,因为 Dask 调度器不单独管理用户或客户端,并且工作将按先来先服务的方式执行。因此,我们建议用户一次使用一个集群。

临时集群

临时集群是指仅在工作期间存在的集群。在这种情况下,用户可以通过 SSH 连接到机器,运行命令设置集群,连接客户端并执行工作,然后断开连接并退出 Dask 进程。一种基本的方法是创建一个调用 ssh 并设置集群的 bash 脚本。您可以在执行工作时在后台运行此脚本,并在完成后将其终止。我们将在后续章节中介绍其他实现方式。

临时集群允许您利用大量机器,并在完成后再次释放它们。当您使用云服务或批处理调度器等系统,并且信用额度有限或按预置资源收费时,这尤其有用。

自适应性

临时集群通常也更容易扩展,因为您很可能有自动启动工作节点的机制。Dask 调度器会估算待处理工作完成所需的时间。如果调度器具有启动和停止工作节点的机制,它将尝试在 5 秒内完成所有待处理工作,从而扩展工作节点。这被称为自适应模式。

启动和停止工作节点的机制通过插件添加。我们即将讨论的许多实现都包含此逻辑。

连接性

Dask 默认使用 TCP 在客户端、调度器和工作节点之间进行通信。这意味着所有这些组件必须位于具有开放路由的 TCP/IP 网络上,以便机器之间可以相互“交流”。许多连接问题源于防火墙或专用网络阻止某些组件之间的连接。一个例子是在像 AWS 这样的云平台上运行 Dask,但同时在咖啡店使用免费 Wi-Fi 在您的笔记本电脑上运行 Python 会话和客户端。您必须确保能够在组件之间路由流量,无论是通过将 Dask 集群暴露到互联网,还是通过 VPN 或隧道将您的笔记本电脑连接到专用网络。

还有一项正在进行的工作是将对 UCX 的支持添加到 Dask,这将使其能够利用 InfiniBand 或 NVLink 网络(如果可用)。

集群管理器

在接下来的部分,我们将介绍 Dask 社区中可用的各种集群管理器实现。

在 Dask 分布式代码库中,有一个 `Cluster` 超类,可以被子类化以构建用于不同平台的各种集群管理器。社区成员利用这一点构建了自己的软件包,这些软件包可以在特定平台上创建 Dask 集群,例如 Kubernetes

这些类的设计理念是,您将集群管理器导入到 Python 会话中并实例化它。该对象随后负责在目标平台上启动调度器和工作节点进程。然后,您可以像往常一样从该集群对象创建一个 `Client` 对象来连接到它。

所有这些集群管理器对象都是临时集群,它们只在 Python 会话期间存在,然后就会被清理。

本地集群

让我们从 Dask 分布式代码库中的 `Cluster` 参考实现之一 `LocalCluster` 开始。

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)

这个集群管理器在您的本地机器上启动一个调度器,然后为它在机器上找到的每个 CPU 核心启动一个工作节点。

SSH 集群

另一个参考实现是 SSHCluster。这是使用 Dask 分布式处理多台机器最纯粹和简单的方式之一,与本文开头提到的示例非常相似。

from dask.distributed import SSHCluster, Client

cluster = SSHCluster(["MachineA", "MachineB", "MachineC"])
client = Client(cluster)

这里的第一个参数是我们可以通过 SSH 连接并设置 Dask 集群的机器列表。列表中的第一台机器将用作调度器,其余的用作工作节点。

由于调度器可能比工作节点使用的资源少得多,您甚至可能想在本地运行它,并将所有三台远程机器用作工作节点。

cluster = SSHCluster(["localhost", "MachineA", "MachineB", "MachineC"])

SpecCluster

包含在核心 Dask 分布式库中的最后一个实现是 SpecCluster。这实际上是另一个超类,旨在供其他开发者构建集群管理器时进行子类化。然而,它比 `Cluster` 更进一步,要求开发者以 Python 类的形式提供调度器和工作节点的完整规范。还有一个名为 `ProcessInterface` 的超类,用于创建这些调度器和工作节点类。

拥有标准接口意味着为用户提供更一致的体验。我们接下来将介绍的许多集群管理器都使用了 SpecCluster。

Dask Kubernetes

Dask KubernetesKubernetes 提供了一个名为 KubeCluster 的集群管理器。

Kubernetes 提供了高级 API 和抽象概念,用于在机器集群上调度 Linux 容器。它提供了进程、容器、网络、存储等抽象概念,以更好地利用数据中心规模的资源。

作为 Dask 用户,您通常不需要关心集群是如何设置的。但是,如果您由您的组织或机构授予了对 Kubernetes 集群的访问权限,您将需要理解这些概念才能在其上调度您的工作。

KubeCluster 集群管理器将这些概念进一步抽象,使其符合我们熟悉的 Dask 术语。

from dask.distributed import Client
from dask_kubernetes import KubeCluster

cluster = KubeCluster(**cluster_specific_kwargs)
client = Client(cluster)

为了使这段代码工作,您需要配置您的 Kubernetes 凭据,就像 SSH 示例中需要配置您的密钥一样。

您的客户端还需要能够访问 Dask 调度器,并且您可能希望能够在浏览器中打开仪表板。然而,Kubernetes 使用覆盖网络,这意味着分配给调度器和工作节点的 IP 地址仅在集群内部可路由。这对于它们之间的通信没有问题,但意味着您将无法从外部访问。

解决这个问题的一种方法是确保您的 Python 会话也在 Kubernetes 集群内部运行。在 Kubernetes 上设置交互式 Python 环境的一种流行方法是使用 Zero to Jupyter Hub,它使您能够访问在 Kubernetes 集群内部运行的 Jupyter Notebook。

另一种选择是将您的调度器暴露给外部网络。您可以通过暴露与调度器关联的 Kubernetes Service 对象或通过为您的 Kubernetes 集群设置和配置 Ingress 组件来实现。这两种选项都需要一定的 Kubernetes 知识。

Dask Helm chart

在 Kubernetes 集群上运行 Dask 的另一种选择是使用 Dask Helm Chart

这是一个固定集群设置的示例。Helm 是一种在 Kubernetes 集群上安装特定资源的方式,类似于 apt 或 yum 等软件包管理器。Dask Helm chart 包括一个 Jupyter Notebook、一个 Dask 调度器和三个 Dask 工作节点。工作节点可以通过与 Kubernetes API 交互手动扩展,但不能由 Dask 调度器本身进行自适应扩展。

这感觉与我们目前看到的方法不同。它为您提供了一个始终可用的 Dask 集群,以及一个用于从集群驱动的 Jupyter Notebook。然后,您必须将您的工作带到集群的 Jupyter 会话中,而不是从您现有的工作环境生成一个集群。

这种方法的一个好处是,因为 Jupyter Notebook 已作为集群的一部分设置好,所以它已经安装了 Lab Extension,并且也预配置了 Dask 集群的位置。因此,与之前需要向 `Client` 提供调度器地址或 `Cluster` 对象的示例不同,在这种情况下,它将从 Helm chart 设置的环境变量中自动检测集群。

from dask.distributed import Client

client = Client() # The address is loaded from an environment variable

注意: 如果在其他未配置调度器位置的情况下不带任何参数调用 `Client`,它将自动创建一个 LocalCluster 对象并使用它。

Dask Jobqueue

Dask Jobqueue 是一套面向 HPC 用户的集群管理器。

当您作为研究人员或学者并拥有 HPC 或超级计算机的访问权限时,您可能需要通过某种作业队列系统向该机器提交工作。这通常以 bash 脚本的形式出现,其中包含有关您在该机器上需要多少资源以及要运行的命令的元数据。

Dask Jobqueue 包含用于 PBSSlurmSGE 的集群管理器对象。创建这些集群管理器时,它们将根据您的参数构建用于批处理调度器的脚本,并使用您的默认凭据提交它们。

from dask.distributed import Client
from dask_jobqueue import PBSCluster

cluster = PBSCluster(**cluster_specific_kwargs)
client = Client(cluster)

由于像这样的批处理系统通常有较长的等待时间,您可能无法立即访问您的集群对象,并且扩展可能会很慢。根据排队策略,最好将其视为固定大小的集群。但是,如果您有一个响应迅速的交互式队列,那么您可以像使用任何其他自动缩放集群管理器一样使用它。

同样,您的 Python 会话需要能够连接到调度器的 IP 地址。这可能取决于您的 HPC 中心设置,以及如何确保这一点。

Dask Yarn

Dask Yarn 是传统 Hadoop 系统的集群管理器。

Hadoop 是一个框架,允许使用简单的编程模型在计算机集群上分布式处理大型数据集。它是 Java/Scala 生态系统中用于处理大量数据的常见基础设施。但是,您也可以使用称为 YARN 的调度功能来调度 Dask 工作节点并利用底层硬件资源。

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(**cluster_specific_kwargs)
client = Client(cluster)

Dask Yarn 仅适用于从 Hadoop 边缘节点使用,该节点将能够访问 Hadoop 集群的内部网络。

Dask Cloudprovider

Dask Cloudprovider 是一组用于利用云原生 API 的集群管理器。

云提供商,例如 亚马逊微软谷歌,提供了许多可用的 API,用于构建和运行各种类型的基础设施。这些范围从运行 Linux 或 Windows 的传统虚拟服务器,到可以按需执行小段代码的更高级 API。它们拥有批处理系统、Hadoop 系统、机器学习系统等等。

在云提供商上运行 Dask 的理想场景是一种服务,它允许您在指定的 Python 环境中运行调度器和工作节点,然后从外部安全地连接到它们。这样的服务尚不存在,但程度不同的类似服务确实存在。

一个例子是 AWS Fargate,这是一个托管容器平台。您可以按需运行Docker 容器,每个容器都有一个唯一的 IP 地址,可以是公共的或私有的。这意味着我们可以在 Dask 容器中运行 Dask 调度器和工作节点进程,并从我们的 Python 会话连接到它们。这项服务按请求资源的每秒计费,因此作为一种临时服务最有意义,不用时没有成本。

from dask.distributed import Client
from dask_cloudprovider import FargateCluster

cluster = FargateCluster(**cluster_specific_kwargs)
client = Client(cluster)

这个集群管理器使用您的AWS 凭据在 Fargate 上进行身份验证并请求 AWS 资源,然后将您的本地会话连接到在云上运行的 Dask 集群。

还有更高级的服务,例如 AWS LambdaGoogle Cloud Functions,它们允许您按需执行代码,并根据代码的执行时间计费。这些被称为“无服务器”服务,因为服务器完全被抽象掉了。这对于我们的 Dask 集群来说是完美的,因为您可以将调度器和工作节点作为要运行的代码提交。然而,运行这些云函数时,无法在它们之间建立网络连接,因为它们没有可路由的 IP 地址,因此无法构建由这些执行函数组成的 Dask 集群。也许有一天吧!

Dask Gateway

Dask Gateway 是一个用于管理 Dask 集群的中心服务。它提供了一个安全的 API,多个用户可以通过该 API 请求 Dask 服务器。它可以在 Kubernetes、Yarn 或批处理系统上生成 Dask 集群。

这个工具面向希望使其用户能够创建 Dask 集群,但又希望保持一定集中控制,而不是让每个用户自行创建的 IT 管理员。这对于跟踪 Dask 使用情况和设置每个用户的限制也很有用。

from dask.distributed import Client
from dask_gateway import GatewayCluster

cluster = GatewayCluster(**cluster_specific_kwargs)
client = Client(cluster)

对于每个用户,创建和使用网关集群的命令是相同的。管理员需要设置和管理网关服务器,并配置通过 Kerberos 或 Jupyter Hub 进行身份验证。他们还应该向用户提供配置信息,以便 Dask Gateway 知道如何连接到网关服务器。在大型组织或机构中,IT 部门也可能会配置员工使用的机器,因此应该能够将配置文件放到用户的计算机上。

本地 CUDA 集群

我要介绍的最后一个集群管理器是来自 Dask CUDA 包的 LocalCUDACluster。

这与其他集群管理器略有不同,因为它构建的 Dask 集群专门针对单个硬件进行优化。在这种情况下,它针对的是具有 GPU 的机器,从带有板载 NVIDIA GPU 的笔记本电脑到数据中心中运行多个 GPU 的 NVIDIA DGX-2

这个集群管理器与 LocalCluster 非常相似,因为它在当前机器上本地创建资源,但不是为每个 CPU 核心创建一个工作节点,而是为每个 GPU 创建一个。它还更改了一些配置默认值,以确保 GPU 工作负载具有良好的性能。

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(**cluster_specific_kwargs)
client = Client(cluster)

这个包还有一个替代的 Dask 工作节点 bash 命令 `dask-cuda-worker`,它也修改了 Dask 工作节点的默认设置,以确保针对 GPU 工作进行优化。

未来

现在我们已经概述了 Dask 分布式集群生态系统的现状,接下来让我们讨论未来的发展方向。

如开头所示,Dask 集群是调度器、工作节点和客户端的组合,它们支持分布式执行 Python 函数。在自己的机器上设置自己的集群非常简单,但是配置基础设施的方法多种多样,因此我们现在有多种自动化方法。

这种多样性引发了关于如何改进事物的一些问题。

我们需要更多固定集群选项吗?

在介绍各种集群管理器时,我们只介绍了一种固定集群实现,即 Helm chart。是否需要更多固定集群?示例可能包括遵循 Helm chart 相同结构的 CloudFormationTerraform 模板,提供 Jupyter 服务、Dask 调度器和固定数量的工作节点。

我们可以弥合一些差距吗?

Dask Kubernetes 集群管理器是否可以连接到使用 Helm chart 构建的现有集群,然后执行自适应扩展?我经常被问到这个问题,但目前尚不清楚如何实现这一点。集群管理器和 Helm chart 使用不同的 Kubernetes 资源来实现相同的目标,因此在此之前需要进行一些统一。

临时集群是否过于临时?

许多集群管理器只在 Python 会话期间存在。然而,有些(如 YarnCluster)允许您断开连接并重新连接到集群。这使您可以将 YARN 集群更像固定集群来对待。

在其他情况下,Python 会话可能有超时或限制,并在 Dask 集群完成工作之前被终止。让 Dask 集群继续存在是否有好处?随着 Python 会话的清理,客户端和 Future 也会被垃圾回收。所以也许没有。

我们可以更好地管理 conda 环境吗?

目前,创建集群的人有责任确保工作节点的 conda 环境与将创建 `Client` 的环境匹配。在固定集群上,这更容易,因为可以在同一套系统中提供 Python/Jupyter 环境。然而,在临时集群中,您可能正在访问云或批处理系统,它们可能与您的笔记本电脑环境不匹配。

也许可以在工作节点和 conda 之间进行集成,以动态创建环境。探索这带来的性能影响会很有趣。

另一种选择是允许用户在工作节点上启动一个远程 Jupyter 内核。他们将无法访问相同的文件系统,但会共享一个 conda 环境。