在这篇文章中,我们将深入探讨 Dask 如何计算 groupby 聚合。这些是 ETL 和分析中常用的操作,我们将数据分成组,独立地对每个组应用一个函数,然后将结果重新组合起来。在 PyData/R 领域,这通常被称为 分割-应用-合并(split-apply-combine)策略(由 Hadley Wickham 首次提出),并广泛应用于 Pandas 生态系统。
图片来源:swcarpentry.github.io
Dask 借鉴了这个想法,并使用了另一个同样容易记的名字:应用-连接-应用(apply-concat-apply),简称 aca。在这里,我们将探讨 aca 策略在简单和复杂操作中的应用。
首先,回想一下,Dask DataFrame 是 集合 的 DataFrame 对象集合(例如,每个 分区 的 Dask DataFrame 都是一个 Pandas DataFrame)。例如,假设我们有以下 Pandas DataFrame
>>> import pandas as pd
>>> df = pd.DataFrame(dict(a=[1, 1, 2, 3, 3, 1, 1, 2, 3, 3, 99, 10, 1],
... b=[1, 3, 10, 3, 2, 1, 3, 10, 3, 3, 12, 0, 9],
... c=[2, 4, 5, 2, 3, 5, 2, 3, 9, 2, 44, 33, 2]))
>>> df
a b c
0 1 1 2
1 1 3 4
2 2 10 5
3 3 3 2
4 3 2 3
5 1 1 5
6 1 3 2
7 2 10 3
8 3 3 9
9 3 3 2
10 99 12 44
11 10 0 33
12 1 9 2
要从这些数据创建包含三个分区的 Dask DataFrame,我们可以将 df 在索引 (0, 4)、(5, 9) 和 (10, 12) 之间进行分区。我们可以使用 Dask 的 from_pandas 函数并设置 npartitions=3 来执行此分区操作
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(df, npartitions=3)
这 3 个分区就是 3 个独立的 Pandas DataFrame
>>> ddf.partitions[0].compute()
a b c
0 1 1 2
1 1 3 4
2 2 10 5
3 3 3 2
4 3 2 3
当 Dask 将函数和/或算法(例如 sum, mean 等)应用于 Dask DataFrame 时,它会独立地将该操作应用于所有组成的分区,将输出收集(或连接)成中间结果,然后再次将该操作应用于中间结果以产生最终结果。在内部,Dask 将相同的应用-连接-应用方法重复用于许多内部 DataFrame 计算。
让我们通过 aca 过程的每个步骤来分解 Dask 如何计算 ddf.groupby(['a', 'b']).c.sum()。我们将首先把我们的 df Pandas DataFrame 分割成三个分区
>>> df_1 = df[:5]
>>> df_2 = df[5:10]
>>> df_3 = df[-3:]
接下来,我们在三个分区上分别执行相同的 groupby(['a', 'b']).c.sum() 操作
>>> sr1 = df_1.groupby(['a', 'b']).c.sum()
>>> sr2 = df_2.groupby(['a', 'b']).c.sum()
>>> sr3 = df_3.groupby(['a', 'b']).c.sum()
这些操作都产生一个带有 MultiIndex 的 Series
>>> sr1
a b
1 1 2
3 4
2 10 5
3 2 3
3 2
Name: c, dtype: int64
>>> sr2
a b
1 1 5
3 2
2 10 3
3 3 11
Name: c, dtype: int64
>>> sr3
a b
1 9 2
10 0 33
99 12 44
Name: c, dtype: int64
第一次应用之后,下一步是连接中间结果 sr1、sr2 和 sr3。使用 Pandas concat 函数完成此操作相当简单
>>> sr_concat = pd.concat([sr1, sr2, sr3])
>>> sr_concat
a b
1 1 2
3 4
2 10 5
3 2 3
3 2
1 1 5
3 2
2 10 3
3 3 11
1 9 2
10 0 33
99 12 44
Name: c, dtype: int64
我们的最后一步是在连接后的 sr_concat Series 上再次应用相同的 groupby(['a', 'b']).c.sum() 操作。但是我们不再有列 a 和 b,那么应该如何继续呢?
稍微放大一点来看,我们的目标是将索引相同的列中的值相加。例如,有两个索引为 (1, 1) 的行,对应的值是:2, 5。那么如何按相同值的索引进行 groupby 呢?MultiIndex 使用 levels 来定义给定索引处的值。Dask 确定 并 使用这些 levels 在应用-连接-应用计算的最终应用步骤中。在我们的例子中,level 是 [0, 1],也就是说,我们想要第 0 层和第 1 层的索引,如果我们同时按 0 和 1 进行分组,我们将有效地将相同的索引分组在一起
>>> total = sr_concat.groupby(level=[0, 1]).sum()
>>> total
a b
1 1 7
3 6
9 2
2 10 8
3 2 3
3 13
10 0 33
99 12 44
Name: c, dtype: int64
>>> ddf.groupby(['a', 'b']).c.sum().compute()
a b
1 1 7
3 6
2 10 8
3 2 3
3 13
1 9 2
10 0 33
99 12 44
Name: c, dtype: int64
>>> df.groupby(['a', 'b']).c.sum()
a b
1 1 7
3 6
9 2
2 10 8
3 2 3
3 13
10 0 33
99 12 44
Name: c, dtype: int64
此外,我们可以通过 可视化计算的任务图 来轻松查看此应用-连接-应用计算的步骤
>>> ddf.groupby(['a', 'b']).c.sum().visualize()
sum 是一个相当直接的计算。像 mean 这样更复杂的计算又如何呢?
>>> ddf.groupby(['a', 'b']).c.mean().visualize()
Mean 是一个不直接适合 aca 模型的操作的好例子——连接均值并再次取平均值将产生错误的结果。就像任何计算风格一样:向量化、Map/Reduce 等,我们有时需要创造性地将计算与风格/模式相匹配。对于 aca,我们通常可以将计算分解为组成部分。对于 mean,这将是 sum 和 count
\[\bar{x} = \frac{x_1+x_2+\cdots +x_n}{n}\]
从上面的任务图中,我们可以看到每个分区有两个独立的任务:series-groupby-count-chunk 和 series-groupby-sum-chunk。然后将结果聚合成两个最终节点:series-groupby-count-agg 和 series-groupby-sum-agg,然后我们最终计算 mean:total sum / total count。