我们对大型数据集执行奇异值分解(SVD)计算。
我们通过使用全精度方法和近似方法,以及使用CPU和GPU来修改计算。
最后,我们在多GPU机器上对200GB模拟数据进行了近似SVD计算,耗时15-20秒。然后,我们从存储在云中的数据集运行此计算,发现I/O(正如预期的那样)是一个主要瓶颈。
Dask数组包含一个相对复杂的SVD算法,适用于高瘦或矮胖的情况,但在大致为方形的情况下效果不佳。它的工作原理是对数组的每个块进行QR分解,结合R矩阵,对这些矩阵进行另一个较小的SVD计算,然后执行一些矩阵乘法以获得完整结果。在数值上稳定且速度相当快,前提是QR分解的中间R矩阵大部分能放入内存。
这里的内存限制是,如果您有一个n×m的高瘦数组(n >> m)被切成k个块,那么您需要大约m**2 * k的空间。这在许多情况下都是如此,包括典型的PCA机器学习工作负载,其中您有具有少量列(最多几百列)和许多行的表格数据。
它易于使用且非常健壮。
import dask.array as da
x = da.random.random((10000000, 20))
x
数组 块字节 1.60 GB 100.00 MB 形状 (10000000, 20) (625000, 20) 计数 16 任务 16 块 类型 float64 numpy.ndarray 2010000000
u, s, v = da.linalg.svd(x)
这在矮胖的情况下也工作得很好(当列远远多于行时),但我们总是假设您的一个维度没有分块,而另一个维度的块相当长,否则,东西可能无法放入内存。
如果您的数据集在两个维度上都很大,那么上述算法将无法直接工作。但是,如果您不需要精确结果,或者只需要少数几个分量,那么有许多出色的近似算法。
Dask数组在da.linalg.svd_compressed函数中实现了其中一种近似算法。借助它,我们可以计算非常大矩阵的近似SVD。
我们最近在研究一个问题(下文解释),发现在使用这个算法时仍然会出现内存不足的情况。我们遇到了两个挑战:
在进一步深入技术解决方案之前,我们快速介绍一下这项工作的动机用例。
许多研究正在使用基因组测序来研究物种内不同个体之间的遗传变异。这包括对人类群体以及小鼠、蚊子或致病寄生虫等其他物种的研究。这些研究通常会在基因组序列中找到大量个体之间存在差异的位点。例如,人类基因组中有超过1亿个变异位点,而像UK BioBank这样的现代研究正致力于对100万或更多个体的基因组进行测序。
在人类、小鼠或蚊子等二倍体物种中,每个个体携带两个基因组序列,一个遗传自父亲,一个遗传自母亲。在100万个可变基因组位点中的每一个位点,单个基因组可能携带两个或更多的“等位基因”。一种思考方式是借助于Punnett square,它代表了个体在这些可变位点之一可能携带的不同可能基因型。
在上述情况中,有三种可能的基因型:AA、Aa和aa。在计算基因组学中,这些基因型可以编码为0、1或2。在对一个物种的M个遗传变异和N个个体样本进行研究时,我们可以将这些基因型表示为一个(M × N)的整数数组。对于现代人类遗传学研究,这个数组的规模可能接近(1亿 × 100万)。(虽然在实践中,第一个维度(变异数量)的大小可以适当减少,至少一个数量级,因为许多遗传变异携带的信息很少和/或彼此相关。)
这些遗传差异不是随机的,而是携带着个体间遗传相似性和共同祖先模式的信息,因为它们是通过多代遗传下来的。一项常见的任务是对这些数据进行降维分析,例如主成分分析(SVD),以识别反映这些共同祖先程度差异的遗传结构。这是发现与不同疾病相关的遗传变异以及了解群体和物种遗传历史的重要部分。
减少计算SVD等分析所需的时间,就像所有科学一样,可以在更短的时间内探索更大的数据集和检验更多的假设。实际上,这意味着不仅仅是快速的SVD计算,而是一个端到端的加速流水线,从数据加载到分析再到理解。
我们希望在比泡一杯茶更短的时间内运行一个实验。
了解了这些科学背景后,我们回到计算的话题。
为了阻止Dask保留数据,我们在构建图时特意触发计算。这在Dask计算中有点不典型(我们更倾向于在计算前一次性完成尽可能多的计算),但考虑到这个问题的多遍性质,这很有用。这是一个相当容易的改动,并且在dask/dask #5041中可用。
此外,我们发现开启中等程度的任务融合很有帮助。
import dask
dask.config.set({"optimization.fuse.ave-width": 5})
我们将在几种不同的硬件选择上尝试这个SVD计算,包括:
我们可以在Macbook Pro上轻松地对20GB数组执行SVD计算。
import dask.array as da
x = da.random.random(size=(1_000_000, 20_000), chunks=(20_000, 5_000))
u, s, v = da.linalg.svd_compressed(x, k=10, compute=True)
v.compute()
这个调用不再完全是惰性的,它会重新计算x几次,但它可以工作,并且只需要消费者笔记本电脑上的几GB内存就能运行。
在笔记本电脑上计算大约需要2分30秒。太棒了!尝试起来超级容易,不需要任何特殊硬件或设置,而且在许多情况下速度足够快。通过本地工作,我们可以快速迭代。
现在一切都已奏效,我们可以尝试不同的硬件了。
免责声明:其中一位作者(Ben Zaitlen)受雇于NVIDIA。
通过使用多GPU机器,我们可以显著提高性能。NVIDIA和其他制造商现在生产在同一物理箱体内共存多个GPU的机器。在以下部分,我们将在一台DGX2上运行计算,这台机器配备16个GPU和高速GPU间互连。
下面几乎是相同的代码,运行时间显著缩短,但我们做了以下更改:
在DGX2上,我们可以在10到15秒内在200GB Dask数组上计算SVD。
完整notebook在此,相关代码片段如下:
# 一些GPU特定设置
from dask_cuda import LocalCluster
cluster = LocalCluster(...)
client = Client(cluster)
import cupy
import dask.array as da
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
# 创建数据并正常运行SVD
x = rs.randint(0, 3, size=(10_000_000, 20_000),
chunks=(20_000, 5_000), dtype="uint8")
x = x.persist()
u, s, v = da.linalg.svd_compressed(x, k=10, seed=rs)
v.compute()
要查看运行过程,建议观看此屏幕录像。
尽管令人印象深刻,但上述计算主要受生成随机数据和执行矩阵计算的限制。GPU擅长这两项工作。
然而在实践中,我们的输入数组不会是随机生成的,它将来自存储在磁盘上的某个数据集,或者越来越普遍地,存储在云中。为了让事情更真实,我们使用存储在Zarr格式中GCS的数据执行了类似的计算。
在这个Zarr SVD示例中,我们将一个基于GCS的25GB数据集加载到DGX2上,运行了几个预处理步骤,然后执行了SVD。预处理和SVD计算的总耗时为18.7秒,数据加载耗时为14.3秒。
再次,在DGX2上,从数据加载到SVD计算,我们都在比泡一杯茶更短的时间内完成了。然而,数据加载可以加速。我们从GCS将数据读入机器的主内存(主机内存),解压zarr位,然后将数据从主机内存移动到GPU(设备内存)。在主机内存和设备内存之间来回传输数据会显著降低性能。直接读取到GPU,绕过主机内存,将改善整个流水线。
因此,我们回到了高性能计算的一个常见经验:
高性能计算的重点不在于把某件事做得无比出色,而在于不把任何事做得糟糕。.
在这种情况下,GPU让我们的计算速度足够快,以至于我们现在需要关注系统中的其他组件,特别是磁盘带宽,以及用于将Zarr数据直接读取到GPU内存的读取器。
免责声明:其中一位作者(Matthew Rocklin)受雇于Coiled Computing。
我们也可以使用任何数量的框架在云上运行它。在这种情况下,我们使用了Coiled Cloud服务在AWS上进行部署。
from coiled_cloud import Cloud, Cluster
cloud = Cloud()
cloud.create_cluster_type(
organization="friends",
name="genomics",
worker_cpu=4,
worker_memory="16 GiB",
worker_environment={
"OMP_NUM_THREADS": 1,
"OPENBLAS_NUM_THREADS": 1,
# "EXTRA_PIP_PACKAGES": "zarr"
},
)
cluster = Cluster(
organization="friends",
typename="genomics",
n_workers=20,
)
from dask.distributed import Client
client = Client(cluster)
# 然后正常进行
使用总共80个CPU核心的20台机器处理一个比MacBook Pro示例大10倍的数据集,我们能够在大致相同的时间内完成运行。这表明该问题几乎达到了最优扩展性,考虑到SVD计算的复杂性,这是个不错的现象。
可以在此处观看此问题的屏幕录像。
提高性能的最简单方法之一是通过压缩来减小数据大小。这些数据高度可压缩有两个原因:
我们先解决第二个问题。
理想情况下,Numpy应该有两位整数数据类型。不幸的是它没有,这很难实现,因为计算机中的内存通常以完整字节为单位。
为了解决这个问题,我们可以使用位算术将四个值塞进一个值中。以下是实现此功能的函数,假设我们的数组是方形的,并且最后一个维度可以被四整除。
import numpy as np
def compress(x: np.ndarray) -> np.ndarray
out = np.zeros_like(x, shape=(x.shape[0], x.shape[1] // 4))
out += x[:, 0::4]
out += x[:, 1::4] << 2
out += x[:, 2::4] << 4
out += x[:, 3::4] << 6
return out
def decompress(out: np.ndarray) -> np.ndarray
back = np.zeros_like(out, shape=(out.shape[0], out.shape[1] * 4))
back[:, 0::4] = out & 0b00000011
back[:, 1::4] = (out & 0b00001100) >> 2
back[:, 2::4] = (out & 0b00110000) >> 4
back[:, 3::4] = (out & 0b11000000) >> 6
return back
然后,我们可以结合Dask使用这些函数,将数据存储在压缩状态,但按需进行惰性解压。
x = x.map_blocks(compress).persist().map_blocks(decompress)
就是这样。我们压缩数据的每个块并将其存储在内存中。但是,我们拥有的输出变量x会在对每个块进行操作之前解压它,因此我们无需担心处理压缩块。
一种稍微更通用但可能效率较低的方法是使用像Zarr这样的适当压缩库来压缩我们的数组。
下面的示例展示了我们在实践中如何做到这一点。
import zarr
import numpy as np
from numcodecs import Blosc
compressor = Blosc(cname='lz4', clevel=3, shuffle=Blosc.BITSHUFFLE)
x = x.map_blocks(zarr.array, compressor=compressor).persist().map_blocks(np.array)
此外,如果我们使用dask-distributed调度器,那么我们需要确保Blosc压缩库不使用额外的线程。这样我们就不会出现并行调用并行库的情况,这可能导致一些竞争。
def set_no_threads_blosc()
""" 阻止blosc使用多个线程 """
import numcodecs
numcodecs.blosc.use_threads = False
# 在所有worker上运行
client.register_worker_plugin(set_no_threads_blosc)
这种方法更通用,可能是一个不错的备用技巧,但它不能在GPU上工作,这最终是我们选择上面一节中位操作方法的原因,该方法使用的API在Numpy和CuPy库中都是一致可访问的。
在这篇文章中,我们围绕基因组学中的一个重要问题做了几件事。
这个问题很棒,因为它让我们能够深入研究一个技术性的科学问题,同时也能快速尝试多种架构,以便调查未来可能做出的硬件选择。
今天我们在这里使用了几种技术,它们由不同的社区和公司开发。很高兴看到它们无缝地协同工作,提供了灵活而一致的体验。