**将Jedis管道处理“强制”适配于Redis Cluster**

我写了下一部分。

    続・Jedisパイプライン処理を(無理やり)Redis Cluster対応させる

前提

在主机192.168.53.52上,建立6379、6380和6381端口的Redis集群配置,并创建了以下数据。
这篇文章是以此为前提的。

redis-cli -h 192.168.53.52 -p 6379 -c

192.168.53.52:6379> set key1 value1
-> Redirected to slot [9189] located at 192.168.53.52:6380
OK
192.168.53.52:6379> set key2 value2
-> Redirected to slot [4998] located at 192.168.53.52:6379
OK
192.168.53.52:6379> set key3 value3
OK
192.168.53.52:6379> get key1
-> Redirected to slot [9189] located at 192.168.53.52:6380
"value1"
192.168.53.52:6379> get key2
-> Redirected to slot [4998] located at 192.168.53.52:6379
"value2"
192.168.53.52:6379> get key3
"value3"

另外,与本题不太相关,但是我使用了scalikejdbc的LoanPattern以便使用该方法。

Jedis 2.7.2 当前的实现

尽管Java的客户端库Jedis仍在开发中,但它已支持Redis集群。

package com.zaneli.redis.jedis

import redis.clients.jedis.{HostAndPort, JedisCluster}
import scalikejdbc.using

import scala.collection.JavaConversions._

object JedisTest1 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  using(new JedisCluster(nodes)) { cluster =>
    val v1 = cluster.get("key1")
    val v2 = cluster.get("key2")
    val v3 = cluster.get("key3")
    println((v1, v2, v3))
  }
}

然而,似乎不支持管道处理,JedisCluster类中没有类似于pipelined的方法。
顺便提一下,如果尝试在不支持Redis Cluster的客户端中进行管道处理,则会引发以下异常。

package com.zaneli.redis.jedis

import redis.clients.jedis.{HostAndPort, Jedis}
import scalikejdbc.using

object JedisTest2 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  using(new Jedis(nodes.head.getHost, nodes.head.getPort)) { jedis =>
    val p = jedis.pipelined()
    val v1 = p.get("key1")
    val v2 = p.get("key2")
    val v3 = p.get("key3")
    p.sync()
    println((v1.get, v2.get, v3.get))
  }
}
Exception in thread "main" redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9189 192.168.53.52:6380
    at redis.clients.jedis.Protocol.processError(Protocol.java:108)
    at redis.clients.jedis.Protocol.process(Protocol.java:142)
    at redis.clients.jedis.Protocol.read(Protocol.java:196)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
    at redis.clients.jedis.Connection.getAll(Connection.java:258)
    at redis.clients.jedis.Connection.getAll(Connection.java:250)
    at redis.clients.jedis.Pipeline.sync(Pipeline.java:85)
    at com.zaneli.redis.jedis.JedisTest2$$anonfun$2.apply(JedisTest2.scala:16)

顺便说一下,在本例中,由于 key2 和 key3 偶然被写在同一个节点上,所以如果不包括上述的 val v1 = p.get(“key1”) 和 v1.get,读取操作就会成功。

JedisPipelinedClusterを自作してがんばる

在参考现有的JedisCluster实现的基础上,尽管有点强制性,但仍试图进行应对。

虽然这个组讨论了这个问题,但是基本上,在Redis Cluster环境中,管道处理也只能在同一个节点中进行,所以与JedisTest2.scala相比,只有稍微改进节点的特定方法才是一个折中的方案。

package com.zaneli.redis.jedis

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisSlotBasedConnectionHandler, Pipeline, Protocol}
import redis.clients.jedis.exceptions.{JedisClusterMaxRedirectionsException, JedisMovedDataException}
import redis.clients.util.JedisClusterCRC16
import scalikejdbc.using

import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.{Failure, Try}

class JedisPipelinedCluster(
    nodes: Set[HostAndPort],
    config: GenericObjectPoolConfig = new GenericObjectPoolConfig(),
    timeout: Int = Protocol.DEFAULT_TIMEOUT,
    maxRedirections: Int = 5) {

  private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout)

  def pipelined[A](key: String)(cmd: Pipeline => A): Try[A] =
    pipelined(key, cmd, maxRedirections)

  @tailrec
  private[this] def pipelined[A](key: String, cmd: Pipeline => A, redirections: Int): Try[A] = {
    val result = Try {
      val slot = JedisClusterCRC16.getSlot(key)
      using(handler.getConnectionFromSlot(slot)) { jedis =>
        val pipeline = jedis.pipelined()
        cmd(pipeline)
      }
    }
    result match {
      case Failure(e: JedisMovedDataException) =>
        if (redirections <= 0) {
          Failure(new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"))
        } else {
          pipelined(key, cmd, redirections - 1)
        }
      case r => r
    }
  }
}

package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

import scala.util.{Failure, Success}

object JedisTest3 extends App {

  val nodes = Set(6379, 6380, 6381).map { port =>
    new HostAndPort("192.168.53.52", port)
  }
  val cluster = new JedisPipelinedCluster(nodes)
  val result = cluster.pipelined("key2") { p =>
    val v2 = p.get("key2")
    val v3 = p.get("key3")
    p.sync()
    (v2.get, v3.get)
  }
  result match {
    case Success((v2, v3)) => println((v2, v3))
    case Failure(e) => e.printStackTrace()
  }
}

嗯,最终还是很遗憾需要单独传递节点特定的键值…
如果传递 cluster.pipelined(“key1”) 然后执行 p.get(“key2”),最终还是会出现 JedisClusterMaxRedirectionsException 异常。

再多一点实用

Redis Cluster的插槽可以将关注点放在{…}中而不是整个键上。
换句话说,如果想要将多个键写到同一个节点,可以将键的格式设置为{…},并使其中的字符串匹配即可。

有这样的case class,想要将每个值都写入Redis。

package com.zaneli.redis.jedis

import scala.collection.JavaConversions._
import scala.util.Try

case class SuperWrestler(id: Int, name: String, power: Int, favoriteHolds: List[String], family: Map[String, String])

object SuperWrestler extends RedisAccessor {
  override def prefix(id: Int): String = s"sw$id"

  def read(id: Int)(implicit cluster: JedisPipelinedCluster): Try[SuperWrestler] = {
    cluster.pipelined(prefix(id)) { p =>
      val name = p.get(key(id, "name"))
      val power = p.get(key(id, "power"))
      val favoriteHolds = p.lrange(key(id, "favoriteHolds"), 0, -1)
      val family = p.hgetAll(key(id, "family"))
      p.sync()
      SuperWrestler(id, name.get, power.get.toInt, favoriteHolds.get.toList, family.get.toMap)
    }
  }

  def write(sw: SuperWrestler)(implicit cluster: JedisPipelinedCluster): Boolean = {
    cluster.pipelined(prefix(sw.id)) { p =>
      p.set(key(sw.id, "name"), sw.name)
      p.set(key(sw.id, "power"), sw.power.toString)
      p.del(key(sw.id, "favoriteHolds"), key(sw.id, "family"))
      sw.favoriteHolds.map(p.rpush(key(sw.id, "favoriteHolds"), _))
      sw.family.map { case (k, v) => p.hset(key(sw.id, "family"), k, v) }
      p.sync()
    }.isSuccess
  }
}

trait RedisAccessor {
  def prefix(id: Int): String
  def key(id: Int, name: String): String = s"{${prefix(id)}}$name"
}

package com.zaneli.redis.jedis

import redis.clients.jedis.HostAndPort

object SuperWrestlerTest {

  def main(args: Array[String]) {
    val nodes = Set(6379, 6380, 6381).map { port =>
      new HostAndPort("192.168.53.52", port)
    }
    implicit val cluster = new JedisPipelinedCluster(nodes)

    val kinnikuman = SuperWrestler(
      1, "キン肉マン", 95, List("キン肉バスター", "キン肉ドライバー", "マッスルスパーク"), Map("父" -> "キン肉真弓", "母" -> "キン肉小百合"))

    val terryman = SuperWrestler(
      2, "テリーマン", 95, List("スピニングトゥホールド", "カーフブランディング", "テキサスコンドルキック"), Map("父" -> "ドリーマン"))

    val robinMask = SuperWrestler(
      3, "ロビンマスク", 96, List("タワーブリッジ", "ロビンスペシャル"), Map("父" -> "ロビンナイト"))

    Seq(kinnikuman, terryman, robinMask) foreach SuperWrestler.write

    (1 to 3) foreach (i => println(SuperWrestler.read(i)))
  }
}

使用SuperWrestler时,请固定在SuperWrestler内的前缀,并将所有SuperWrestler的信息写入同一个节点。
如果是这种情况,使用我们创建的JedisPipelinedCluster可能会在SuperWrestler的读写过程中更加便利吧…?

顺便提一下,这个例子中金肉人使用了6381端口,特瑞曼使用了6379端口,罗宾面具使用了6380端口。 🙂

广告
将在 10 秒后关闭
bannerAds