这篇文章探讨了一个使用 Dask 在 Python 中计算复杂信用模型的实际案例。这是一个复杂的并行系统示例,其工作负载远远超出了传统的“大数据”范围。
大家好,
这是一篇来自 Rich Postelnik 的客座文章,他是一名 Anaconda 员工,与一家大型零售银行合作开发信用建模系统。他们正在使用 Dask 进行有趣的复杂计算管理工作(参见下面的任务图)。这是一个使用 Dask 解决既不是大型数据框也不是大型数组,但仍然高度并行的复杂问题的好例子。Rich 非常友善地写下了他们问题的描述并在此分享。
谢谢 Rich!
本文同步发布在Anaconda 开发者博客。
附:如果其他人有类似的解决方案并愿意分享,我也很乐意将它们发布在这个博客上。
当申请贷款时,如信用卡、抵押贷款、汽车贷款等,我们想要估计违约的可能性以及获得的利润(或损失)。这些模型由一组相互依赖的复杂方程组成。可以有数百个方程,每个方程最多可以有 20 个输入并产生 20 个输出。需要追踪的信息量巨大!我们希望避免手动追踪依赖关系,以及像下面这样的混乱 Python 代码
def final_equation(inputs)
out1 = equation1(inputs)
out2_1, out2_2, out2_3 = equation2(inputs, out1)
out3_1, out3_2 = equation3(out2_3, out1)
...
out_final = equation_n(inputs, out,...)
return out_final
这归结为一个称为任务调度的依赖和排序问题。
一个 有向无环图 (DAG) 通常用于解决任务调度问题。Dask 是一个用于延迟任务计算的库,其核心利用了有向图。dask.delayed 是一个简单的装饰器,可以将 Python 函数转换为图中的顶点。如果我将一个延迟函数的输出作为参数传递给另一个延迟函数,Dask 会在它们之间创建一条有向边。我们来看一个例子
def add(x, y)
return x + y
>>> add(2, 2)
4
这里我们有一个将两个数字相加的函数。我们来看看用 dask.delayed 包装它会发生什么
>>> add = dask.delayed(add)
>>> left = add(1, 1)
>>> left
Delayed('add-f6204fac-b067-40aa-9d6a-639fc719c3ce')
现在 add 返回一个 Delayed 对象。我们可以将其作为参数传回给我们的 dask.delayed 函数,开始构建计算链。
>>> right = add(1, 1)
>>> four = add(left, right)
>>> four.compute()
4
>>> four.visualize()
下面我们可以看到 DAG 是如何开始形成的。
假设我是一家抵押贷款银行,有 10 个人正在申请抵押贷款。我想根据信用记录年限和收入,估计这组人的平均违约可能性。
hist_yrs = range(10)
incomes = range(10)
我们还假设违约是根据增加的信用记录年限和一半的经验年限计算的函数。虽然可以这样写
def default(hist, income)
return (hist + 1) ** 2 + (income / 2)
我知道将来我需要增加的信用记录年限进行另一项计算,并且希望能够重用代码并避免重复计算。相反,我可以将这些函数拆分出来
from dask import delayed
@delayed
def increment(x)
return x + 1
@delayed
def halve(y)
return y / 2
@delayed
def default(hist, income)
return hist**2 + income
注意我是如何用 delayed 包装这些函数的。现在,这些函数不再返回一个数字,而是返回一个 Delayed 对象。更好的是,这些函数还可以接受 Delayed 对象作为输入。正是这种将 Delayed 对象作为输入传递给其他延迟函数的方式,使得 Dask 能够构建任务图。我现在可以像正常的 Python 代码一样在我的数据上调用这些函数
inc_hist = [increment(n) for n in hist_yrs]
halved_income = [halve(n) for n in income]
estimated_default = [default(hist, income) for hist, income in zip(inc_hist, halved_income)]
如果你看这些变量,你会发现实际上还没有进行任何计算。它们都是 Delayed 对象的列表。
现在,要获得平均值,我可以简单地对 estimated_default 求和,但我希望它能够扩展(并生成更有趣的图),所以我们采用一种合并式(merge-style)的归约。
@delayed
def agg(x, y)
return x + y
def merge(seq)
if len(seq) < 2
return seq
middle = len(seq)//2
left = merge(seq[:middle])
right = merge(seq[middle:])
if not right
return left
return [agg(left[0], right[0])]
default_sum = merge(estimated_defaults)
此时 default_sum 是一个长度为 1 的列表,它的第一个元素是所有申请人估计违约的总和。要获得平均值,我们将总和除以申请人数并调用 compute
avg_default = default_sum[0] / 10
avg_default.compute() # 40.75
要查看 Dask 将使用的计算图,我们调用 visualize
avg_default.visualize()
这就是如何使用 Dask 构建一个包含可重用中间计算的复杂方程系统的方法。
对于我们的信用建模问题,我们使用 Dask 创建了一个自定义数据结构来表示各个方程。使用上面的默认示例,它看起来像这样
class Default(Equation)
inputs = ['inc_hist', 'halved_income']
outputs = ['defaults']
@delayed
def equation(self, inc_hist, halved_income, **kwargs)
return inc_hist**2 + halved_income
这允许我们将每个方程编写成一个独立的函数,并标记其输入和输出。有了这组方程对象,我们可以确定计算的顺序(使用拓扑排序),并让 Dask 处理图的生成和计算。这消除了手动在代码库中传递参数的繁重任务。下面是银行实际使用的某个特定模型的任务图示例。
这个图有点太大了,无法用普通的 my_task.visualize() 方法渲染,所以我们转而使用 Gephi 来生成上面这个漂亮的彩色图。图中混乱的上半部分是单个方程的计算。放大后我们可以看到入口点,即我们的输入 pandas DataFrame,它在顶部是一个大的橙色圆圈,并如何被输入到许多方程中。
模型的输出大约是输入的 100 倍,所以在末尾我们通过树形归约进行了一些聚合。这解释了图中结构化程度更高的下半部分。底部的大绿色节点是我们的最终输出。
有了基于 Dask 的数据结构,我们能够将更多时间花在编写模型代码上,而不是维护计算引擎本身。这使得设计和编写模型的分析师与运行模型的计算系统之间实现了清晰的分离。Dask 还提供了许多上面未涉及的优点。例如,使用 Dask,您还可以访问诊断信息,如每个任务运行的时间和使用的资源。此外,您可以使用dask distributed相对轻松地分发计算。现在,如果我想在超出内存大小的数据或分布式集群上运行我们的模型,就不必担心重写代码来集成 Spark 等工具。最后,Dask 允许能够使用 pandas 的业务分析师或技术水平较低的人员通过dask dataframe访问大型数据集。
from dask import delayed
@delayed
def increment(x)
return x + 1
@delayed
def halve(y)
return y / 2
@delayed
def default(hist, income)
return hist**2 + income
@delayed
def agg(x, y)
return x + y
def merge(seq)
if len(seq) < 2
return seq
middle = len(seq)//2
left = merge(seq[:middle])
right = merge(seq[middle:])
if not right
return left
return [agg(left[0], right[0])]
hist_yrs = range(10)
incomes = range(10)
inc_hist = [increment(n) for n in hist_yrs]
halved_income = [halve(n) for n in incomes]
estimated_defaults = [default(hist, income) for hist, income in zip(inc_hist, halved_income)]
default_sum = merge(estimated_defaults)
avg_default = default_sum[0] / 10
avg_default.compute()
avg_default.visualize() # 需要安装 graphviz 和 python-graphviz
特别感谢 Matt Rocklin、Michael Grant、Gus Cavanagh 和 Rory Merritt 在撰写本文时提供的反馈。