利用Cassandra进行数据分析的方法——充分利用GPU和RAPIDS
首先
关于Apache Cassandra
Apache Cassandra是一个开源的分布式数据库管理系统。
和其他分布式数据库管理系统一样,我们使用多个通用服务器来构建一个数据库(根据开发等目的,也可以只使用一个服务器来构建)。
在这里,我们将详细说明省略不谈,我们将把介绍给感兴趣的人的角色交给官方网站和维基百科。
https://cassandra.apache.org/_/index.html –
https://en.wikipedia.org/wiki/Apache_Cassandra – 问答助手不能为需求提供中文的选项。
使用Cassandra进行数据分析的方法-利用GPU和RAPIDS技术
本文中,本人將以簡單易懂的方式介紹以下博客內容,特別針對日本讀者。
并行处理系统的出现以及GPU的出现
当企业开始将大量数据存储在诸如Apache Cassandra的NoSQL数据存储中时,对于从数据中获取洞察和进行基于大数据的数据分析的需求也随之出现。
为了满足对这些大量数据的快速处理等需求,出现了类似Apache Spark的并行处理系统。
另一方面,在当今企业的数据分析中,人工智能和机器学习技术正迅速得到采用。
数据科学,特别是机器学习,由于需要使用大量的并行计算,因此非常适合在GPU上执行,比起CPU能够以数个数量级更快地执行”多任务”。
在这个帖子中,我们使用RAPIDS生态系统中的工具来直接将Cassandra SSTable解析到GPU设备内存中,介绍了处理Cassandra数据的最先进方法。
通过这种方式,用户可以更快地访问分析信息,而无需进行大量的初始设置,并且还可以轻松迁移用Python编写的现有分析代码。
在这篇文章中,我将简要介绍RAPIDS项目,并介绍一系列选项,以便使用RAPIDS对Cassandra数据进行分析。
最终,我将解释目前使用C++解析SSTable文件并将其转换为适合GPU的格式,以便更容易将数据加载到GPU设备内存中的方法。
RAPIDS是什么?
RAPIDS 是一个开源的库套件,可在GPU上以端到端的方式执行分析和数据科学任务。
RAPIDS是由Nvidia开发的开发者工具包,旨在让开发者能够利用GPU。它起源于CUDA,是一个让开发者能够利用GPU的开发者工具包。RAPIDS可以让常见的人工智能/机器学习API(如pandas和scikit-learn)利用GPU加速。下面的图片展示了利用GPU加速的数据分析能带来的好处(以下图片引自rapids.ai)。

一旦使用 cuDF(基本上相当于 pandas DataFrame 的 RAPIDS 格式)从 GPU 上获取数据之后,您可以使用熟悉的 Python 库,如 pandas 和 scikit-learn,几乎相同的 API 来操作数据。
下面是在机器学习的各个阶段中使用的库的关系。这个图示表示的是在使用CPU的情况下。

以下的图表展示的是使用GPU而不是CPU的情况。

使用Apache Arrow
在RAPIDS中,使用Apache Arrow作为基础的内存格式。
由于Arrow是基于列而不是行进行设计的,因此可以实现更快速的分析查询。
此外,还附带了用于在进程间传输Arrow记录批处理(类似于表格形式数据)的进程间通信(IPC)机制。IPC格式与内存格式相同,因此可以消除额外的复制和反序列化开销,实现非常快速的数据访问。
在使用GPU进行分析方面的好处是显而易见的。唯一需要的是适当的硬件。只需要将Python数据科学库的API替换为等效的RAPIDS API,就可以将现有的数据科学代码迁移到GPU上运行。
卡桑德拉
在利用GPU有效地使用Cassandra数据的PARIDS之前,应该采取哪些方法呢?这里有五种可能的方法。
-
- 使用Cassandra驱动程序提取数据并将其转换为pandas DataFrame,然后将其转换为cuDF。
虽然使用Cassandra驱动程序提取数据的步骤相同,但跳过pandas的步骤,直接将驱动程序的数据转换为Arrow表。
使用在Cassandra服务器内部执行的逻辑(级别的代码),从磁盘读取SSTable并使用Arrow IPC流格式进行序列化,然后发送给客户端。
与方法3相同,但使用C++进行自定义解析实现,而不是使用Cassandra代码。
与方法4相同,但在解析SSTable期间使用基于CUDA的GPU向量化。
首先,我们将解释这些方法的概要,最后对其进行比较并解释下一步。
使用Cassandra驱动程序从数据库中检索数据。
即使不怎么进行黑客攻击,也可以使用现有的库,所以这种方法非常简单。获取驱动程序的数据,并将session.row_factory设置为pandas_factory函数,告诉驱动程序将接收到的数据转换为pandas.DataFrame的方法。
一旦完成上述步骤,接下来,只需调用cudf.DataFrame.from_pandas函数将数据加载到GPU上非常容易。然后,可以使用RAPIDS库来执行GPU加速分析。
首先,安装必要的 Python 库也是必要的。以下是使用 Conda 环境的示例。
conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql cudf pyarrow pandas numpy cassandra-driver
以下是代码示例。有关详细信息,请参阅DataStax Python驱动程序的文档。
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import pandas as pd
import pyarrow as pa
import cudf
from blazingsql import BlazingContext
import config
# connect to the Cassandra server in the cloud and configure the session settings
cloud_config= {
'secure_connect_bundle': '/path/to/secure/connect/bundle.zip'
}
auth_provider = PlainTextAuthProvider(user=’your_username_here’, password=’your_password_here’)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()
def pandas_factory(colnames, rows):
"""Read the data returned by the driver into a pandas DataFrame"""
return pd.DataFrame(rows, columns=colnames)
session.row_factory = pandas_factory
# run the CQL query and get the data
result_set = session.execute("select * from your_keyspace.your_table_name limit 100;")
df = result_set._current_rows # a pandas dataframe with the information
gpu_df = cudf.DataFrame.from_pandas(df) # transform it into memory on the GPU
# do GPU-accelerated operations, such as SQL queries with blazingsql
bc = BlazingContext()
bc.create_table("gpu_table", gpu_df)
bc.describe_table("gpu_table")
result = bc.sql("SELECT * FROM gpu_table")
print(result)
使用Cassandra驱动程序直接将数据提取到Arrow
这种方法的前半部分与上一种方法相同,但将pandas_factory切换为arrow_factory。
def get_col(col):
rtn = pa.array(col) # automatically detects the type of the array
# for a full implementation, we'd need to fully check which arrow types need
# to be manually casted for compatibility with cudf
if pa.types.is_decimal(rtn.type):
return rtn.cast('float32')
return rtn
def arrow_factory(colnames, rows):
# convert from the row format passed by
# CQL into the column format of arrow
cols = [get_col(col) for col in zip(*rows)]
table = pa.table({ colnames[i]: cols[i] for i in range(len(colnames)) })
return table
session.row_factory = arrow_factory
接下来,我们将以相同的方式提取数据,并创建cuDF。
然而,这两种方法都存在重大缺点。
它们都依赖于对现有Cassandra集群的查询,但从性能角度来看,这并不一定是理想的。
由于高读取负载的分析工作负载可能会对重要的事务操作工作负载产生影响,这可能导致实时性能的下降。
在实时数据分析需求中,实际上不能只考虑数据库的分析工作负载的影响。
我想确认一种不经过数据库,直接从磁盘上的 SSTable 文件获取数据的方法。
使用Cassandra服务器代码从磁盘上读取SSTable。
可能的最简单的方法是通过使用现有的Cassandra服务器技术,即SSTableLoader,来读取位于磁盘上的SSTable。一旦获取了从SSTable中获取分区列表,就可以手动将Java对象数据转换为对应于表列的Arrow向量。然后,可以将向量集合序列化为Arrow IPC流格式,并通过套接字进行流式传输。
实际上,这种方法在开发方面并没有比下一个介绍的方法更进展,因此对于这种方法的详细介绍将被省略。
此外,这种方法的避免因素是,这种方法需要在与Cassandra集群进程不同的一个进程(或机器)上执行,而使用SSTableLoader时,首先需要在客户端进程中初始化(嵌入式级别的)Cassandra,这可能需要相当长的时间来进行冷启动。
4. 使用自定义的SSTable解析器
为了避免初始化Cassandra,我们开发了一个自定义的C++实现,用于解析二进制数据SSTable文件。
有关此方法的详细信息,请参阅以下博客文章。
这本由The Last Pickle编写的Cassandra存储引擎指南,在解读数据格式时非常有帮助。
最终在语言解析器中加入了支持CUDA,并且使用C++来进行低级别的二进制数据处理控制。
将CUDA与表格读取集成以加快速度。
计划在进行更全面的定制分析实施后,开始采用这种方法。利用GPU矢量化技术,可以大幅度加快读取和转换的过程。
比較性能
目前阶段的焦点主要在于读取 SSTable 文件所需的时间。
关于1和2的方法,第一种方法依赖于额外的硬件(Cassandra集群),而第二种方法由于在Cassandra本身内部存在复杂的缓存效果,因此实际上无法公平地测量这个时间。然而,在第3和第4种方法中,可以通过执行简单的内省来跟踪程序从头到尾读取SSTable文件所需的时间。
以下是针对包含1k、5k、10k、50k、100k、500k和1m行数据的数据集所生成的NoSQLBench结果。

根据图表显示,即使没有进行额外的优化,如多线程等,自定义实现仍比现有的Cassandra实现稍微快一点。
总结
考虑到分析用例数据访问模式通常包括大规模扫描和频繁地读取整个表,取得这些数据的最有效方法是直接访问SSTable,而不是使用CQL。我们使用C++实现了能够执行此操作的SSTable解析器,并将数据转换为Apache Arrow,以便在包含NVIDIA GPU的RAPIDS生态系统中使用分析库。这个开源项目被称为sstable-to-arrow,符合Apache 2许可证,可在GitHub上获取,并可以通过Docker Hub作为Alpha版本进行访问。
如果对尝试使用sstable-to-arrow感兴趣,请参考以下博客文章。
最后
在这篇文章中介绍的尝试可能会被认为过于专业化,实际应用起来可能有较高的难度。
有趣的是,这里介绍的对于内部实现的方法,具体来说是将数据存储格式通用化,即SSTable,正在考虑将其反映到Cassandra主体实现中,以Cassandra Enhancement Request的形式。