提交新活动

谢谢!您的提交已收到!
糟糕!提交表单时出错了。

提交新闻专题

谢谢!您的提交已收到!
糟糕!提交表单时出错了。

订阅时事通讯

谢谢!您的提交已收到!
糟糕!提交表单时出错了。
2017年10月16日

流式处理 Dataframes

作者

这项工作得到了 Anaconda Inc 和来自 Moore Foundation 的数据驱动发现计划的支持。

本文介绍的是实验性软件。该软件尚未准备好供公众使用。本文中的所有代码示例和 API 都可能随时更改,恕不另行通知。

摘要

本文描述了一个原型项目,该项目旨在使用 Pandas 和 Streamz 处理表格数据的连续数据源。

引言

有些数据永不停歇。它们以持续不断的、永无止境的流形式到达。这发生在金融时间序列、Web 服务器日志、科学仪器、物联网遥测等领域。处理这些数据的算法与 NumPy 和 Pandas 等库中发现的算法略有不同,后者的假设是它们预先知道所有数据。仍然可以使用 NumPy 和 Pandas,但需要将它们与一些巧妙的方法结合起来,并保留足够的中间数据,以便在新数据传入时计算边际更新。

示例:流式处理均值

例如,想象一下我们有一个持续到达的 CSV 文件流,并且我们希望随时间打印出我们数据的均值。每当有新的 CSV 文件到达时,我们需要重新计算整个数据集的均值。如果我们足够聪明,我们会保留足够的中间状态,以便在不回顾其余历史数据的情况下计算出这个均值。我们可以通过保留运行总计和运行计数来实现这一点,如下所示:

total = 0
count = 0

for filename in filenames: # filenames 是一个无限迭代器
df = pd.read_csv(filename)
total = total + df.sum()
count = count + df.count()
mean = total / count
print(mean)

现在,随着我们将新文件添加到 filenames 迭代器中,我们的代码会打印出随时间更新的新均值。我们没有一个单一的均值结果,而是一个连续的均值结果流,每个结果都对直到那一刻的数据有效。我们的输出数据是一个无限流,就像我们的输入数据一样。

当我们的计算像这样是线性的、直截了当的时,一个 for 循环就足够了。然而,当我们的计算有多个流分支或汇聚,可能在它们之间有限速或缓冲时,这种 for 循环方法会变得复杂且难以管理。

Streamz

几个月前,我发布了一个名为 streamz 的小型库,它处理管道的控制流,包括线性映射操作、累积状态的操作、分支、合并,以及背压、流量控制、反馈等等。Streamz 旨在处理所有数据移动并在适当的时间发出计算信号。这个库被几个团队悄悄使用,现在感觉相当干净且有用。

Streamz 旨在处理此类系统的控制流,但在流式处理算法方面没有提供任何帮助。在过去的一周里,我一直在 Streamz 的基础上构建一个 dataframe 模块,以帮助处理常见的流式表格数据场景。该模块使用 Pandas 并实现了 Pandas API 的一部分,因此希望对于具有现有 Python 知识的程序员来说易于使用。

示例:流式处理均值

上面的示例可以使用 Streamz 编写如下:

source = Stream.filenames('path/to/dir/*.csv') # 文件名流
.map(pd.read_csv) # Pandas dataframes 流
.to_dataframe(example=...)) # 逻辑流式 dataframe

sdf.mean().stream.sink(print) # 打印的均值流

这个例子并不比 for 循环版本更清晰。就其本身而言,这可能比我们之前的方法更差,仅仅因为它涉及新技术。然而,在两种情况下它开始变得有用:

  1. 您想进行更复杂的流式处理算法
  2. sdf = sdf[sdf.name == 'Alice']
    sdf.x.groupby(sdf.y).mean().sink(print)

    # or

    sdf.x.rolling('300ms').mean()
  3. 如果像上面那样使用 for 循环来构建这些算法,将需要更多的巧妙方法。
  4. 您想进行多个操作、处理流量控制等。
  5. sdf.mean().sink(print)
    sdf.x.sum().rate_limit(0.500).sink(write_to_database)
    ...
  6. 持续地分支计算、正确地路由数据以及处理时间都可能是一致地实现这些目标的挑战。

Jupyter 集成和流式输出

在开发过程中,我们发现在 Jupyter 中有实时更新的输出非常有用。

通常,当我们在 Jupyter 中评估代码时,我们有静态输入和静态输出

然而现在,我们的输入和输出都是实时的

我们通过结合使用 ipywidgetsBokehplots 来实现这一点,它们都提供了很好的钩子来更改之前的 Jupyter 输出,并且与 Tornado IOLoop 配合良好(streamz、Bokeh、Jupyter 和 Dask 都使用 Tornado 进行并发)。我们能够在发生变化时构建出很好的响应式反馈。

在下面的示例中,我们构建了 CSV 到 dataframe 的管道,当目录中出现新文件时,该管道会更新。每当我们将文件拖到左侧的数据目录中时,我们都会看到右侧的所有输出都会更新。

支持哪些功能?

这个项目非常年轻,需要一些帮助。API 中还有很多空白。话虽如此,以下功能运行良好:

元素级操作

sdf['z'] = sdf.x + sdf.y
sdf = sdf[sdf.z > 2]

简单归约

sdf.sum()
sdf.x.mean()

Groupby 归约

sdf.groupby(sdf.x).y.mean()

按行数或时间窗口滚动归约

sdf.rolling(20).x.mean()
sdf.rolling('100ms').x.quantile(0.9)

使用 Bokeh 进行实时绘图(我最喜欢的功能之一)

sdf.plot()

缺少什么?

  1. 并行计算: Streamz 核心库有一个可选的 Dask 后端用于并行计算。我尚未尝试将其附加到 dataframe 实现中。
  2. 从 Kafka 等常见流式数据源进行数据摄入。我们现在正在构建 Kafka Python 客户端库的异步感知包装器,所以这很可能会很快实现。
  3. 乱序数据访问: 在并行数据摄入(例如一次读取多个 Kafka 分区)之后不久,我们需要弄清楚如何处理乱序数据访问。这是可行的,但需要一些努力。这正是像 Flink 这样更成熟的库非常强大的地方。
  4. 性能: 上述某些操作(特别是滚动操作)确实涉及非平凡的复制,尤其是在窗口较大时。我们严重依赖 Pandas 库,而它并非设计用于快速变化的数据。希望 Pandas 的未来迭代(Arrow/libpandas/Pandas 2.0?)能使其更高效。
  5. 填充完整的 API: 许多常用操作(如方差)尚未实现。部分原因在于懒惰,部分原因在于希望找到正确的算法。
  6. 鲁棒绘图: 目前,这对于带有时间序列索引的数值数据效果很好,但对于其他数据则不太好。

但最重要的是,这需要有实际问题的人们使用,以帮助我们理解这里有哪些有价值的,有哪些令人不快。

欢迎对以上任何方面提供帮助。

您可以从 Github 安装此库

pip install git+https://github.com/mrocklin/streamz.git

文档和代码在这里:

当前工作

当前和即将开展的工作重点是从 Kafka 摄入数据以及使用 Dask 进行并行处理。