这项工作得到了 Anaconda Inc 和来自 Moore Foundation 的数据驱动发现计划的支持。
本文介绍的是实验性软件。该软件尚未准备好供公众使用。本文中的所有代码示例和 API 都可能随时更改,恕不另行通知。
本文描述了一个原型项目,该项目旨在使用 Pandas 和 Streamz 处理表格数据的连续数据源。
有些数据永不停歇。它们以持续不断的、永无止境的流形式到达。这发生在金融时间序列、Web 服务器日志、科学仪器、物联网遥测等领域。处理这些数据的算法与 NumPy 和 Pandas 等库中发现的算法略有不同,后者的假设是它们预先知道所有数据。仍然可以使用 NumPy 和 Pandas,但需要将它们与一些巧妙的方法结合起来,并保留足够的中间数据,以便在新数据传入时计算边际更新。
例如,想象一下我们有一个持续到达的 CSV 文件流,并且我们希望随时间打印出我们数据的均值。每当有新的 CSV 文件到达时,我们需要重新计算整个数据集的均值。如果我们足够聪明,我们会保留足够的中间状态,以便在不回顾其余历史数据的情况下计算出这个均值。我们可以通过保留运行总计和运行计数来实现这一点,如下所示:
total = 0
count = 0
for filename in filenames: # filenames 是一个无限迭代器
df = pd.read_csv(filename)
total = total + df.sum()
count = count + df.count()
mean = total / count
print(mean)
现在,随着我们将新文件添加到 filenames 迭代器中,我们的代码会打印出随时间更新的新均值。我们没有一个单一的均值结果,而是一个连续的均值结果流,每个结果都对直到那一刻的数据有效。我们的输出数据是一个无限流,就像我们的输入数据一样。
当我们的计算像这样是线性的、直截了当的时,一个 for 循环就足够了。然而,当我们的计算有多个流分支或汇聚,可能在它们之间有限速或缓冲时,这种 for 循环方法会变得复杂且难以管理。
几个月前,我发布了一个名为 streamz 的小型库,它处理管道的控制流,包括线性映射操作、累积状态的操作、分支、合并,以及背压、流量控制、反馈等等。Streamz 旨在处理所有数据移动并在适当的时间发出计算信号。这个库被几个团队悄悄使用,现在感觉相当干净且有用。
Streamz 旨在处理此类系统的控制流,但在流式处理算法方面没有提供任何帮助。在过去的一周里,我一直在 Streamz 的基础上构建一个 dataframe 模块,以帮助处理常见的流式表格数据场景。该模块使用 Pandas 并实现了 Pandas API 的一部分,因此希望对于具有现有 Python 知识的程序员来说易于使用。
上面的示例可以使用 Streamz 编写如下:
source = Stream.filenames('path/to/dir/*.csv') # 文件名流
.map(pd.read_csv) # Pandas dataframes 流
.to_dataframe(example=...)) # 逻辑流式 dataframe
sdf.mean().stream.sink(print) # 打印的均值流
这个例子并不比 for 循环版本更清晰。就其本身而言,这可能比我们之前的方法更差,仅仅因为它涉及新技术。然而,在两种情况下它开始变得有用:
在开发过程中,我们发现在 Jupyter 中有实时更新的输出非常有用。
通常,当我们在 Jupyter 中评估代码时,我们有静态输入和静态输出
然而现在,我们的输入和输出都是实时的
我们通过结合使用 ipywidgets 和 Bokehplots 来实现这一点,它们都提供了很好的钩子来更改之前的 Jupyter 输出,并且与 Tornado IOLoop 配合良好(streamz、Bokeh、Jupyter 和 Dask 都使用 Tornado 进行并发)。我们能够在发生变化时构建出很好的响应式反馈。
在下面的示例中,我们构建了 CSV 到 dataframe 的管道,当目录中出现新文件时,该管道会更新。每当我们将文件拖到左侧的数据目录中时,我们都会看到右侧的所有输出都会更新。
这个项目非常年轻,需要一些帮助。API 中还有很多空白。话虽如此,以下功能运行良好:
元素级操作
sdf['z'] = sdf.x + sdf.y
sdf = sdf[sdf.z > 2]
简单归约
sdf.sum()
sdf.x.mean()
Groupby 归约
sdf.groupby(sdf.x).y.mean()
按行数或时间窗口滚动归约
sdf.rolling(20).x.mean()
sdf.rolling('100ms').x.quantile(0.9)
使用 Bokeh 进行实时绘图(我最喜欢的功能之一)
sdf.plot()
但最重要的是,这需要有实际问题的人们使用,以帮助我们理解这里有哪些有价值的,有哪些令人不快。
欢迎对以上任何方面提供帮助。
您可以从 Github 安装此库
pip install git+https://github.com/mrocklin/streamz.git
文档和代码在这里:
当前和即将开展的工作重点是从 Kafka 摄入数据以及使用 Dask 进行并行处理。