提交新活动

谢谢!您的提交已收到!
哎呀!提交表单时出现问题。

提交新闻报道

谢谢!您的提交已收到!
哎呀!提交表单时出现问题。

订阅新闻邮件

谢谢!您的提交已收到!
哎呀!提交表单时出现问题。
2021年3月11日

使用 dask-memusage 衡量 Dask 内存使用量

作者

在云端扩展时,使用过多的计算资源可能会变得昂贵。

举一个真实的例子,我当时正在为一个空间基因测序设备开发图像处理流水线,该设备不仅可以报告哪些基因正在表达,还可以报告它们在细胞三维体积中的位置。为了获取这些信息,一台专用显微镜拍摄了细胞培养物或组织的快照,并将由此产生的数据通过一个 Dask 流水线进行处理。

该流水线相当慢,所以我粗略计算了一下,一旦我们开始为客户处理更多数据,我们的计算成本会是多少。结果发现,我们收入的 70% 将用于支付云计算费用!

显然,我需要优化这段代码。

当我们考虑大规模计算中的瓶颈时,我们通常会关注 CPU:我们希望使用更多的 CPU 核以获得更快的结果。正如本例所示,为所有这些 CPU 付费可能很昂贵,而我确实成功地大幅降低了 CPU 使用率。

但高内存使用率也是一个问题,解决这个问题促使我构建了一系列工具,这些工具也可以帮助您优化和减少 Dask 内存使用量。

在本文的其余部分,您将了解

问题:固定的处理块和高内存/CPU 比率

回顾一下,我当时正在处理一个 Dask 流水线,用于处理来自专用显微镜的数据。产生的数据量相当大,并且必须将某些图像子集作为一个单元一起处理。从计算的角度来看,我们实际上有一系列输入 X0, X1, X2, …,它们可以由函数 f() 独立处理。

f() 的内部处理无法轻易进一步并行化。从 CPU 调度角度来看,这没问题,考虑到大量的 X 输入,它仍然是一个非常容易并行的任务。

例如,如果我配置一台拥有 4 个 CPU 核的虚拟机,为了处理数据,我可以启动四个进程,每个进程都会占用一个核的最大能力。如果我有 12 个输入,并且每个处理步骤花费的时间大致相同,它们可能会按以下方式运行

  • CPU0: f(X0), f(X4), f(X8)
  • CPU1: f(X1), f(X5), f(X9)
  • CPU2: f(X2), f(X6), f(X10)
  • CPU3: f(X3), f(X7), f(X11)

如果我能让 f() 更快,整个流水线也会运行得更快。

然而,CPU 并不是计算中使用的唯一资源:RAM 也可能是瓶颈。例如,假设每次调用 f(Xi) 需要 12GB RAM。这意味着为了充分利用 4 个 CPU,我将需要 48GB RAM——但如果我的电脑只有 16GB RAM 怎么办?

即使我的电脑有 4 个 CPU,在只有 16GB RAM 的电脑上,我只能利用一个 CPU,因为我没有足够的 RAM 来并行运行多个任务。 实际上,这些任务在云端运行,在那里我可以通过选择合适的预配置 VM 实例来确保所需的 RAM/核比率得到保持。在某些云平台上,您可以自由设置为每个启动的虚拟机分配的 RAM 容量和 CPU 核数量。

然而,我并不清楚峰值内存使用量是多少,所以我不得不限制并行度以减少内存不足错误。结果是,我们使用的默认虚拟机有一半的 CPU 处于空闲状态,我们为这些资源付了费却未使用。

为了适当地配置硬件并充分利用所有 CPU,我需要知道每个任务使用了多少峰值内存。为了做到这一点,我创建了一个新工具。

使用 dask-memusage 衡量任务峰值内存使用量

dask-memusage 是一个用于衡量 Dask 执行图中每个任务峰值内存使用量的工具。

  • 按任务是因为 Dask 将代码作为任务图执行,而任务图决定了可以使用多少并行度。
  • 峰值内存很重要,因为它是瓶颈。如果图中有两个并行任务同时需要 12GB,即使每个任务的平均内存使用量只有 4GB,如果您想在同一台计算机上运行这两个任务,您仍然需要 24GB RAM。

使用 dask-memusage

由于基因测序代码是专有的且相当复杂,我们来使用一个不同的例子。我们将统计一些文本文件中单词的出现次数,然后报告每个文件中出现次数最多的前 10 个单词。您可以想象稍后合并数据,但在这个简单的例子中我们不会处理这部分。

import sys
import gc
from time import sleep
from pathlib import Path
from dask.bag import from_sequence
from collections import Counter
from dask.distributed import Client, LocalCluster
import dask_memusage


def calculate_top_10(file_path: Path)
gc.collect() # See notes below

# Load the file
with open(file_path) as f
data = f.read()

# Count the words
counts = Counter()
for word in data.split()
counts[word.strip(".,'\"").lower()] += 1

# Choose the top 10
by_count = sorted(counts.items(), key=lambda x: x[1])
sleep(0.1) # See notes below
return (file_path.name, by_count[-10:])


def main(directory)
# Setup the calculation

# Create a 4-process cluster (running locally). Note only one thread
# per-worker: because polling is per-process, you can't run multiple
# threads per worker, otherwise you'll get results that combine memory
# usage of multiple tasks.
cluster = LocalCluster(n_workers=4, threads_per_worker=1,
memory_limit=None)
# Install dask-memusage
dask_memusage.install(cluster.scheduler, "memusage.csv")
client = Client(cluster)

# Create the task graph
files = from_sequence(Path(directory).iterdir())
graph = files.map(calculate_top_10)
graph.visualize(filename="example2.png", rankdir="TD")

# Run the calculations
for result in graph.compute()
print(result)
# ... do something with results ...


if __name__ == '__main__'
main(sys.argv[1])

这是任务图的样子

并行度很高!

我们可以对一些文件运行程序

$ pip install dask[bag] dask_memusage
$ python example2.py files/
('frankenstein.txt', [('that', 1016), ('was', 1021), ('in', 1180), ('a', 1438), ('my', 1751), ('to', 2164), ('i', 2754), ('of', 2761), ('and', 3025), ('the', 4339)])
('pride_and_prejudice.txt', [('she', 1660), ('i', 1730), ('was', 1832), ('in', 1904), ('a', 1981), ('her', 2142), ('and', 3503), ('of', 3705), ('to', 4188), ('the', 4492)])
('greatgatsby.txt', [('that', 564), ('was', 760), ('he', 770), ('in', 849), ('i', 999), ('to', 1197), ('of', 1224), ('a', 1440), ('and', 1565), ('the', 2543)])
('big.txt', [('his', 40032), ('was', 45356), ('that', 47924), ('he', 48276), ('a', 83228), ('in', 86832), ('to', 114184), ('and', 152284), ('of', 159888), ('the', 314908)])

正如预期的那样,最常见的单词是词干,但顺序仍然存在一些差异。

接下来,我们来看看 dask-memusage 的结果。

dask-memusage 输出及其工作原理

您会注意到,实际使用 dask-memusage 除了 import 语句外,只需要额外一行代码

dask_memusage.install(cluster.scheduler, "memusage.csv")

这样做将以 10ms 的间隔轮询进程,获取按任务划分的峰值内存使用量。在本例中,memusage.csv 看起来如下

task_key,min_memory_mb,max_memory_mb
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 3)",51.2421875,51.2421875
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 0)",51.70703125,51.70703125
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 1)",51.28125,51.78515625
"('from_sequence-3637e6ff937ef8488894df60a80f62ed', 2)",51.30859375,51.30859375
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 2)",56.19140625,56.19140625
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 0)",51.70703125,54.26953125
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 1)",52.30078125,52.30078125
"('calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca', 3)",51.48046875,384.00390625

图中的每个任务都会显示最小内存使用量和峰值内存使用量,单位为 MB。

更易读的形式

task_key min_memory_mb max_memory_mb ”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 3)” 51.2421875 51.2421875 ”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 0)” 51.70703125 51.70703125 ”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 1)” 51.28125 51.78515625 ”(‘from_sequence-3637e6ff937ef8488894df60a80f62ed’, 2)” 51.30859375 51.30859375 ”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 2)” 56.19140625 56.19140625 ”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 0)” 51.70703125 54.26953125 ”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 1)” 52.30078125 52.30078125 ”(‘calculate_top_10-afc867e38c3bd0aac8c18bb00d3634ca’, 3)” 51.48046875 384.00390625

最下面的四行是值得关注的;这四行都以约 50MB RAM 的最小内存使用量开始,然后内存可能随代码运行而增加或不增加。增加多少可能取决于文件大小;大多数文件都很小,因此内存使用量变化不大。其中一个文件使用的最大内存比其他文件多得多,为 384MB RAM;这很可能是 25MB 的 big.txt 文件,因为其他文件都小于 1MB。

使用的机制,即轮询进程峰值内存,存在一些限制

  • 您会注意到 calculate_top_10() 的开头有一个 gc.collect();这确保我们不计算尚未清理的先前代码的内存。
  • calculate_top_10() 的末尾还有一个 sleep()。由于使用了轮询,运行过快的任务将无法获得准确信息——轮询大约每 10ms 发生一次,所以您需要至少 sleep 20ms。
  • 最后,由于轮询是按进程进行的,您不能为每个 worker 运行多个线程,否则您将获得合并了多个任务内存使用量的数据。

解释数据

我们学到的是 calculate_top_10() 的内存使用量随文件大小而增长;这可以用来描述工作负载的内存需求特性。也就是说,我们可以创建一个模型,将数据输入大小与所需的 RAM 联系起来,然后我们可以计算任何给定并行级别所需的 RAM。如果假定每个 CPU 核一个任务,这就可以指导我们选择硬件。

回到我最初的那个基因测序流水线问题:使用 dask-memusage 的数据,我能够得出一个公式,说明“对于这种大小的输入,需要这么多内存”。因此,每当我们运行批处理作业时,就可以根据机器上的 CPU 数量和 RAM 尽可能地设置最高的并行度。

尽管这允许了更多的并行性,但仍然不够——处理过程仍然使用了大量的 RAM,这些 RAM 我们要么通过时间(使用较少的 CPU)来支付,要么通过金钱(支付更昂贵的具有更多 RAM 的虚拟机)来支付。所以下一步是减少内存使用。

使用 Fil 减少内存使用量

如果我们查看 word-counting 示例的 dask-memusage 输出,内存使用量似乎相当高:对于一个 25MB 的文件,我们使用了 330MB 的 RAM 来统计单词。思考一下这段代码的理想版本可能如何工作,我们应该能够以少得多的内存处理文件(例如,我们可以重新设计代码,逐行处理文件,从而减少内存)。

这是 dask-memusage 另一个有用的地方:它可以指向需要优化内存使用的特定代码,粒度精确到任务级别。 不过,一个任务可能是一个相当大的代码块,所以下一步是使用一个内存分析器,它可以指向具体的代码行。

在开发基因测序工具时,我使用了 memory_profiler 包,虽然它能工作,我也成功减少了内存使用,但我发现它用起来相当困难。结果发现,对于 Dask 的典型用例——批处理数据——您需要一种不同类型的内存分析器

所以在我离开那份工作后,我创建了一个名为 Fil 的内存分析器,它专门设计用于查找峰值内存使用量。与可以在生产工作负载上运行的 dask-memusage 不同,Fil 会减慢您的执行速度,并且目前还有其他我正在努力解决的限制(截至 2021 年 3 月,它不支持多进程),因此目前最好用于手动分析。

我们可以编写一个只在 big.txt 上运行的小脚本

from pathlib import Path
from example2 import calculate_top_10

calculate_top_10(Path("files/big.txt"))

在 Fil 下运行它

pip install filprofiler
fil-profile run example3.py

结果向我们展示了大部分内存分配在哪里

读取文件占用 8% 的内存,但 data.split() 占用了 84% 的内存。也许我们不应该将整个文件加载到内存中并将其分割成单词,而应该逐行处理文件。如果这是实际的代码,下一步很好的做法是修改 calculate_top_10() 的实现方式。

后续步骤

如果您的 Dask 工作负载占用了过多内存,您应该怎么做?

如果您正在使用 Distributed 后端运行 Dask 工作负载,并且可以接受每个 worker 只有一个线程,那么运行 dask-memusage 将为您提供生产工作负载上实际的每个任务内存使用情况。然后,您可以使用获得的信息采取多种措施

在我最初的使用案例中,即基因测序流水线,我能够通过结合降低内存使用量和降低 CPU 使用率来将成本降低到一个更合理的水平。在进行研发时,我能够在相同的硬件成本下获得更快的结果。

您可以在此了解更多关于 dask-memusage 的信息,并在此了解更多关于 Fil 内存分析器的信息