Getting Started with Spark + Cassandra

Spark

Download Spark and build it.

cd spark-1.5.0
./make-distribution.sh

spark-cassandra-connector

Next, git clone spark-cassandra-connector and build it. Unfortunately, the build may fail and not complete.

git clone https://github.com/datastax/spark-cassandra-connector
cd spark-cassandra-connector
git checkout b1.5
sbt package
sbt assembly

These commands produce an assembly jar under spark-cassandra-connector/target/scala-2.10.

If the spark-cassandra-connector build fails, you can download the individual jars from Maven Central and use them instead.

I downloaded the following:

    • guava-18.0.jar
    • spark-cassandra-connector_2.10-1.5.0-M2.jar
    • cassandra-driver-core-2.2.0-rc2.jar

Without guava-18.0 I got an error, so I added it.
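Whether Guava is actually visible can be checked from inside spark-shell before running anything against Cassandra. The following is a minimal sketch; `ClasspathCheck.isOnClasspath` is a hypothetical helper, `ListenableFuture` is used only as a representative Guava class, and it assumes the jars passed with `--jars` are visible through the thread context class loader.

```scala
object ClasspathCheck {
  // Hypothetical helper: true if `className` can be loaded (without initializing it).
  // Uses the thread context class loader, falling back to this object's loader,
  // on the assumption that jars passed via --jars are visible through it.
  def isOnClasspath(className: String): Boolean = {
    val loader = Option(Thread.currentThread.getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException | _: LinkageError => false
    }
  }

  def main(args: Array[String]): Unit = {
    // ListenableFuture lives in Guava's com.google.common.util.concurrent package.
    val guavaClass = "com.google.common.util.concurrent.ListenableFuture"
    println(s"$guavaClass loadable: ${isOnClasspath(guavaClass)}")
  }
}
```

If `isOnClasspath` returns false for the Guava class, guava-18.0.jar is most likely missing from `--jars`.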

Start spark-shell.

If the spark-cassandra-connector build succeeded:

bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
  --jars spark-cassandra-connector-assembly-1.5.0-M2-SNAPSHOT.jar

If the spark-cassandra-connector build failed:

bin/spark-shell --jars spark-cassandra-connector_2.10-1.5.0-M2.jar,cassandra-driver-core-2.2.0-rc2.jar,guava-18.0.jar --conf spark.cassandra.connection.host=127.0.0.1
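In both commands `--jars` takes a single comma-separated argument (no spaces after the commas), and the Cassandra host is passed as an ordinary `--conf` key. A small sketch that assembles the same arguments from a list of jar names; `SparkShellArgs.shellArgs` is a hypothetical helper, not part of Spark.

```scala
object SparkShellArgs {
  // Hypothetical helper: builds the arguments passed to bin/spark-shell.
  // --jars expects ONE argument with the jar paths joined by commas.
  def shellArgs(jars: Seq[String], cassandraHost: String): Seq[String] =
    Seq(
      "--jars", jars.map(_.trim).mkString(","),
      "--conf", s"spark.cassandra.connection.host=$cassandraHost"
    )

  def main(args: Array[String]): Unit = {
    // The jar names used above when the connector build failed.
    val jars = Seq(
      "spark-cassandra-connector_2.10-1.5.0-M2.jar",
      "cassandra-driver-core-2.2.0-rc2.jar",
      "guava-18.0.jar"
    )
    println(("bin/spark-shell" +: shellArgs(jars, "127.0.0.1")).mkString(" "))
  }
}
```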

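On Mac and Linux:

    • On a Mac, to test against a local Cassandra, use 127.0.0.1 as the host and set seeds in cassandra.yaml to 127.0.0.1.
    • On Linux, to test against Cassandra in distributed mode, use host=<IP address of the host>, i.e. the IP listed under seeds in cassandra.yaml.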
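The rule above (use the address listed under `seeds` in cassandra.yaml) can be checked mechanically. This is a minimal sketch, assuming the default `SimpleSeedProvider` layout where `seeds` is a quoted, comma-separated string; `SeedHost.firstSeed` is a hypothetical helper, and its line-based matching is deliberately naive rather than a real YAML parser.

```scala
object SeedHost {
  // Naive line-based sketch, NOT a YAML parser: matches a line such as
  //     - seeds: "127.0.0.1"
  // and captures the quoted, comma-separated address list.
  private val SeedsLine = """^\s*-?\s*seeds:\s*"([^"]*)"\s*$""".r

  // Returns the first seed address found in the given cassandra.yaml text, if any.
  def firstSeed(cassandraYaml: String): Option[String] =
    cassandraYaml
      .split("\r?\n")
      .collectFirst { case SeedsLine(list) => list }
      .flatMap(list => list.split(",").map(_.trim).find(_.nonEmpty))
}
```

For example, `SeedHost.firstSeed(scala.io.Source.fromFile("conf/cassandra.yaml").mkString)` (the path is an assumption that depends on your install) gives the value to pass as `spark.cassandra.connection.host`.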

Verify creating a Cassandra keyspace and table.

Run the following in spark-shell:

import com.datastax.spark.connector._ //Imports basic rdd functions
import com.datastax.spark.connector.cql._ //(Optional) Imports java driver helper functions

// Connector configured from the shell's SparkConf (picks up spark.cassandra.connection.host)
val c = CassandraConnector(sc.getConf)
// Create a keyspace with a single replica, then a two-column table inside it
c.withSessionDo ( session => session.execute("CREATE KEYSPACE test_from_spark WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
c.withSessionDo ( session => session.execute("CREATE TABLE test_from_spark.fun (k int PRIMARY KEY, v int)"))
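Running the `CREATE KEYSPACE` line a second time fails because the keyspace already exists. CQL's `IF NOT EXISTS` clause makes both statements safe to re-run. A sketch of helpers that produce the same two statements with that clause added; `TestSchema`, `createKeyspaceCql` and `createTableCql` are hypothetical names.

```scala
object TestSchema {
  // Hypothetical helpers: the statements above, with IF NOT EXISTS added so that
  // re-running them does not fail on an already existing keyspace or table.
  def createKeyspaceCql(keyspace: String, replicationFactor: Int): String =
    s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = " +
      s"{'class': 'SimpleStrategy', 'replication_factor': $replicationFactor}"

  def createTableCql(keyspace: String, table: String): String =
    s"CREATE TABLE IF NOT EXISTS $keyspace.$table (k int PRIMARY KEY, v int)"
}
```

In spark-shell these plug into the same call as before, e.g. `c.withSessionDo(session => session.execute(TestSchema.createKeyspaceCql("test_from_spark", 1)))`.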

Confirm in cqlsh that the keyspace and table were created.

cqlsh:mykeyspace> describe keyspaces;

system_auth  mykeyspace          test           test_from_spark
system       system_distributed  system_traces

Verify inserting data.

scala> c.withSessionDo ( session => session.execute("insert into test_from_spark.fun (k,v) values(1, 10)"))
res6: com.datastax.driver.core.ResultSet = ResultSet[ exhausted: true, Columns[]]
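To insert several rows the same way, the statements can be generated from a list of pairs. A minimal sketch; `FunRows.insertCql` is a hypothetical helper, and because only `Int` values are handled, no CQL quoting or escaping is needed.

```scala
object FunRows {
  // Hypothetical helper: one INSERT statement per (k, v) pair for test_from_spark.fun.
  // Only Int values are supported, so no string escaping is required.
  def insertCql(rows: Seq[(Int, Int)]): Seq[String] =
    rows.map { case (k, v) =>
      s"INSERT INTO test_from_spark.fun (k, v) VALUES ($k, $v)"
    }
}
```

Usage in spark-shell: `FunRows.insertCql(Seq(1 -> 10, 2 -> 20)).foreach(stmt => c.withSessionDo(session => session.execute(stmt)))`. This issues one statement per row from the driver; for larger data the connector's `saveToCassandra` on an RDD is the usual route.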

Check in cqlsh that the data made it into the table.

cqlsh:test_from_spark> select * from fun;

 k | v
---+----
 1 | 10
 2 | 20

(2 rows)

Fetch the table as an RDD with cassandraTable and check that it works.

scala> val d = sc.cassandraTable("test_from_spark", "fun");
d: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> d.count
res4: Long = 2

scala> d
res7: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> d.collect
res9: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 1, v: 10}, CassandraRow{k: 2, v: 20})

scala> d.collect.foreach(println)
CassandraRow{k: 1, v: 10}
CassandraRow{k: 2, v: 20}

Errors I ran into along the way

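    • Error saying com.google.common.util.concurrent.* is not on the classpath => fixed by adding guava-18.0.jar to --jars myself.

    • The following error appeared while checking the Cassandra connection. ※ It seemed to go away after I changed some Cassandra settings?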
scala> c.withSessionDo ( session => session.execute("CREATE KEYSPACE test WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
15/11/14 11:21:10 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
java.io.IOException: Failed to open native connection to Cassandra at {192.168.11.2}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC.<init>(<console>:53)
    at $iwC.<init>(<console>:55)
    at <init>(<console>:57)
    at .<init>(<console>:61)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.11.2:9042 (com.datastax.driver.core.TransportException: [/192.168.11.2:9042] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:229)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1264)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
    ... 56 more


scala> 
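When the connector reports `NoHostAvailableException ... Cannot connect`, a first check is whether anything is listening on that address and port at all. A minimal sketch using only the JDK; `PortCheck.canConnect` is a hypothetical helper, and its default of 9042 assumes Cassandra's default `native_transport_port`, the port shown in the error above.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object PortCheck {
  // Hypothetical helper: true if a TCP connection to host:port succeeds within timeoutMs.
  // Connection refused, timeouts and unknown hosts all surface as IOException.
  def canConnect(host: String, port: Int = 9042, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

If `PortCheck.canConnect("192.168.11.2")` returns false, one plausible cause is that Cassandra's native transport is bound to a different address (in cassandra.yaml it listens on `rpc_address`) than the one given in `spark.cassandra.connection.host`.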
