用Scala(幻影)来操作Cassandra

一開始

因为我以前从未使用过Scala操作Cassandra,所以我决定尝试一下。
起初我也考虑使用Java的驱动程序,但是phantom的GitHub点赞数与其相差无几,并且它是用Scala编写的,所以我决定试试这个。

※ Cassandra在单节点上启动了Docker的官方仓库。不会对Cassandra本身进行解释。

构成

Scala版本为2.12
sbt版本为1.1
phantom版本为2.24
Cassandra版本为3.11

成果

sbt 可以用中文翻译为”才华洋溢”。

这是一个 SBT 的配置。为了从配置文件中获取 Cassandra 的 ContactPoint,我们还添加了 Lightbend 的 config。

name := "scala-cassandra-example"

version := "0.1"

scalaVersion := "2.12.5"

val phantomVersion = "2.24.2"

libraryDependencies ++= Seq(
  "com.outworkers"  %% "phantom-dsl" % phantomVersion,
  "com.outworkers"  %% "phantom-connectors" % phantomVersion,
  "com.typesafe" % "config" % "1.3.3",
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

桌子模型

Phantom可以使用O/R映射(或者直接调用CQL的话可能会感到有点麻烦)。
虽然可以按照普通方式开发,但既然是Cassandra,就考虑使用分区(复合分区键)的感觉来尝试一下。

package com.example

import java.util.UUID

import scala.concurrent.Future

import com.outworkers.phantom.Table
import com.outworkers.phantom.builder.query.InsertQuery
import com.outworkers.phantom.dsl._
import com.outworkers.phantom.keys.PartitionKey

// データ
// こちらにはPartition情報を含めない
final case class Message(id: UUID, message: String, timestamp: Long)

// テーブル
abstract class Messages extends Table[Messages, Message] {

  // categoryとsubcategoryによるComposite Partition Key
  object category extends StringColumn with PartitionKey
  object subcategory extends StringColumn with PartitionKey

  object id extends UUIDColumn with PrimaryKey
  object message extends StringColumn
  object timestamp extends LongColumn

  // Partitionを指定して保存
  def store(partition: (String, String), msg: Message): InsertQuery.Default[Messages, Message] =
    insert
      .value(_.category, partition._1)
      .value(_.subcategory, partition._2)
      .value(_.id, msg.id)
      .value(_.message, msg.message)
      .value(_.timestamp, msg.timestamp)

  // Partitionを検索
  def findByPartition(partition: (String, String)): Future[List[Message]] =
    select.where(_.category eqs partition._1).and(_.subcategory eqs partition._2).fetch()

  // Partition+IDで検索
  def findById(partition: (String, String), id: UUID): Future[Option[Message]] =
    select
      .where(_.category eqs partition._1)
      .and(_.subcategory eqs partition._2)
      .and(_.id eqs id)
      .one()

}

数据库

我们正在努力作为提供数据库服务的供应商,以便可以在某种程度上进行模拟和其他操作(供参考)。

package com.example

import com.outworkers.phantom.connectors.ContactPoint
import com.outworkers.phantom.database.{Database, DatabaseProvider}
import com.outworkers.phantom.dsl.{CassandraConnection, KeySpace, replication, _}
import com.typesafe.config.ConfigFactory

// コネクタ(Singleton)
object Connector {

  private val config = ConfigFactory.load()

  config.checkValid(ConfigFactory.defaultReference(), "cassandra")

  val connection: CassandraConnection =
    ContactPoint(config.getString("cassandra.host"), config.getInt("cassandra.port"))
      .noHeartbeat()
      .noHeartbeat()
      .keySpace(
        KeySpace(config.getString("cassandra.keyspace"))
          .ifNotExists()
          .`with`(
            replication eqs SimpleStrategy.replication_factor(1)
          )
      )
}

// データベース定義
class AppDatabase(override val connector: CassandraConnection)
    extends Database[AppDatabase](connector) {
  object messages extends Messages with Connector
}

// プロバイダとして提供
trait AppDatabaseProvider extends DatabaseProvider[AppDatabase]

提供的服务

我只是暂时把服务开通了一下。因为能够进行批处理,所以我尝试加上了。
我记得Cassandra的批处理有一个限制数量,但是不知道在哪里进行设置…。

package com.example

import scala.concurrent.Future

import com.outworkers.phantom.ResultSet
import com.outworkers.phantom.dsl._

trait MessageService extends AppDatabaseProvider {

  def store(partition: (String, String), msg: Message): Future[ResultSet] =
    db.messages.store(partition, msg).future()

  def findPartition(partition: (String, String)): Future[List[Message]] =
    db.messages.findByPartition(partition)

  def findById(partition: (String, String), id: UUID): Future[Option[Message]] =
    db.messages.findById(partition, id)

  def batchStore(partition: (String, String), messages: Message*): Future[ResultSet] =
    Batch.logged.add(messages.map(msg => db.messages.store(partition, msg))).future()

}

考试

我将测试上述服务。我定义了一个连接器用于测试,并将其传递给服务。
看起来phantom可以根据指定的模型创建表格(messageService.database.create())。
由于ScalaFutures的patienceConfig的默认设置可能会超时,因此我们正在覆盖并延长它。

package com.example

import java.time.{LocalDateTime, ZoneId}
import java.util.UUID

import scala.language.reflectiveCalls

import com.datastax.driver.core.SocketOptions
import com.outworkers.phantom.connectors.{CassandraConnection, ContactPoint}
import com.outworkers.phantom.dsl._
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpecLike}

class MessageServiceSpec
    extends WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with OptionValues {

  // デフォルトだとタイムアウトする可能性あり
  override implicit val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(200, Millis))

  object TestConnector {
    private val config: Config = ConfigFactory.parseString("""cassandra {
        |  host: "x.x.x.x" // IP指定
        |  port: 9042
        |  keyspace: "scala_cassandra_example"
        |}
      """.stripMargin)

    val connection: CassandraConnection =
      ContactPoint(config.getString("cassandra.host"), config.getInt("cassandra.port"))
        .withClusterBuilder(
          _.withSocketOptions(
            new SocketOptions()
              .setConnectTimeoutMillis(20000)
              .setReadTimeoutMillis(20000)
          ))
        .noHeartbeat()
        .keySpace(
          KeySpace(config.getString("cassandra.keyspace"))
            .ifNotExists()
            .`with`(
              replication eqs SimpleStrategy.replication_factor(1)
            )
        )
  }

  object TestDatabase extends AppDatabase(TestConnector.connection)

  trait TestDatabaseProvider extends AppDatabaseProvider {
    override def database: AppDatabase = TestDatabase
  }

  val messageService = new MessageService with TestDatabaseProvider

  override def beforeAll(): Unit = {
    messageService.database.create()
    ()
  }

  override def afterAll(): Unit = {
    messageService.database.drop()
    ()
  }

  "message service" should {

    "store and find by id" in {
      val partition: (String, String) = ("A", "1")
      val message = Message(UUID.randomUUID(),
                            "Test",
                            LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond)
      val q = for {
        _ <- messageService.store(partition, message)
        find <- messageService.findById(partition, message.id)
      } yield find

      whenReady(q) { find =>
        find shouldBe defined
        find.value shouldBe message
      }
    }

    "batch store and find by partition" in {
      val partition: (String, String) = ("B", "1")
      val messages = Seq
        .range(0, 1000)
        .map(
          i =>
            Message(UUID.randomUUID(),
                    "Test" + i,
                    LocalDateTime.now().atZone(ZoneId.systemDefault()).toEpochSecond))

      val q = for {
        _ <- messageService.batchStore(partition, messages: _*)
        res <- messageService.findPartition(partition)
      } yield res

      whenReady(q) { res =>
        res.size shouldBe 1000
      }
    }
  }
}

It can be paraphrased as:
最后

幻影自身可能是高功能的,可以做很多事情,但由于文件很难读懂(而且可能还没有完成),所以就在这个地方妥协了。

广告
将在 10 秒后关闭
bannerAds