这项工作是气象局(Met Office)与欧洲天气云(European Weather Cloud)之间的合作项目,欧洲天气云是欧洲中期天气预报中心(ECMWF)和欧洲气象卫星开发组织(EUMETSAT)的合作伙伴关系。
我们设计了一种技术,用于创建 Dask 集群,其中工作节点托管在不同的数据中心,通过一个网状 VPN 连接,允许调度器和工作节点通信和交换结果。
对 Dask 资源的创新性(滥)用使我们能够在最靠近源数据的数据中心的工作节点上运行数据处理任务,从而最大限度地减少数据中心之间的通信。如果与 zarr 结合使用,以在对象存储中访问巨大的超立方体数据集,我们相信该技术可以在云中实现数据就近分布式计算的潜力。
英国气象局一直在与欧洲天气云合作研究数据就近计算。我们将 Dask 确定为一项关键技术,但现有 Dask 技术专注于单个数据中心内的并行计算,我们希望将其扩展到跨数据中心的计算。这样,当所需数据托管在多个位置时,任务可以在数据所在的位置运行,而不是复制数据。
Dask 工作节点在调度器的协调下通过网络交换数据块。这假设所有节点都可以自由通信,但这通常不适用于跨数据中心的情况,因为存在防火墙、NAT 等问题,因此真正的分布式方法必须解决这个问题。此外,它还必须高效地管理数据传输,因为在数据中心之间移动数据块比在同一云中的工作节点之间移动数据块成本高得多。
这个笔记本记录了一个正在运行的概念验证,它解决了这些问题。它在3个位置运行计算
其思想是,访问某个位置可用数据的任务应该在该位置运行。同时,计算可以在其他地方定义、调用和呈现结果。所有这些对计算如何完成的提示极少。
首先是一些导入和便利功能
在这种情况下,我们使用 10.8.0.0/24 上的控制平面 IPv4 WireGuard 网络来设置集群——这不是必需的,但简化了这个概念验证。WireGuard 对等点已在 ECMWF 和 EUMETSAT 机器上运行,但我们必须在这里启动一个
4: mo-aws-ec2: <POINTOPOINT,NOARP,UP,LOWER_UP> mtu 8921 qdisc noqueue state UNKNOWN group default qlen 1000
link/none
inet 10.8.0.3/24 scope global mo-aws-ec2
valid_lft forever preferred_lft forever
我们在 ECMWF 和 EUMETSAT 都配置了工作机器,每个地点一台。它们在控制平面网络上可访问,地址如下:
环境变量: ECMWF_HOST=10.8.0.4
环境变量: EUMETSAT_HOST=10.8.0.2
这台机器需要通过网络访问数据文件以读取 NetCDF 元数据。工作节点通过 NFS 共享数据,所以我们在此处挂载它们。(在这个概念验证中,控制平面网络用于 NFS,但数据平面网络同样可以使用,或者使用更合适的技术,例如访问对象存储的zarr。)
%%bash
./data-reset.sh
/data/ecmwf
/data/eumetsat
:/data/ecmwf /data/ecmwf
:/eumetsatdata/ /data/eumetsat
为了演示,我们有两个要处理的大数据文件。在 ECMWF,我们在 /data/ecmwf/000490262cdd067721a34112963bcaa2b44860ab.nc 中有预测数据。在 ECMWF 运行的工作节点可以看到该文件
/data/
└── ecmwf
└── 000490262cdd067721a34112963bcaa2b44860ab.nc
1 目录, 1 文件
并且由于该目录通过 NFS 挂载在此处,所以这台计算机也可以看到
/data/ecmwf
└── 000490262cdd067721a34112963bcaa2b44860ab.nc
0 目录, 1 文件
这是个大文件
总计 2.8G
-rw-rw-r-- 1 ec2-user ec2-user 2.8G Mar 25 13:09 000490262cdd067721a34112963bcaa2b44860ab.nc
在 EUMETSAT,我们有 observations.nc
/data/eumetsat/ad-hoc
└── observations.nc
0 目录, 1 文件
在此计算机上同样可见
-rw-rw-r-- 1 613600004 613600004 4.8M May 20 10:57 /data/eumetsat/ad-hoc/observations.nc
至关重要的是,ECMWF 数据在 EUMETSAT 数据中心不可见,反之亦然。
我们想将预测数据与观测数据进行比较。
我们可以使用 xarray 打开预测文件
<xarray.Dataset>
维度: (realization: 18, height: 33, latitude: 960,
longitude: 1280, bnds: 2)
坐标
* realization (realization) int32 0 18 19 20 21 ... 31 32 33 34
* height (height) float32 5.0 10.0 20.0 ... 5.5e+03 6e+03
* latitude (latitude) float32 -89.91 -89.72 ... 89.72 89.91
* longitude (longitude) float32 -179.9 -179.6 ... 179.6 179.9
forecast_period timedelta64[ns] 1 days 18:00:00
forecast_reference_time datetime64[ns] 2021-11-07T06:00:00
time datetime64[ns] 2021-11-09
没有坐标的维度: bnds
数据变量
air_pressure (realization, height, latitude, longitude) float32 dask.array<chunksize=(18, 33, 192, 160), meta=np.ndarray>
latitude_longitude int32 -2147483647
latitude_bnds (latitude, bnds) float32 dask.array<chunksize=(960, 2), meta=np.ndarray>
longitude_bnds (longitude, bnds) float32 dask.array<chunksize=(1280, 2), meta=np.ndarray>
属性
history: 2021-11-07T10:27:38Z: StaGE Decoupler
institution: Met Office
least_significant_digit: 1
mosg__forecast_run_duration: PT198H
mosg__grid_domain: global
mosg__grid_type: standard
mosg__grid_version: 1.6.0
mosg__model_configuration: gl_ens
source: Met Office Unified Model
title: MOGREPS-G Model Forecast on Global 20 km St...
um_version: 11.5
Conventions: CF-1.7
在这台机器上运行的 Dask 代码通过 NFS 读取了文件的元数据,但尚未读入数据数组本身。
同样,我们可以看到观测数据,因此我们可以在本地执行计算。在这里,我们将预测数据按实现(realisations)取平均值,然后将它们与特定高度的观测数据进行比较。(这是一个故意低效的计算,因为我们本可以在只需要的高度上取平均值,但你明白这个意思。)
CPU times: user 10 µs, sys: 2 µs, total: 12 µs
Wall time: 13.8 µs
当我们取消注释 scope() 并实际运行它时,完成需要超过 14 分钟!通过 NFS 在数据中心之间访问数据(我们在 AWS 中运行此笔记本)实在太慢了。
事实上,仅仅将数据文件复制到运行此笔记本的计算机上也需要类似的时间。至少 2.8 GiB + 4.8 MiB 的数据必须从数据中心传输到这台机器才能执行计算。
相反,我们显然应该在数据所在的位置运行 Dask 任务。我们可以在 Dask 集群上这样做。
集群用一个命令启动。但这需要一些时间
[#] ip link add dasklocal type wireguard
[#] wg setconf dasklocal /dev/fd/63
[#] ip -6 address add fda5:c0ff:eeee:0::1/64 dev dasklocal
[#] ip link set mtu 1420 up dev dasklocal
[#] ip -6 route add fda5:c0ff:eeee:2::/64 dev dasklocal
[#] ip -6 route add fda5:c0ff:eeee:1::/64 dev dasklocal
2022-06-29 14:46:57,237 - distributed.scheduler - INFO - -----------------------------------------------
2022-06-29 14:46:58,602 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-06-29 14:46:58,643 - distributed.scheduler - INFO - -----------------------------------------------
2022-06-29 14:46:58,644 - distributed.scheduler - INFO - Clear task state
2022-06-29 14:46:58,646 - distributed.scheduler - INFO - Scheduler at: tcp://172.17.0.2:8786
2022-06-29 14:46:58,646 - distributed.scheduler - INFO - dashboard at: :8787
2022-06-29 14:47:16,104 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:1::11]:37977', name: ecmwf-1-2, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:16,107 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:1::11]:37977
2022-06-29 14:47:16,108 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:16,108 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:1::11]:44575', name: ecmwf-1-3, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:16,109 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:1::11]:44575
2022-06-29 14:47:16,109 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:16,113 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:1::11]:40121', name: ecmwf-1-1, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:16,114 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:1::11]:40121
2022-06-29 14:47:16,114 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:16,119 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:1::11]:40989', name: ecmwf-1-0, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:16,121 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:1::11]:40989
2022-06-29 14:47:16,121 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:23,342 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:2::11]:33423', name: eumetsat-2-0, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:23,343 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:2::11]:33423
2022-06-29 14:47:23,343 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:23,346 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:2::11]:43953', name: eumetsat-2-1, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:23,348 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:2::11]:43953
2022-06-29 14:47:23,348 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:23,350 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:2::11]:46089', name: eumetsat-2-3, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:23,352 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:2::11]:46089
2022-06-29 14:47:23,352 - distributed.core - INFO - Starting established connection
2022-06-29 14:47:23,357 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://[fda5:c0ff:eeee:2::11]:43727', name: eumetsat-2-2, status: undefined, memory: 0, processing: 0>
2022-06-29 14:47:23,358 - distributed.scheduler - INFO - Starting worker compute stream, tcp://[fda5:c0ff:eeee:2::11]:43727
2022-06-29 14:47:23,358 - distributed.core - INFO - Starting established connection
我们需要等待 8 行 distributed.core - INFO - Starting established connection 日志——每台工作机器上的 4 个工作进程各一条。
这里发生的情况是
每个数据中心托管一个控制进程,可通过控制平面网络访问。在调用时
结果是这台计算机上运行调度器的一个容器,通过一个临时的 WireGuard IPv6 数据平面网络与每台工作机器上的一个容器通信,该网络允许每个(在本例中为 8 个)Dask 工作进程彼此以及与调度器通信,即使它们分布在 3 个数据中心。
大概是这样
图例
集群的调度器现在正在这台机器上的 Docker 容器中运行,并在 localhost 上暴露,因此我们可以创建客户端与其通信
2022-06-29 14:47:35,535 - distributed.scheduler - INFO - Receive client connection: Client-69f22f41-f7ba-11ec-a0a2-0acd18a5c05a
2022-06-29 14:47:35,536 - distributed.core - INFO - Starting established connection
/home/ec2-user/miniconda3/envs/jupyter/lib/python3.10/site-packages/distributed/client.py:1287: VersionMismatchWarning: 检测到版本不匹配
+---------+--------+-----------+---------+
| 包 | 客户端 | 调度器 | 工作节点 |
+---------+--------+-----------+---------+
| msgpack | 1.0.4 | 1.0.3 | 1.0.3 |
| numpy | 1.23.0 | 1.22.3 | 1.22.3 |
| pandas | 1.4.3 | 1.4.2 | 1.4.2 |
+---------+--------+-----------+---------+
备注
- msgpack: 版本差异可以接受,只要所有版本都在 0.6 以上
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
如果您点击进入客户端,您应该能在“调度器信息”节点下看到工作节点
# client
您也可以点击进入 http://localhost:8787/status 上的 Dashboard。在那里我们可以看到任务流上的工作节点
现在 Dask 客户端在作用域内,计算将在集群上运行。我们可以定义要运行的任务
但当我们尝试执行计算时,它失败了
"[Errno 2] 没有那个文件或目录: b'/data/eumetsat/ad-hoc/observations.nc'"
它失败是因为 Dask 调度器将一些读取数据的任务发送到了在 EUMETSAT 运行的工作节点。它们看不到 ECMWF 中的数据,我们也不希望它们看到,因为在数据中心之间读取所有数据会太慢。
Dask 有资源(resources)的概念。任务可以被调度到仅在资源(如 GPU 或内存量)可用之处运行。我们可以滥用(abuse)这个机制,通过将数据中心视为一个资源,将任务绑定到特定的数据中心。
为此,当创建工作节点时,我们将其标记为具有 pool-ecmwf 或 pool-eumetsat 资源。然后,当我们想创建只能在一个数据中心运行的任务时,我们将其标注为需要相应的资源
我们可以将这些样板代码隐藏在一个 Python 上下文管理器(context manager)—— pool——内部,并写成
pool 上下文管理器是与 Dask 开发者的合作成果,并发布在 GitHub 上。您可以在Dask Discourse上阅读更多关于该概念演变的内容。
然而,我们可以做得比标注计算任务更好。如果在上下文管理器块内加载数据,数据加载任务将带上标注信息
在这种情况下,我们需要库中的另一个上下文管理器 propagate_pools,以确保在处理和执行任务图时标注信息不会丢失
这两个上下文管理器允许我们用数据的池(pool)来标注数据,从而确定加载任务将在哪里运行
定义一些与数据来源无关的延迟计算
然后执行最终计算
CPU times: user 127 ms, sys: 6.34 ms, total: 133 ms
Wall time: 4.88 s
记住,我们的目标是在数据中心之间分布式计算,同时防止工作节点读取外部的批量数据。
这里我们知道数据只被适当位置的工作节点读取,因为两个数据中心都无法读取对方的数据。一旦数据加载到内存中,Dask 倾向于将任务调度到拥有数据的工作节点上,这样本地工作节点倾向于执行后续计算,并且数据块倾向于保留在它们被读取的数据中心。
然而,通常情况下,如果工作节点空闲,即使它们没有数据,Dask 也会使用它们来执行计算。如果允许,这种工作窃取(work stealing)会导致数据在数据中心之间不必要地移动,这是一项潜在的高成本操作。为了防止这种情况,propagate_pools 上下文管理器安装了一个调度器优化,禁止不同池中的工作节点之间进行工作窃取。
一旦一个池中加载的数据需要与另一个池中的数据结合(例如上面 averaged_predictions.isel(height=10) - observations 中的减法),这就不再被归类为工作窃取,Dask 将根据需要移动数据中心之间的数据。
一次性完成的计算看起来像这样
CPU times: user 234 ms, sys: 27.6 ms, total: 261 ms
Wall time: 6.04 s
在代码方面,与上面的本地版本相比,这只增加了使用 with 块来标记数据和管理执行,执行速度快了大约 100 倍。
这是一个最佳情况,因为在演示中,文件实际上托管在工作机器上,因此在本地读取文件和通过 NFS 读取文件之间的速度差异最大化了。也许更有说服力的是测量的网络流量。
方法耗时测得网络流量
Calculation over NFS> 14 minutes2.8 GiB
分布式计算~ 10 seconds8 MiB
除了一些控制和状态消息外,只有用于绘制可视化图表所需的数据通过网络发送到这台计算机。
查看任务流(task stream),我们可以看到 ECMWF 工作节点(在底部)执行了大部分读取和计算工作,红色传输任务将其与 EUMETSAT 上的数据连接起来。
我们可以进一步简化这段代码。因为数据加载任务已用其资源池标记,这对科学家来说可以是透明的。所以我们可以写成
允许我们忽略数据来自哪里
当然,集群必须在适当的数据中心配置计算资源,尽管经过一些工作,这可以作为目录代码的一部分实现动态化。
此笔记本及其背后的代码已发布在GitHub 存储库中。
有关原型实现的详细信息以及改进建议,请参阅
感谢 Armagan Karatosun(EUMETSAT)和 Vasileios Baousis(ECMWF)为支持这个概念验证提供的基础设施帮助和支持。
Gabe Joseph(Coiled)编写了巧妙的 pool 上下文管理器,Jacob Tomlinson(NVIDIA)审阅了本文档。
英国皇家版权所有 2022