使用cassandra-loader进行批量加载数据

安装

git clone https://github.com/brianmhess/cassandra-loader.git
cd cassandra-load
gradle loader

确认加载操作


#/bin/sh
for i in $(seq 1 100000) ; do
   echo "$i,$((i * 10))"
done
make_test_csv.sh > test.csv
cat test.csv
>1,10
>2,20
>3,30
>....

批量插入

build/cassandra-loader -f test.csv -schema "test_from_spark.fun(k,v)" -host 127.0.0.1

请使用 cqlsh 进行确认

cqlsh

cqlsh:test_from_spark> select * from fun limit 10;

 k    | v
------+-------
 4317 | 43170
 3372 | 33720
 1584 | 15840
 7034 | 70340
 9892 | 98920
 9640 | 96400
 9067 | 90670
 4830 | 48300
 2731 | 27310
 5056 | 50560

(10 rows)

在spark-shell上确认一下。


scala> val t = sc.cassandraTable("test_from_spark", "fun");
t: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[3] at RDD at CassandraRDD.scala:15

scala> t.count
res6: Long = 10000


scala> t.collect.slice(1, 10).foreach(println)
CassandraRow{k: 9067, v: 90670}
CassandraRow{k: 4830, v: 48300}
CassandraRow{k: 2731, v: 27310}
CassandraRow{k: 5056, v: 50560}
CassandraRow{k: 6428, v: 64280}
CassandraRow{k: 2713, v: 27130}
CassandraRow{k: 769, v: 7690}
CassandraRow{k: 9973, v: 99730}
CassandraRow{k: 1863, v: 18630}


scala> t.take(10).foreach(println)
CassandraRow{k: 9640, v: 96400}
CassandraRow{k: 9067, v: 90670}
CassandraRow{k: 4830, v: 48300}
CassandraRow{k: 2731, v: 27310}
CassandraRow{k: 5056, v: 50560}
CassandraRow{k: 6428, v: 64280}
CassandraRow{k: 2713, v: 27130}
CassandraRow{k: 769, v: 7690}
CassandraRow{k: 9973, v: 99730}
CassandraRow{k: 1863, v: 18630}
广告
将在 10 秒后关闭
bannerAds