介绍用于大规模数据科学的Apache Spark数据框架

以下是《Apache Spark数据框架用于大规模数据科学》的翻译。

由于这是2015年2月的文章,所以可能存在部分内容已经过时的可能,但基本的思路没有改变。

我很高兴能够宣布新的数据框API的发布,这个API是为了让更多人能够轻松进行大数据分析而设计的。

最初,Apache Spark的开源目标是为通用编程语言(Java、Python、Scala)提供简单的API,以实现分布式数据处理。借助Spark,我们可以通过对分布式数据集合(RDD)进行功能转换来实现分布式处理。这是一个非常强大的API,它可以用十几行代码来描述需要几千行代码来表达的任务。

Spark的增长非常迅速,我们希望不仅仅限于”大数据”工程师,还可以让更多广泛的用户能够利用分布式处理的功能。为了实现这一目标,我们开发了全新的数据框架API。这个API受到R和Python(Pandas)的数据框架的启发,但是经过了从头设计,以支持现代的大数据和数据科学应用。作为现有RDD API的扩展,数据框架提供以下功能。

    • 単体のラップトップにおけるキロバイトのデータから、大規模クラスターでペタバイトのデータにスケール。

 

    • 幅広いデータフォーマット、ストレージシステムをサポート。

 

    • Spark SQL Catalystオプティマイザーによる最新の最適化、コード生成。

 

    • Sparkを介したビッグデータツールとインフラストラクチャのシームレスな統合。

 

    Python、Java、Scala、R向けAPIの提供。

对于熟悉其他语言的数据框架的用户来说,这个API肯定让他们感觉像在自己的家中一样。而对于现有的Spark用户来说,他们可以通过使用这个扩展API来更容易地进行编程,并通过智能优化和代码生成来提高性能。

数据框是什么?

在Spark中,数据帧是一种分布式集合,可以通过命名列对数据进行组织。从概念上讲,数据帧类似于关系型数据库中的表格或者R/Python中的数据帧,但在内部进行了各种优化。数据帧可以通过结构化数据文件、Hive表、外部数据库、现有的RDD等各种数据源构建。

在以下示例中,展示了如何使用Python构建数据框。类似的API也可在Scala、Java和R中使用。

# Constructs a DataFrame from the users table in Hive.
users = context.table("users")

# from JSON files in S3
logs = context.load("s3n://path/to/data.json", "json")

怎样使用数据框(Data Frame)呢?

在构建完成之后,数据框将提供特定领域语言用于分布式数据处理。以下示例展示了如何使用数据框来操作大规模用户的人口统计数据。

# Create a new DataFrame that contains “young users” only
young = users.filter(users.age < 21)

# Alternatively, using Pandas-like syntax
young = users[users.age < 21]

# Increment everybody’s age by 1
young.select(young.name, young.age + 1)

# Count the number of young users by gender
young.groupBy("gender").count()

# Join young users with another DataFrame called logs
young.join(logs, logs.userId == users.userId, "left_outer")

通过使用Spark SQL,您可以在操作数据帧时使用SQL。在下面的示例中,我们计算了young数据帧中的用户数量。

young.registerTempTable("young")
context.sql("SELECT count(*) FROM young")

在Python中,您可以自由地将Pandas数据框和Spark数据框进行转换。

# Convert Spark DataFrame to Pandas
pandas_df = young.toPandas()

# Create a Spark DataFrame from Pandas
spark_df = context.createDataFrame(pandas_df)

数据帧与RDD类似,以惰性方式进行评估。也就是说,只有在需要执行操作(例如显示结果,保存输出)时才进行计算。通过这种方式,可以应用在“内部处理:智能优化和代码生成”中介绍的谓词推送和字节码生成等技术来优化处理。所有数据帧操作都自动并行分布在集群上。

数据格式和数据源的支持。

users = context.jdbc("jdbc:postgresql:production", "users")
logs = context.load("/path/to/traffic.log")
logs.join(users, logs.userId == users.userId, "left_outer") \
.groupBy("userId").agg({"*": "count"})

应用程序:先进的分析、机器学习

数据科学家正在开始利用比连接和聚合更高级的技术。为了应对这一趋势,您可以直接在MLlib的机器学习管道API中使用数据框架。此外,您还可以对数据框架应用任意复杂的用户函数。

可以使用MLlib的新管道API来指定最常见的先进分析任务。例如,下面的代码构建了一个简单的文本分类管道,由分词器、哈希单词频率特征提取器和逻辑回归组成。

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

一旦建立了数据管道,就可以直接在数据框上进行训练。

df = context.load("/path/to/data")
model = pipeline.fit(df)

在比机器学习管道API更为复杂的任务中,应用程序可以对数据框应用任意函数。此外,还可以使用Spark的现有RDD API进行操作。下面的代码片段演示了对数据框的bio列执行类似于大数据中的”Hello World”的词频统计任务。

df = context.load("/path/to/people.json")
# RDD-style methods such as map, flatMap are available on DataFrames
# Split the bio text into multiple words.
words = df.select("bio").flatMap(lambda row: row.bio.split(" "))
# Create a new DataFrame to count the number of words
words_df = words.map(lambda w: Row(word=w, cnt=1)).toDF()
word_counts = words_df.groupBy("word").sum()

内部处理:智能优化和代码生成

与R或Python中的立即评估数据框不同,Spark的数据框具有自身的执行过程,并由查询优化器自动优化。在对数据框进行计算之前,Catalyst优化器会将构建数据框所需的操作编译为物理执行计划。优化器可以理解操作的语义和数据结构,从而能够做出明智的决策以加快处理速度。

在高级水平上,存在着两种类型的优化。首先,Catalyst应用了逻辑优化,例如谓词下推。优化器会将谓词过滤器下推到数据源中,以便跳过不需要进行物理执行的数据。对于Parquet文件而言,可以跳过整个数据块,并通过字典编码将字符串比较转换为更低成本的整数比较。对于关系型数据库而言,谓词会被下推到外部数据库,并可以减少数据流量。

下一步,Catalyst会将操作编译成物理执行计划,并生成相应的JVM字节码,以便于优化比手写代码更好。例如,为了减少网络流量,可以适当选择广播连接或洗牌连接。还会进行低级别的优化,例如排除高成本对象分配和减少虚拟函数调用等。因此,通过将现有的Spark程序迁移到数据框架,可以期望性能改进。

数据框架受到了之前的分布式数据框架项目Adatao的DDF和Ayasdi的BigDF的启发。然而,与这些项目相比,数据框架的主要区别是它经过Catalyst优化器,并且像Spark SQL查询一样进行优化执行。我们不断改进Catalyst优化器,使得引擎更加智能化,并且随着新的Spark发布,应用程序也得以加速。

Databricks的数据科学团队正在使用数据帧API来构建内部的数据管道。数据帧API简化了使用的Spark程序,使其更易于理解,并提高了性能。我们认为这非常令人高兴,并相信更多的用户将能够访问大数据处理。

这个API将作为Spark 1.3的一部分于2015年3月上旬发布。如果您迫不及待,可以尝试在Github上使用Spark。

没有使用过往的数据框架实现,是无法实现此功能的。对R、Pandas、DDF、BigDF的开发者们表示感谢。

Databricks 免费试用

Databricks 免费试用

广告
将在 10 秒后关闭
bannerAds