用Spark处理PostgreSQL数据
以下のような話です。Apache Sparkは注目の的で、このようなこともできるとのことです。Sparkのマニュアルを読んで見つけたので、実際に試してみました。試した環境は……。
-
- CentOS 7.1
-
- Apache Spark 1.4.1
- PostgreSQL 9.4.4
是的。
Apache Spark是一个用于大规模数据处理和分析的开源集群计算框架。
不提供Spark的解释。
Apache Spark是一种快速的分布式处理平台,不仅可以与Hadoop和Cassandra等数据存储系统配合使用,还能够提取和处理存储在关系型数据库中的数据。
因此,您可以享受Spark快速处理的好处,而无需迁移现有数据。
将PostgreSQL的表加载到Spark中
因为要使用JDBC连接,所以需要PostgreSQL的JDBC驱动程序。
这次我们决定使用spark-shell进行简便操作。
$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar spark-shell
只需将PostgreSQL JDBC Driver的jar文件添加到类路径,并启动即可。
[2016/01/05 添加]
使用SPARK_CLASSPATH会引发以下弃用警告。
16/01/05 17:05:49 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1206-jdbc42.jar').
This is deprecated in Spark 1.0+.
Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath
如同消息所述,若要使用driver-class-path命令行选项,
$ spark-shell --driver-class-path=/home/spark/postgresql-9.4-1206-jdbc42.jar
如果是那样的话,那就可以了。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.1
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5f3f57ff
举个例子,如果PostgreSQL在本地主机的5432端口上运行,并且希望以postgres用户的身份将foo表的数据导入Spark,则需要执行以下代码。
scala> val fooRDD = sqlContext.load("jdbc", Map(
| "url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
| "dbtable" -> "public.foo"))
这将在Spark上将foo表的数据作为RDD和DataFrame加载。
[2016/01/05 更新]
如果按照上述的方法,将出现已不建议使用的警告。
<console>:25: warning: method load in class SQLContext is deprecated: Use read.format(source).options(options).load(). This will be removed in Spark 2.0.
val fooDF = sqlContext.load("jdbc", Map(
^
让我们使用下述的API。
scala> val fooDF = sqlContext.read.format("jdbc").options(
| Map("url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
| "dbtable" -> "public.foo")).load()
fooDF: org.apache.spark.sql.DataFrame = [a: int, b: string]
令人高兴的是,foo表的模式将以带有模式的RDD为DataFrame形式导入Spark上。
方便( *‘∀‘ )ノ
scala> fooRDD.printSchema()
root
|-- a: integer (nullable = false)
|-- b: string (nullable = true)
由于在列a上设置了主键,非空约束已经正确传递。太好了!
这次没有使用,但似乎可以在加载时指定DataFrame的分区数等。
然后,可以尽情地使用Spark的API进行操作。
scala> fooRDD.filter(fooRDD("a") > 99).show()
+---+--------+
| a| b|
+---+--------+
|100|inserted|
|101|inserted|
|103|inserted|
|104|inserted|
|105|inserted|
+---+--------+
scala> fooRDD.count()
res7: Long = 100
此外,如果注册为临时表,还可以使用HiveQL来执行类似SQL的操作。
scala> fooRDD.registerTempTable("foo")
scala> val res = sqlContext.sql("SELECT * FROM foo WHERE a > 73 AND a < 76")
res: org.apache.spark.sql.DataFrame = [a: int, b: string]
scala> res.show()
+--+--------+
| a| b|
+--+--------+
|74|inserted|
|75|inserted|
+--+--------+
这真厉害。
Spark和RDB的使用场景区分
使用此功能,可以将存储在PostgreSQL中的数据加载到Spark中,在分布式服务器的内存空间中,能够快速执行JOIN和排序操作。
对于PostgreSQL来说,
-
- 大きなテーブル同士のJOIN処理などをSparkにオフロードできる
-
- DBより上の層でのデータ・パーティショニングをしてくれることで、組み合わせることでスケーラビリティを獲得できる
-
- 異なるDB間のデータの突き合わせなど、単体サーバではカバーできない範囲をSparkが補ってくれる
- RDBに不向きなデータはHDFSやCasssandraなどのその他のデータストアに任せておいても、Spark上でPostgreSQLデータと連携してもらえる
有如下优点
对Spark而言,
-
- レコード単位の処理や、トランザクション管理が必要な処理など苦手な範囲をPostgreSQLに任せる
- データの永続性、データの整合性、一貫性の管理もPostgreSQLにお願いできる
有许多令人高兴的方面。
虽然在这个世界上,并不只有美好的事情,可能还存在着我们看不见的陷阱,但是感觉有能力做新事物的机会,真是太棒了。