执行摘要
我们正在基于cuDF 和Dask Dataframe 构建一个分布式 GPU Pandas 数据框。这项工作尚处于早期阶段。
本文描述了当前情况、我们的总体方法,并提供了当前可行和不可行之处的示例。最后,我们给出了一些关于性能扩展的说明。
您也可以将本文中的实验视为一个笔记本。
以下是结果表
架构 时间 带宽 单 CPU 核 3分 14秒 50 MB/s 八 CPU 核 58秒 170 MB/s 四十 CPU 核 35秒 285 MB/s 一 GPU 11秒 900 MB/s 八 GPU 5秒 2000 MB/s
构建模块:cuDF 和 Dask
构建一个基于 GPU 的分布式数据框是一项艰巨的任务。幸运的是,我们有一个良好的基础,并且可以从现有组件中组装这个系统的很大一部分。
- cuDF 库旨在 GPU 上实现 Pandas API。它在读取 CSV 文件、过滤和聚合列、连接等标准操作上获得了很好的加速效果。
- import cudf # 看起来和用起来都像 Pandas,但运行在 GPU 上
df = cudf.read_csv('myfile.csv')
df = df[df.name == 'Alice']
df.groupby('id').value.mean()
-
-
- cuDF 是不断发展的 RAPIDS initiative 的一部分。
-
- Dask Dataframe 库围绕 Pandas API 提供了并行算法。它根据任务图将分布式 groupbys 或分布式连接等大型操作分解为许多较小的单节点 groupbys 或连接(以及许多其他操作)。
- import dask.dataframe as dd # 看起来和用起来都像 Pandas,但并行运行
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df.groupby('id').value.mean().compute()
-
-
- Dask 分布式任务调度器为复杂的任务图提供了通用的并行执行能力。它非常适合将多节点计算添加到现有代码库中。
有了这些构建模块,我们的方法是让 cuDF API 足够接近 Pandas,以便我们可以重用 Dask Dataframe 算法。
这种方法的优点和挑战
这种方法有几个优点
- 我们可以重用最初为 Pandas 设计的 Dask Dataframe 中的并行算法。
- 它将开发工作整合到一个代码库中,这样未来在 CPU Dataframe 上花费的精力也会惠及 GPU Dataframe,反之亦然。维护成本得以分摊。
- 通过构建能够平等地支持两种 DataFrame 实现(CPU 和 GPU)的代码,我们建立了约定和协议,这将使其他项目更容易做到这一点,无论是使用这两个类似 Pandas 的库,还是未来的类似 Pandas 的库。
- 这种方法还旨在表明,生态系统应该支持类似 Pandas 的库,而不仅仅是 Pandas。例如,如果(何时?)Arrow 库开发出计算系统,那么我们也将更容易将其整合进来。
-
- 在进行任何重构时,我们都倾向于清理现有代码。
- 例如,为了使 dask dataframe 准备好迎接新的 GPU Parquet 阅读器,我们最终重构并简化了我们的 Parquet I/O 逻辑。
-
这种方法也有一些缺点。也就是说,它对 cuDF 造成了 API 压力,使其需要匹配 Pandas,因此
- API 中的微小差异现在会导致更大的问题,例如以下几点
- Join 列顺序不同 rapidsai/cudf #251
- Groupby 聚合列顺序不同 rapidsai/cudf #483
-
-
- cuDF 承受着重复一些人认为 Pandas API 中存在错误的压力的。
- 例如,今天的 cuDF 在处理缺失值方面比 Pandas 更为合理。cuDF 是否需要仅仅为了匹配 Pandas 的语义而退回到旧的处理方式?Dask Dataframe 可能需要更灵活,以处理语义的演变和微小差异。
-
替代方案
我们也可以围绕 cuDF 编写一个新的类似 dask-dataframe 的项目,使其偏离 Pandas/Dask Dataframe API。直到最近,这确实一直是采取的方法,并且 dask-cudf 项目正是这样做的。早期采用这种方法可能是个不错的选择,可以开始工作和进行原型设计。该项目能够使用 dask delayed 实现广泛的功能,包括 groupby-聚合、连接等等。
不过,我们现在正在 dask dataframe 的基础上重新进行这项工作,这意味着我们正在失去 dask-cudf 已经具备的一些功能,但希望我们现在添加的功能会更稳定,并建立在更坚实的基础之上。
当前状态
目前只有很少的功能可用,但可用的部分运行得相当流畅。
这里有一个简单的示例,它从多个 CSV 文件中读取数据,选取一列,并进行一些聚合。
from dask_cuda import LocalCUDACluster
import dask_cudf
from dask.distributed import Client
cluster = LocalCUDACluster() # 在八个本地 GPU 上运行
client = Client(cluster)
gdf = dask_cudf.read_csv('data/nyc/many/*.csv') # 包装多个 CSV 文件
>>> gdf.passenger_count.sum().compute()
184464740
另请注意,纽约市出租车乘客量比几年前显著下降。
我对上面示例感到兴奋的地方
- 围绕 cuDF 代码的所有基础设施,如集群设置、诊断、JupyterLab 环境等,都是免费获得的,就像任何其他新的 Dask 项目一样。
- 这是我的 JupyterLab 设置截图
-
-
-
- 我们的 df 对象实际上只是一个普通的 Dask Dataframe。我们不必编写新的 `__repr__`、`__add__` 或 `.sum()` 实现,而且可能许多我们没有想到的函数今天都能很好地工作(尽管也有许多不行)。
- 我们紧密集成,并且与其他系统联系更紧密。例如,如果我们将 dask-cudf-dataframe 转换为 dask-pandas-dataframe,只需使用 cuDF 的 `to_pandas` 函数
- df = df.map_partitions(cudf.DataFrame.to_pandas)
-
-
- 我们不必编写任何特殊代码,比如单独的 `.to_dask_dataframe` 方法或处理其他特殊情况。
-
- Dask 并行性与选择 CPU 或 GPU 正交。
-
- 切换硬件很容易。通过避免单独的 dask-cudf 代码路径,将 cuDF 添加到现有 Dask+Pandas 代码库以便在 GPU 上运行变得更容易,或者如果我们希望代码在没有 GPU 的情况下运行,则可以移除 cuDF 并使用 Pandas。
- 下面的性能扩展部分有更多示例。
-
上面示例的问题
总的来说,答案是**许多小问题**。
- `cudf.read_csv` 函数尚不支持从单个 CSV 文件读取块,因此不适用于非常大的 CSV 文件。我们不得不先使用普通的 Dask+Pandas 将大型 CSV 文件分割成许多较小的 CSV 文件
- import dask.dataframe as dd
(df = dd.read_csv('few-large/*.csv')
.repartition(npartitions=100)
.to_csv('many-small/*.csv', index=False))
-
-
- (参见 rapidsai/cudf #568)
-
- 许多以前在 dask-cudf 中可以工作的操作,如 groupby-聚合和连接,现在不再工作。未来几个月,我们需要稍微修改许多 cuDF API,使其更接近其 Pandas 对应功能。
- 我运行了计时单元两次,因为目前导入 cudf 需要几秒钟。rapidsai/cudf #627
- 我们不得不让 Dask Dataframe 更灵活一些,减少对其组成数据框必须是完全 Pandas 数据框的假设。(参见 dask/dask #4359 和 dask/dask #4375 获取示例)。我预计未来还需要进行更多类似这样的小改动。
这些问题代表着数十个类似的问题。它们都是可以解决的,事实上,许多问题目前正由致力于 RAPIDS 的优秀人士积极修复中。
近期计划
RAPIDS 小组目前正忙于发布 0.5 版本,其中包括运行上述示例所需的一些修复,以及许多不相关的稳定性改进。这可能会让他们忙碌一到两周,在此期间,我预计除了规划之外,不会有太多 Dask + cuDF 相关工作进展。
之后,Dask 并行支持将成为重中之重,因此我期待在这方面看到快速进展。
性能扩展结果
在我上一篇关于将 Dask Array 与 GPU 加速的 Numpy 库 CuPy 结合的文章中,我们看到在处理一些简单的随机数据的简单问题上使用多个 GPU 带来了令人印象深刻的加速效果。
Dask Array + CuPy 在随机数据上的表现
架构 时间 单 CPU 核 2小时 39分 四十 CPU 核 11分 30秒 一 GPU 1 分 37秒 八 GPU 19秒
这项实验很容易扩展,因为它几乎完全受限于创建随机数据的计算。
Dask DataFrame + cuDF 在 CSV 数据上的表现
我们对上面的 `read_csv` 示例进行了类似的研究,该示例主要受限于从磁盘读取 CSV 数据然后进行解析。您可以在这里看到相应的笔记本。我们获得了相似(尽管不太令人印象深刻)的数据。
架构 时间 带宽 单 CPU 核 3分 14秒 50 MB/s 八 CPU 核 58秒 170 MB/s 四十 CPU 核 35秒 285 MB/s 一 GPU 11秒 900 MB/s 八 GPU 5秒 2000 MB/s
带宽数值是通过注意到磁盘上的数据大约是 10 GB 计算得出的
分析
首先,我想再次强调,由于所有不同项目之间的 Pandas API 兼容性,使用此设置很容易测试各种不同的架构。我们在各种具有不同成本的硬件上看到了广泛的性能表现(跨度达 40 倍)。
其次,请注意,这个问题在 CPU 和 GPU 上的扩展性都低于我们之前的 CuPy 示例。我怀疑这是因为这个示例也受限于 I/O,而不仅仅是数值计算。虽然从单 CPU 到单 GPU 的跳跃很大,但从单 CPU 到多 CPU 或单 GPU 到多 GPU 的跳跃并没有我们期望的那么大。例如,对于 GPU,当我们增加了 8 倍的 GPU 时,只获得了大约 2 倍的加速。
一开始可能会认为这是因为我们饱和了磁盘读取速度。然而,有两个证据与这个猜测相悖
- 熟悉我当前硬件的 NVIDIA 人员告诉我,当他们小心操作时,可以获得更高的 I/O 吞吐量
- CPU 的扩展性也同样差,尽管它显然没有达到完全的 I/O 带宽
相反,很可能是我们没有仔细处理磁盘和 I/O 管道。
我们可能需要考虑更仔细地处理单机内的数据局部性。或者,我们可以选择使用更小的机器,或者许多更小的机器。我的团队一直在让我开始尝试一些比 DGX 更便宜的系统,我可能很快就会进行实验。可能对于数据加载和预处理工作负载,“将尽可能多的计算打包到单个盒子中”的先前智慧不再适用(除非我们付出更多努力)。
来帮忙吧
如果上面的工作听起来您感兴趣,那就来帮忙吧!这里有很多容易实现且影响巨大的工作可以做。
如果您有兴趣从事这些主题并获得报酬,那么可以考虑申请职位。NVIDIA 的 RAPIDS 团队正在招聘工程师,从事基于 GPU 的 Dask 开发以及其他数据分析库开发项目。