Getting Started with Spark + Cassandra
Spark
Download Spark and build it.
cd spark-1.5.0
./make-distribution.sh
Spark Cassandra Connector
Next, git clone spark-cassandra-connector and build it. Unfortunately, the build may fail partway through.
git clone https://github.com/datastax/spark-cassandra-connector
cd spark-cassandra-connector
git checkout b1.5
sbt package
sbt assembly
This produces an assembly jar under spark-cassandra-connector/target/scala-2.10.
If the spark-cassandra-connector build fails, you can download the individual jars from Maven Central and use them instead.
The jars I downloaded:

- spark-cassandra-connector_2.10-1.5.0-M2.jar
- cassandra-driver-core-2.2.0-rc2.jar
- guava-18.0.jar

Spark complained that Guava was missing, so I added guava-18.0.jar as well.
Launch spark-shell.
If the spark-cassandra-connector build succeeded:
bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1 \
  --jars spark-cassandra-connector-assembly-1.5.0-M2-SNAPSHOT.jar
If the spark-cassandra-connector build failed:
bin/spark-shell --jars spark-cassandra-connector_2.10-1.5.0-M2.jar,cassandra-driver-core-2.2.0-rc2.jar,guava-18.0.jar --conf spark.cassandra.connection.host=127.0.0.1
On Mac and Linux

- To test against a local Cassandra on a Mac, use host=127.0.0.1, and set seeds in cassandra.yaml to 127.0.0.1.
- To test against Cassandra in distributed mode on Linux, use host=ipaddress of host; this ip is the one listed under seeds in cassandra.yaml.
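For reference, the relevant part of cassandra.yaml might look like this. This is a sketch for the single-node local case described above; in a distributed setup, replace 127.0.0.1 with the host's real address everywhere.

```yaml
# cassandra.yaml (single-node local setup)
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      # Must match what you pass via spark.cassandra.connection.host
      - seeds: "127.0.0.1"

listen_address: 127.0.0.1
rpc_address: 127.0.0.1
```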
Verifying keyspace and table creation
Run the following in the spark-shell:
import com.datastax.spark.connector._ //Imports basic rdd functions
import com.datastax.spark.connector.cql._ //(Optional) Imports java driver helper functions
val c = CassandraConnector(sc.getConf)
c.withSessionDo ( session => session.execute("CREATE KEYSPACE test_from_spark WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
c.withSessionDo ( session => session.execute("CREATE TABLE test_from_spark.fun (k int PRIMARY KEY, v int)"))
Confirm in cqlsh that the keyspace and table were created.
cqlsh:mykeyspace> describe keyspaces;
system_auth mykeyspace test test_from_spark
system system_distributed system_traces
Verifying data insertion
scala> c.withSessionDo ( session => session.execute("insert into test_from_spark.fun (k,v) values(1, 10)"))
res6: com.datastax.driver.core.ResultSet = ResultSet[ exhausted: true, Columns[]]
Confirm in cqlsh that the rows made it into the table.
cqlsh:test_from_spark> select * from fun;
k | v
---+----
1 | 10
2 | 20
(2 rows)
Reading the table as an RDD with cassandraTable and checking that it works.
scala> val d = sc.cassandraTable("test_from_spark", "fun");
d: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
scala> d.count
res4: Long = 2
scala> d
res7: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
scala> d.collect
res9: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 1, v: 10}, CassandraRow{k: 2, v: 20})
scala> d.collect.foreach(println)
CassandraRow{k: 1, v: 10}
CassandraRow{k: 2, v: 20}
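Writing back works symmetrically: the connector also adds a saveToCassandra method to RDDs. A sketch, run in the same spark-shell session against the fun table created above (the row values here are just examples):

```scala
import com.datastax.spark.connector._ // adds saveToCassandra to RDDs

// Write two more rows into test_from_spark.fun
sc.parallelize(Seq((3, 30), (4, 40)))
  .saveToCassandra("test_from_spark", "fun", SomeColumns("k", "v"))

// Read them back; count should now include the new rows
sc.cassandraTable("test_from_spark", "fun").count
```

SomeColumns maps the tuple fields to the table's columns in order, so (k, v) pairs line up with the schema from the CREATE TABLE statement earlier.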
Errors encountered

- A "com.google.common.util.concurrent.** is not on the classpath" error => fixed by adding guava-18.0.jar to --jars myself.
- The following error while verifying the Cassandra connection. (It went away after tweaking the Cassandra config?)
scala> c.withSessionDo ( session => session.execute("CREATE KEYSPACE test WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
15/11/14 11:21:10 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
java.io.IOException: Failed to open native connection to Cassandra at {192.168.11.2}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
at $iwC$$iwC$$iwC.<init>(<console>:51)
at $iwC$$iwC.<init>(<console>:53)
at $iwC.<init>(<console>:55)
at <init>(<console>:57)
at .<init>(<console>:61)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.11.2:9042 (com.datastax.driver.core.TransportException: [/192.168.11.2:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:229)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1264)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
... 56 more
scala>