用Spark处理PostgreSQL数据

以下のような話です。Apache Sparkは注目の的で、このようなこともできるとのことです。Sparkのマニュアルを読んで見つけたので、実際に試してみました。試した環境は……。

    • CentOS 7.1

 

    • Apache Spark 1.4.1

 

    PostgreSQL 9.4.4

是的。

Apache Spark是一个用于大规模数据处理和分析的开源集群计算框架。

不提供Spark的解释。

Apache Spark是一种快速的分布式处理平台,不仅可以与Hadoop和Cassandra等数据存储系统配合使用,还能够提取和处理存储在关系型数据库中的数据。

因此,您可以享受Spark快速处理的好处,而无需迁移现有数据。

将PostgreSQL的表加载到Spark中

因为要使用JDBC连接,所以需要PostgreSQL的JDBC驱动程序。
这次我们决定使用spark-shell进行简便操作。

$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar spark-shell

只需将PostgreSQL JDBC Driver的jar文件添加到类路径,并启动即可。

[2016/01/05 添加]

使用SPARK_CLASSPATH会引发以下弃用警告。

16/01/05 17:05:49 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1206-jdbc42.jar').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

如同消息所述,若要使用driver-class-path命令行选项,

$ spark-shell --driver-class-path=/home/spark/postgresql-9.4-1206-jdbc42.jar

如果是那样的话,那就可以了。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@5f3f57ff

举个例子,如果PostgreSQL在本地主机的5432端口上运行,并且希望以postgres用户的身份将foo表的数据导入Spark,则需要执行以下代码。

scala> val fooRDD = sqlContext.load("jdbc", Map(
     | "url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
     | "dbtable" -> "public.foo"))

这将在Spark上将foo表的数据作为RDD和DataFrame加载。

[2016/01/05 更新]
如果按照上述的方法,将出现已不建议使用的警告。

<console>:25: warning: method load in class SQLContext is deprecated: Use read.format(source).options(options).load(). This will be removed in Spark 2.0.
         val fooDF = sqlContext.load("jdbc", Map(
                                ^

让我们使用下述的API。

scala> val fooDF = sqlContext.read.format("jdbc").options(
     | Map("url" -> "jdbc:postgresql://localhost:5432/postgres?user=postgres",
     | "dbtable" -> "public.foo")).load()
fooDF: org.apache.spark.sql.DataFrame = [a: int, b: string]

令人高兴的是,foo表的模式将以带有模式的RDD为DataFrame形式导入Spark上。
方便( *‘∀‘ )ノ

scala> fooRDD.printSchema()
root
 |-- a: integer (nullable = false)
 |-- b: string (nullable = true)

由于在列a上设置了主键,非空约束已经正确传递。太好了!

这次没有使用,但似乎可以在加载时指定DataFrame的分区数等。

然后,可以尽情地使用Spark的API进行操作。

scala> fooRDD.filter(fooRDD("a") > 99).show()
+---+--------+
|  a|       b|
+---+--------+
|100|inserted|
|101|inserted|
|103|inserted|
|104|inserted|
|105|inserted|
+---+--------+
scala> fooRDD.count()
res7: Long = 100

此外,如果注册为临时表,还可以使用HiveQL来执行类似SQL的操作。

scala> fooRDD.registerTempTable("foo")
scala> val res = sqlContext.sql("SELECT * FROM foo WHERE a > 73 AND a < 76")
res: org.apache.spark.sql.DataFrame = [a: int, b: string]
scala> res.show()
+--+--------+
| a|       b|
+--+--------+
|74|inserted|
|75|inserted|
+--+--------+

这真厉害。

Spark和RDB的使用场景区分

使用此功能,可以将存储在PostgreSQL中的数据加载到Spark中,在分布式服务器的内存空间中,能够快速执行JOIN和排序操作。

对于PostgreSQL来说,

    • 大きなテーブル同士のJOIN処理などをSparkにオフロードできる

 

    • DBより上の層でのデータ・パーティショニングをしてくれることで、組み合わせることでスケーラビリティを獲得できる

 

    • 異なるDB間のデータの突き合わせなど、単体サーバではカバーできない範囲をSparkが補ってくれる

 

    RDBに不向きなデータはHDFSやCasssandraなどのその他のデータストアに任せておいても、Spark上でPostgreSQLデータと連携してもらえる

有如下优点

对Spark而言,

    • レコード単位の処理や、トランザクション管理が必要な処理など苦手な範囲をPostgreSQLに任せる

 

    データの永続性、データの整合性、一貫性の管理もPostgreSQLにお願いできる

有许多令人高兴的方面。

虽然在这个世界上,并不只有美好的事情,可能还存在着我们看不见的陷阱,但是感觉有能力做新事物的机会,真是太棒了。

广告
将在 10 秒后关闭
bannerAds