安装
git clone https://github.com/brianmhess/cassandra-loader.git
cd cassandra-load
gradle loader
确认加载操作
#/bin/sh
for i in $(seq 1 100000) ; do
echo "$i,$((i * 10))"
done
make_test_csv.sh > test.csv
cat test.csv
>1,10
>2,20
>3,30
>....
批量插入
build/cassandra-loader -f test.csv -schema "test_from_spark.fun(k,v)" -host 127.0.0.1
请使用 cqlsh 进行确认
cqlsh
cqlsh:test_from_spark> select * from fun limit 10;
k | v
------+-------
4317 | 43170
3372 | 33720
1584 | 15840
7034 | 70340
9892 | 98920
9640 | 96400
9067 | 90670
4830 | 48300
2731 | 27310
5056 | 50560
(10 rows)
在spark-shell上确认一下。
scala> val t = sc.cassandraTable("test_from_spark", "fun");
t: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[3] at RDD at CassandraRDD.scala:15
scala> t.count
res6: Long = 10000
scala> t.collect.slice(1, 10).foreach(println)
CassandraRow{k: 9067, v: 90670}
CassandraRow{k: 4830, v: 48300}
CassandraRow{k: 2731, v: 27310}
CassandraRow{k: 5056, v: 50560}
CassandraRow{k: 6428, v: 64280}
CassandraRow{k: 2713, v: 27130}
CassandraRow{k: 769, v: 7690}
CassandraRow{k: 9973, v: 99730}
CassandraRow{k: 1863, v: 18630}
scala> t.take(10).foreach(println)
CassandraRow{k: 9640, v: 96400}
CassandraRow{k: 9067, v: 90670}
CassandraRow{k: 4830, v: 48300}
CassandraRow{k: 2731, v: 27310}
CassandraRow{k: 5056, v: 50560}
CassandraRow{k: 6428, v: 64280}
CassandraRow{k: 2713, v: 27130}
CassandraRow{k: 769, v: 7690}
CassandraRow{k: 9973, v: 99730}
CassandraRow{k: 1863, v: 18630}