这项工作由 Anaconda Inc 支持
Dask DataFrame 与 pandas 新的 Extension Array(扩展数组)接口配合良好,包括第三方扩展数组。这使得 Dask 可以
Pandas 0.23 引入了 ExtensionArray(扩展数组),这是一种在 DataFrame 或 Series 中存储非简单 NumPy 数组的方式。在内部,pandas 将其用于 NumPy 本身无法原生处理的数据类型,例如带时区的时间日期、Categorical 或(新的!)可空整数数组。
>>> s = pd.Series(pd.date_range('2000', periods=4, tz="US/Central"))
>>> s
0 2000-01-01 00:00:00-06:00
1 2000-01-02 00:00:00-06:00
2 2000-01-03 00:00:00-06:00
3 2000-01-04 00:00:00-06:00
dtype: datetime64[ns, US/Central]
dask.dataframe 一直支持 pandas 定义的扩展类型。
>>> import dask.dataframe as dd
>>> dd.from_pandas(s, npartitions=2)
Dask Series 结构
npartitions=2
0 datetime64[ns, US/Central]
2 ...
3 ...
dtype: datetime64[ns, US/Central]
Dask 名称:from_pandas,2 个任务
较新版本的 pandas 允许第三方库编写自定义扩展数组。这些数组可以放在 DataFrame 或 Series 中,并且与 pandas 自身定义的任何扩展数组一样工作良好。然而,第三方扩展数组对 Dask 来说是一个小小的挑战。
回顾:dask.dataframe 是惰性的。我们使用熟悉的类似 pandas 的 API 来构建任务图,而不是立即执行。但如果 Dask DataFrame 是惰性的,那么以下这些操作是如何工作的呢?
>>> df = pd.DataFrame({"A": [1, 2], 'B': [3, 4]})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf[['B']].columns
Index(['B'], dtype='object')
ddf[['B']](惰性地)从 DataFrame 中选择列 'B'。但访问 .columns 会立即返回一个只包含所选列的 pandas Index 对象。
没有发生实际的计算(你可以轻易地将 from_pandas 替换为对一个超出内存大小的数据集执行 dd.read_parquet,行为将是相同的)。Dask 能够进行这类“仅元数据”的计算,其输出仅依赖于列和 dtypes,而无需执行任务图。在内部,Dask 通过在每个 Dask DataFrame 上保留一对虚拟 pandas DataFrame 来实现这一点。
>>> ddf._meta
空 DataFrame
列:[A, B]
索引:[]
>>> ddf._meta_nonempty
ddf._meta_nonempty
A B
0 1 1
1 1 1
我们需要 _meta_nonempty,因为 pandas 中的一些操作在 Empty DataFrame 上的行为与在非空 DataFrame 上的行为不同(无论是设计如此,还是偶尔出现的 pandas 中的 bug)。
第三方扩展数组的问题在于,Dask 不知道 _meta_nonempty 中应该放入什么值。对于每个 NumPy dtype 和 pandas 自身的每个扩展 dtype,我们很乐意这样做。但任何第三方库都可以为任何类型创建 ExtensionArray,Dask 将无法知道对其而言什么是有效值。
与其让 Dask 猜测 _meta_nonempty 应该使用什么值,扩展数组的作者(或用户)可以将他们的扩展 dtype 注册到 Dask。一旦注册,Dask 就能够生成 _meta_nonempty,并且从那里开始一切应该都能正常工作。例如,我们可以将 pandas 用于测试的虚拟 DecimalArray(这不是 pandas 公共 API 的一部分)注册到 Dask。
from decimal import Decimal
from pandas.tests.extension.decimal import DecimalArray, DecimalDtype
# 第三方库中将进行的实际注册
from dask.dataframe.extensions import make_array_nonempty
@make_array_nonempty.register(DecimalDtype)
def _(dtype)
return DecimalArray._from_sequence([Decimal('0'), Decimal('NaN')],
dtype=dtype)
现在,该扩展类型的用户可以将这些数组放入 Dask DataFrame 或 Series 中。
>>> df = pd.DataFrame({"A": DecimalArray([Decimal('1.0'), Decimal('2.0'),
... Decimal('3.0')])})
>>> ddf = dd.from_pandas(df, 2)
>>> ddf
Dask DataFrame 结构
A
npartitions=1
0 decimal
2 ...
Dask 名称:from_pandas,1 个任务
>>> ddf.dtypes
A decimal
dtype: object
从那里开始,就可以像在 pandas 中一样进行常规操作了。
>>> from random import choices
>>> df = pd.DataFrame({"A": DecimalArray(choices([Decimal('1.0'),
... Decimal('2.0')],
... k=100)),
... "B": np.random.choice([0, 1, 2, 3], size=(100,))})
>>> ddf = dd.from_pandas(df, 2)
In [35]: ddf.groupby("A").B.mean().compute()
Out[35]
A
1.0 1.50
2.0 1.48
Name: B, dtype: float64
Dask 现在支持扩展数组真是太棒了。但对我来说,最令人兴奋的是这项工作所需的努力之少。实现对第三方扩展数组支持的PR相当短,只需要定义第三方可以注册的对象,并在检测到 dtype 时使用它来生成数据。支持 pandas 0.24.0 中的三个新扩展数组(IntegerArray、PeriodArray 和 IntervalArray)只需要几行代码
@make_array_nonempty.register(pd.Interval)
def _(dtype)
return IntervalArray.from_breaks([0, 1, 2], closed=dtype.closed)
@make_array_nonempty.register(pd.Period)
def _(dtype)
return period_array([2000, 2001], freq=dtype.freq)
@make_array_nonempty.register(_IntegerDtype)
def _(dtype)
return integer_array([0, None], dtype=dtype)
Dask 直接受益于 pandas 所做的改进。Dask 不需要构建新的并行扩展数组接口,也无需使用并行接口重新实现所有新的扩展数组。我们只是重复使用了 pandas 已有的东西,并且它完美地融入了现有的 Dask 结构。
对于第三方扩展数组的作者,例如 cyberpandas,所需的工作同样微乎其微。他们无需从头开始重新实现一切,只需与 Dask 良好配合即可。
这突显了 Dask 项目核心价值观之一的重要性:与社区合作。如果您访问 dask.org,您会看到类似以下短语:
与现有项目集成
以及
与更广泛的社区共同构建
在 Dask 的初期,开发者本来可以从头开始重写 pandas 或 NumPy 以使其更适合并行(尽管我们今天可能还在处理那部分,因为那是一项巨大的工程)。相反,Dask 开发者选择与社区合作,偶尔向有助于 Dask 的方向推动。例如,pandas 的许多地方持有 GIL(Global Interpreter Lock,全局解释器锁),阻止了基于线程的并行。Dask 和 pandas 的开发者没有放弃 pandas,而是在 GIL 成为 dask.dataframe 的瓶颈时,共同努力在可能的地方释放 GIL。这不仅使 Dask 受益,也使其他任何尝试使用 pandas DataFrame 进行基于线程的并行处理的人受益。
现在,当 pandas 引入可空整数等新特性时,dask.dataframe 只需将其注册为扩展类型即可立即从中受益。第三方扩展数组的作者也可以为他们的扩展数组这样做。
如果您正在编写 ExtensionArray,请务必将其添加到pandas 生态系统页面,并将其注册到 Dask!