我尝试在PlayFramework2.6中访问Redis

首先

这是使用PlayFramework进行学习系列的第三部分。这次我们将尝试访问Redis。

有许多可以从Scala访问Redis的库,但是今次我们选择了scredis,因为它使用起来很容易。

另外,由于我正在学习Scala和Play Framework,所以如果有任何不足之处,请您谅解并提出建议。

实施

前提 tí)

本次使用的各个版本如下所示。

名前バージョンScala2.12.4PlayFramework2.6.7Redis3.0.6scredis2.1.1

Redis 是一种开源的内存数据库系统。

在本次中,我们假设Redis是一个非集群的主读副本结构。

构建文件.sbt

在 build.sbt 中添加所需的库。

// scredis
resolvers += Resolver.bintrayRepo("jastice", "maven")

libraryDependencies ++= Seq(
  "com.github.scredis" %% "scredis" % "2.1.1"
)

Redis访问

关于连接到Redis的方法,这次我们会使用工厂模式来创建scredis.Redis对象。

工厂本身在Module类中注册为单例,并在注入时使用。
单例的定义如下:

package services

import javax.inject.{Inject, Singleton}

import play.api.Configuration
import scredis.Redis

import scala.util.Random

// PrimaryかRead Replicaへのアクセスかを識別するためのtype
sealed trait RedisType
object RedisTypes {
  class WriterType extends RedisType
  class ReaderType extends RedisType

  implicit case object RedisWriterObject extends WriterType
  implicit case object RedisReaderObject extends ReaderType
}

// 公開されたtrait
trait RedisConnector {
  def redis[T <: RedisType](implicit redisType: T): Redis
}

// 実装クラス
@Singleton
class DefaultRedisConnector @Inject()(config: Configuration)
  extends RedisConnector
{
  // configの読み込み
  private val defaultPort = config.getOptional[Int]("redis.port").getOrElse(6379)
  private val writerHost = config.getOptional[String]("redis.writer.host").get
  private val (writerHostName, writerPort) = splitHostAndPort(writerHost)
  private val readerHosts =
    config.getOptional[String]("redis.reader.host").getOrElse(writerHost).split(",").map(splitHostAndPort)

  // read replicaが複数あるときのrandomizer
  private lazy val randomizer = Random

  // ReaderType, WriterTypeの読み込み
  import RedisTypes._

  override def redis[T <: RedisType](implicit redisType: T): Redis = {
    redisType match {
      case _: WriterType => writerClient
      case _: ReaderType => selectReaderClient
    }
  }

  private def splitHostAndPort(host: String): (String, Int) =
    host.split(":") match {
      case Array(hostname, port) => (hostname, port.toInt)
      case Array(hostname)       => (hostname, defaultPort)
    }

  private def writerClient: Redis = Redis(writerHostName, writerPort)

  private def selectReaderClient: Redis = {
    readerHosts.length match {
      // To-Do: case 0
      case 1 => readerClient(0)
      case n if n > 1 => readerClient(randomizer.nextInt(n))
    }
  }

  private def readerClient(n: Int): Redis = {
    val (hostName, port) = readerHosts(n)
    Redis(hostName, port)
  }
}

模块

将上述的类注册到 Module 类中。

import services._

class Module extends AbstractModule {

  override def configure(): Unit = {
    bind(classOf[RedisConnector]).to(classOf[DefaultRedisConnector])
  }
}

application.conf 应用配置文件

Redis服务器的连接主机应在application.conf文件中进行配置。
另外,可以指定多个读取副本(如果有多个指定,则随机选择)。

redis {
  port: 6379
  writer {
    host: "redis.hogehoge.ng.0001.apne1.cache.amazonaws.com"
  }
  reader {
    host: "redis-002.hogehoge.0001.apne1.cache.amazonaws.com,redis-003.hogehoge.0001.apne1.cache.amazonaws.com"
  }
}

控制器的使用方法

package controllers

import java.nio.charset.StandardCharsets
import javax.inject._

import play.api.mvc._
import scredis.serialization.{Reader, Writer}
import services.RedisConnector

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}

@Singleton
class HogeController @Inject()(cc: ControllerComponents,
                               rc: RedisConnector,
                               implicit val ec: ExecutionContext) extends AbstractController(cc)
{
  import services.RedisTypes._

  // Redisに格納されている(String, Int)をGetするためのReader。なお、文字列に","は含まれていないものとする(手抜き)
  implicit val stringIntReader: Reader[(String, Int)] = { (bytes: Array[Byte]) =>
    new String(bytes, StandardCharsets.UTF_8).split(",") match {
      case Array(strValue, intValue) => (strValue, intValue) 
    }
  }

  // Redisに(String, Int)をPutするためのWriter
  implicit val stringIntWriter: Writer[(String, Int)] = { (value: (String, Int)) =>
    (value._1.toString + "," + value._2.toString).getBytes(StandardChasets.UTF_8)

  // ReadReplicaからの読み込み(stringIntReaderが暗黙的に参照される)
  private def getValue: Future[Option[(String, Int)]] =
    rc.redis[ReaderType].get[(String, Int)]("hogehoge_key")

  // Masterへの書き込み(stringIntWriterが暗黙的に参照される)
  private def setValue(textValue: String, intValue: Int): Future[Unit] =
    rc.redis[WriterType].set("hogehoge_key", (textValue, intValue))
}

DNS缓存 (额外奖励)

在AWS环境中,如果DNS查询在短时间内集中,那么会从中途开始返回错误(我在PHP中引用ElastiCache时就遇到了这个问题)。

(参考)https://dev.classmethod.jp/cloud/aws/amazon-dns-threshold-exceeded-action/ 相关文章提供了解决Amazon DNS阈值超过的操作选项。

然而,基于akka.io和java的InetAddress的scredis似乎可以避免此问题,因为它们会在本地进行缓存。但是,如果默认的30秒间隔太长的话,可能需要缩短它。

我会定义一个独立的ApplicationLoader,在这里修改常量。在下面的例子中,我将DNS查询成功和失败时都改为2秒。

import java.security.Security

import play.api.ApplicationLoader
import play.api.inject.guice.{GuiceApplicationBuilder, GuiceApplicationLoader}
class MyApplicationLoader extends GuiceApplicationLoader {
  override protected def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
    Security.setProperty("networkaddress.cache.ttl", "2")
    Security.setProperty("networkaddress.cache.negative.ttl", "2")
    super.builder(context)
  }
}
play.application {
  loader=MyApplicationLoader
}
广告
将在 10 秒后关闭
bannerAds