本文探讨了如何将 ITK 图像处理工具集与 Dask Array 并行使用。
本文涵盖了:
让我们从一个完整的示例开始,将 Richardson Lucy 反卷积应用于一组光片显微镜数据。这是我们在上一篇关于图像加载的博文中展示过如何加载的相同数据
# 从上次加载我们的数据¶
import dask.array as da
imgs = da.from_zarr("AOLLSMData_m4_raw.zarr/", "data")
Array Chunk Bytes 188.74 GB 316.15 MB Shape (3, 199, 201, 1024, 768) (1, 1, 201, 1024, 768) Count 598 Tasks 597 Chunks Type uint16 numpy.ndarray 1993 7681024201
# 加载点扩散函数 (PSF)
import dask.array.image
psf = dask.array.image.imread("AOLLSMData/m4/psfs_z0p1/*.tif")[:, None, ...]
Array Chunk Bytes 2.48 MB 827.39 kB Shape (3, 1, 101, 64, 64) (1, 1, 101, 64, 64) Count 6 Tasks 3 Chunks Type uint16 numpy.ndarray 13 6464101
# 将数据转换为 float32 以进行计算¶
import numpy as np
imgs = imgs.astype(np.float32)
# 注意:psf 的采样体素间距需要
# 与图像的采样一致
psf = psf.astype(np.float32)
# 应用 Richardson-Lucy 反卷积¶
def richardson_lucy_deconvolution(img, psf, iterations=1)
""" 应用反卷积到单个数据块 """
import itk
img = img[0, 0, ...] # 移除前两个长度为一的维度
psf = psf[0, 0, ...] # 移除前两个长度为一的维度
image = itk.image_view_from_array(img) # 转换为 ITK 对象
kernel = itk.image_view_from_array(psf) # 转换为 ITK 对象
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image,
kernel_image=kernel,
number_of_iterations=iterations
)
result = itk.array_from_image(deconvolved) # 转换回 Numpy 数组
result = result[None, None, ...] # 添加回前两个长度为一的维度
return result
out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
# 创建 Dask worker 进程的本地集群
# (如果您有分布式集群,也可以指向它)
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_process=1)
client = Client(cluster) # 现在 Dask 操作默认使用此集群
# 触发计算并存储
out.to_zarr("AOLLSMData_m4_raw.zarr", "deconvolved", overwrite=True)
因此在上面的示例中,我们…
从图像科学家角度来看,这里的新技术是 dask.array.map_blocks 函数。给定一个由许多 NumPy 数组组成的 Dask 数组和一个函数,map_blocks 会将该函数并行应用于每个块,并返回一个 Dask 数组作为结果。每当您想以简单的方式对多个块应用操作时,这是一个很棒的工具。因为 Dask 数组仅由 Numpy 数组构成,所以很容易将 Dask 与科学 Python 生态系统的其余部分结合使用。
然而在这种情况下,由于 ITK 和 Dask Array 的特性,构建正确的 Numpy-> Numpy 函数存在一些挑战。让我们再看看我们的函数
def richardson_lucy_deconvolution(img, psf, iterations=1)
""" 应用反卷积到单个数据块 """
import itk
img = img[0, 0, ...] # 移除前两个长度为一的维度
psf = psf[0, 0, ...] # 移除前两个长度为一的维度
image = itk.image_view_from_array(img) # 转换为 ITK 对象
kernel = itk.image_view_from_array(psf) # 转换为 ITK 对象
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image,
kernel_image=kernel,
number_of_iterations=iterations
)
result = itk.array_from_image(deconvolved) # 转换回 Numpy 数组
result = result[None, None, ...] # 添加回前两个长度为一的维度
return result
out = da.map_blocks(richardson_lucy_deconvolution, imgs, psf, dtype=np.float32)
这比我们希望的要长。相反,我们本希望直接使用 itk 函数,而无需前后的所有步骤。
deconvolved = da.map_blocks(itk.richardson_lucy_deconvolution_image_filter, imgs, psf)
我们函数中的额外步骤是什么?为什么它们是必需的?
但是,如果您习惯于处理此类问题,那么如果您想在集群上并行化 ITK 操作,ITK 和 map_blocks 可以成为强大的组合。
上面我们使用了 dask.distributed.LocalCluster 在我们的本地工作站上设置了 20 个单线程 worker
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=20, threads_per_process=1)
client = Client(cluster) # 现在 Dask 操作默认使用此集群
如果您有分布式资源,这就是您连接它的地方。您可以将 LocalCluster 替换为Dask 的其他部署选项之一。
此外,我们发现需要使用许多单线程进程而不是一个多线程进程,因为 ITK 函数似乎仍然持有 GIL。这没关系,我们只需要意识到这一点,以便适当地设置我们的 Dask worker,每个进程一个线程以获得最大效率。有关此主题的活动 Github 问题,请参阅 ITK #1134。
在使用 ITK 库跨多个进程时遇到了一些困难,因为库本身序列化得不好。(如果您不理解这是什么意思,请不用担心)。我们在ITK #1090 中解决了一些问题,但仍有一些问题存在。
我们通过将 import 放在函数内部而不是外部来解决这个问题。
def richardson_lucy_deconvolution(img, psf, iterations=1)
import itk # <--- 我们通过在函数内部导入来解决序列化问题
这样每个任务都会单独导入 itk,我们就避开了这个问题。
我们还尝试了Scikit-Image 中的 Richardson Lucy 反卷积操作。Scikit-Image 以更接近 Scipy/Numpy 原生而闻名,但不总是像 ITK 一样快。我们的经验证实了这种看法。
首先,我们很高兴看到 scikit-image 函数与 map_blocks 立即协同工作,没有任何打包/解包、维度或序列化问题
import skimage.restoration
out = da.map_blocks(skimage.restoration.richardson_lucy, imgs, psf) # 直接可用
因此,这里不再需要将图像对象相互转换或移除和添加单例维度。
在性能方面,我们也很高兴看到 Scikit-Image 释放了 GIL,因此在使用少量多线程进程时,我们获得了非常高的报告 CPU 利用率。然而,尽管 CPU 利用率很高,但我们的并行性能却很差,以至于我们还是坚持使用 ITK 解决方案,尽管它有一些缺点。有关此内容的更多信息可在 Github issue scikit-image #4083 中找到。
注意:在单个块上顺序执行时,ITK 运行时间约为 2 分钟,而 scikit-image 运行时间为 3 分钟。只有当我们开始并行化时,事情才变慢。
无论如何,我们在本次实验中的目标是看看 ITK 和 Dask array 协同工作的效果如何。很高兴看到平滑的集成是什么样子,即使只是为了激励未来 ITK+Dask 关系的开发。
da.map_blocks 的替代方案是泛化通用函数 (gufuncs)。这些函数具有许多神奇的特性,其中之一是它们在 NumPy 和 Dask 数组上都能很好地运行。如果像 ITK 或 Scikit-Image 这样的库将其函数转换为 gufuncs,那么用户无需做任何特殊操作即可使用它们。
目前实现 gufuncs 最简单的方法是使用 Numba。我在我们包装的 richardson_lucy 函数上做了这个,只是为了展示它如何工作,以防其他库将来想要采用这种方式。
import numba
@numba.guvectorize(
["float32[:,:,:], float32[:,:,:], float32[:,:,:]"], # 我们需要指定类型
"(i,j,k),(a,b,c)->(i,j,k)", # 并明确指定维度
forceobj=True,
)
def richardson_lucy_deconvolution(img, psf, out)
# <---- 无需维度解包!
iterations = 1
image = itk.image_view_from_array(np.ascontiguousarray(img))
kernel = itk.image_view_from_array(np.ascontiguousarray(psf))
deconvolved = itk.richardson_lucy_deconvolution_image_filter(
image, kernel_image=kernel, number_of_iterations=iterations
)
out[:] = itk.array_from_image(deconvolved)
# 现在这个函数可以直接在 NumPy 或 Dask 数组上运行
out = richardson_lucy_deconvolution(imgs, psf) # <-- 无需调用 map_blocks!
请注意,我们既摆脱了维度解包,也摆脱了 map_blocks 调用。我们的函数现在知道足够的关于如何广播的信息,Dask 可以在不被明确告知做什么的情况下进行并行化。
这给库维护者带来了一些负担,但使用户体验更加顺畅。
在进行图像处理和 Dask 用户调研时,几乎所有受访者都表示他们想要更快的反卷积。这似乎是一个主要的痛点。现在我们知道原因了。它既非常常见,又非常慢。
在单个大小如此的块上运行反卷积需要大约 2-4 分钟,而我们在单个数据集中有数百个块。多核并行性在此处可能会有所帮助,但此问题也可能适合进行 GPU 加速。类似操作通常在 GPU 上可获得 100 倍的加速。这可能比扩展到大型分布式集群更实用。
这次实验既…
在这些技术问题在后台得到解决的同时,我们也将继续我们的图像实验。下一步是分割!