用Scala(幻影)来操作Cassandra
一開始
因为我以前从未使用过Scala操作Cassandra,所以我决定尝试一下。
起初我也考虑使用Java的驱动程序,但是phantom的GitHub点赞数与其相差无几,并且它是用Scala编写的,所以我决定试试这个。
※ Cassandra在单节点上启动了Docker的官方仓库。不会对Cassandra本身进行解释。
构成
Scala版本为2.12
sbt版本为1.1
phantom版本为2.24
Cassandra版本为3.11
成果
sbt 可以用中文翻译为”才华洋溢”。
这是一个 SBT 的配置。为了从配置文件中获取 Cassandra 的 ContactPoint,我们还添加了 Lightbend 的 config。
name := "scala-cassandra-example"
version := "0.1"
scalaVersion := "2.12.5"
val phantomVersion = "2.24.2"
libraryDependencies ++= Seq(
"com.outworkers" %% "phantom-dsl" % phantomVersion,
"com.outworkers" %% "phantom-connectors" % phantomVersion,
"com.typesafe" % "config" % "1.3.3",
"org.scalatest" %% "scalatest" % "3.0.5" % Test
)
桌子模型
Phantom可以使用O/R映射(或者直接调用CQL的话可能会感到有点麻烦)。
虽然可以按照普通方式开发,但既然是Cassandra,就考虑使用分区(复合分区键)的感觉来尝试一下。
package com.example
import java.util.UUID
import scala.concurrent.Future
import com.outworkers.phantom.Table
import com.outworkers.phantom.builder.query.InsertQuery
import com.outworkers.phantom.dsl._
import com.outworkers.phantom.keys.PartitionKey
// データ
// こちらにはPartition情報を含めない
final case class Message(id: UUID, message: String, timestamp: Long)
// テーブル
abstract class Messages extends Table[Messages, Message] {
// categoryとsubcategoryによるComposite Partition Key
object category extends StringColumn with PartitionKey
object subcategory extends StringColumn with PartitionKey
object id extends UUIDColumn with PrimaryKey
object message extends StringColumn
object timestamp extends LongColumn
// Partitionを指定して保存
def store(partition: (String, String), msg: Message): InsertQuery.Default[Messages, Message] =
insert
.value(_.category, partition._1)
.value(_.subcategory, partition._2)
.value(_.id, msg.id)
.value(_.message, msg.message)
.value(_.timestamp, msg.timestamp)
// Partitionを検索
def findByPartition(partition: (String, String)): Future[List[Message]] =
select.where(_.category eqs partition._1).and(_.subcategory eqs partition._2).fetch()
// Partition+IDで検索
def findById(partition: (String, String), id: UUID): Future[Option[Message]] =
select
.where(_.category eqs partition._1)
.and(_.subcategory eqs partition._2)
.and(_.id eqs id)
.one()
}
数据库
我们正在努力作为提供数据库服务的供应商,以便可以在某种程度上进行模拟和其他操作(供参考)。
package com.example
import com.outworkers.phantom.connectors.ContactPoint
import com.outworkers.phantom.database.{Database, DatabaseProvider}
import com.outworkers.phantom.dsl.{CassandraConnection, KeySpace, replication, _}
import com.typesafe.config.ConfigFactory
// コネクタ(Singleton)
object Connector {
private val config = ConfigFactory.load()
config.checkValid(ConfigFactory.defaultReference(), "cassandra")
val connection: CassandraConnection =
ContactPoint(config.getString("cassandra.host"), config.getInt("cassandra.port"))
.noHeartbeat()
.noHeartbeat()
.keySpace(
KeySpace(config.getString("cassandra.keyspace"))
.ifNotExists()
.`with`(
replication eqs SimpleStrategy.replication_factor(1)
)
)
}
// データベース定義
class AppDatabase(override val connector: CassandraConnection)
extends Database[AppDatabase](connector) {
object messages extends Messages with Connector
}
// プロバイダとして提供
trait AppDatabaseProvider extends DatabaseProvider[AppDatabase]
提供的服务
我只是暂时把服务开通了一下。因为能够进行批处理,所以我尝试加上了。
我记得Cassandra的批处理有一个限制数量,但是不知道在哪里进行设置…。
package com.example
import scala.concurrent.Future
import com.outworkers.phantom.ResultSet
import com.outworkers.phantom.dsl._
trait MessageService extends AppDatabaseProvider {
def store(partition: (String, String), msg: Message): Future[ResultSet] =
db.messages.store(partition, msg).future()
def findPartition(partition: (String, String)): Future[List[Message]] =
db.messages.findByPartition(partition)
def findById(partition: (String, String), id: UUID): Future[Option[Message]] =
db.messages.findById(partition, id)
def batchStore(partition: (String, String), messages: Message*): Future[ResultSet] =
Batch.logged.add(messages.map(msg => db.messages.store(partition, msg))).future()
}
考试
我将测试上述服务。我定义了一个连接器用于测试,并将其传递给服务。
看起来phantom可以根据指定的模型创建表格(messageService.database.create())。
由于ScalaFutures的patienceConfig的默认设置可能会超时,因此我们正在覆盖并延长它。
package com.example
import java.time.{LocalDateTime, ZoneId}
import java.util.UUID
import scala.language.reflectiveCalls
import com.datastax.driver.core.SocketOptions
import com.outworkers.phantom.connectors.{CassandraConnection, ContactPoint}
import com.outworkers.phantom.dsl._
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpecLike}
class MessageServiceSpec
extends WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures
with OptionValues {
// デフォルトだとタイムアウトする可能性あり
override implicit val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(200, Millis))
object TestConnector {
private val config: Config = ConfigFactory.parseString("""cassandra {
| host: "x.x.x.x" // IP指定
| port: 9042
| keyspace: "scala_cassandra_example"
|}
""".stripMargin)
val connection: CassandraConnection =
ContactPoint(config.getString("cassandra.host"), config.getInt("cassandra.port"))
.withClusterBuilder(
_.withSocketOptions(
new SocketOptions()
.setConnectTimeoutMillis(20000)
.setReadTimeoutMillis(20000)
))
.noHeartbeat()
.keySpace(
KeySpace(config.getString("cassandra.keyspace"))
.ifNotExists()
.`with`(
replication eqs SimpleStrategy.replication_factor(1)
)
)
}
object TestDatabase extends AppDatabase(TestConnector.connection)
trait TestDatabaseProvider extends AppDatabaseProvider {
override def database: AppDatabase = TestDatabase
}
val messageService = new MessageService with TestDatabaseProvider
override def beforeAll(): Unit = {
messageService.database.create()
()
}
override def afterAll(): Unit = {
messageService.database.drop()
()
}
"message service" should {
"store and find by id" in {
val partition: (String, String) = ("A", "1")
val message = Message(UUID.randomUUID(),
"Test",
LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond)
val q = for {
_ <- messageService.store(partition, message)
find <- messageService.findById(partition, message.id)
} yield find
whenReady(q) { find =>
find shouldBe defined
find.value shouldBe message
}
}
"batch store and find by partition" in {
val partition: (String, String) = ("B", "1")
val messages = Seq
.range(0, 1000)
.map(
i =>
Message(UUID.randomUUID(),
"Test" + i,
LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond))
val q = for {
_ <- messageService.batchStore(partition, messages: _*)
res <- messageService.findPartition(partition)
} yield res
whenReady(q) { res =>
res.size shouldBe 1000
}
}
}
}
It can be paraphrased as:
最后
幻影自身可能是高功能的,可以做很多事情,但由于文件很难读懂(而且可能还没有完成),所以就在这个地方妥协了。