提交新活动

谢谢!您的提交已收到!
哎呀!提交表单时出错了。

提交新闻报道

谢谢!您的提交已收到!
哎呀!提交表单时出错了。

订阅新闻通讯

谢谢!您的提交已收到!
哎呀!提交表单时出错了。
2021年7月2日

不规则输出,如何处理形状不规则的结果

作者

执行摘要

这篇博文解释了分布式计算和不规则形状或非整齐输出相关的一些困难。我们提出了一种在这些情况下使用 Dask 的推荐方法。

背景

通常,我们会遇到这样的工作流程:数据分析涉及搜索特征(可能存在也可能不存在),然后根据这些特征计算一些结果。由于我们事先不知道会找到多少特征,因此可以预期处理输出的大小会有所不同。

对于分布式工作负载,我们需要分割数据、处理数据,然后重新组合结果。这意味着当 Dask 组合输出时,不规则输出可能会导致问题(例如广播错误)。

问题约束

在这篇博文中,我们将看一个具有以下约束的示例

     
  • 输入数组数据
  •  
  • 需要块之间重叠的处理函数
  •  
  • 返回的输出

解决方案

最简单的策略是一个两步过程

     
  1. 使用 overlap 函数扩展数组块。
  2.  
  3. 使用 map_blocks 以及 drop_axis 关键字参数

示例代码

import dask.array as da

arr = da.random.random((100, 100), chunks=(50,50))  # example input data
expanded = da.overlap.overlap(arr, depth=2, boundary="reflect")
result = expanded.map_blocks(processing_func, drop_axis=1, dtype=float)
result.compute()

支持多种输出类型

此模式支持处理函数的多种输出类型,包括

     
  • numpy 数组
  •  
  • pandas Series
  •  
  • pandas DataFrames

您可以使用下面的任一示例处理函数自行尝试,生成模拟数据输出。或者,您也可以尝试自己的函数。

# Random length, 1D output returned
import numpy as np
import pandas as pd

# function returns numpy array
def processing_func(x):
    random_length = np.random.randint(1, 7)
    return np.arange(random_length)

# function returns pandas series
def processing_func(x):
    random_length = np.random.randint(1, 7)
    output_series = np.arange(random_length)
    return pd.Series(output_series)

# function returns pandas dataframe
def processing_func(x):
    random_length = np.random.randint(1, 7)
    x_data = np.arange(random_length)
    y_data = np.random.random((random_length))
    return pd.DataFrame({"x": x_data, "y": y_data})

为什么不能使用 map_overlapreduction

对于某些 Dask 函数,当输出组合时,不规则输出大小可能导致广播错误。

但是,如果不规则输出大小对于您的特定编程问题不是约束,那么您可以继续随意使用 Dask map_overlapreduction 函数。

备选方案

Dask delayed

作为备选方案,您可以使用 Dask delayed此处提供了一个教程)。

优点

     
  • 您的处理函数可以具有任何类型的输出(它不限于 numpy 或 pandas 对象)
  •  
  • 使用 Dask delayed 的方式更加灵活。

缺点

     

您将不得不自己处理输出的组合。

您必须更注意性能:    

  • 例如,因为下面的代码在列表推导式中使用了 delayed,所以出于性能原因,传递预期的元数据非常重要。幸运的是,dask 提供了 make_meta 函数。      
  • 您可以在此处阅读更多关于 Dask delayed 的性能注意事项和最佳实践。

示例代码

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
import dask

arr = da.ones((20, 10), chunks=(10, 10))

@dask.delayed
def processing_func(x):
    # returns dummy dataframe output
    random_length = np.random.randint(1,10)
    return pd.DataFrame({'x': np.arange(random_length),
                         'y': np.random.random(random_length)})

meta = dd.utils.make_meta([('x', np.int64), ('y', np.int64)])
expanded = da.overlap.overlap(arr, depth=2, boundary="reflect")
blocks = expanded.to_delayed().ravel()
results = [dd.from_delayed(processing_func(b), meta=meta) for b in blocks]
ddf = dd.concat(results)
ddf.compute()

总结

就这样!我们学习了如何在处理返回不规则输出的函数时避免常见错误。此处推荐的方法适用于多种输出类型,包括:numpy 数组、pandas series 和 pandas DataFrames。