**将Jedis管道处理“强制”适配于Redis Cluster**
我写了下一部分。
- 続・Jedisパイプライン処理を(無理やり)Redis Cluster対応させる
前提
在主机192.168.53.52上,建立6379、6380和6381端口的Redis集群配置,并创建了以下数据。
这篇文章是以此为前提的。
redis-cli -h 192.168.53.52 -p 6379 -c
192.168.53.52:6379> set key1 value1
-> Redirected to slot [9189] located at 192.168.53.52:6380
OK
192.168.53.52:6379> set key2 value2
-> Redirected to slot [4998] located at 192.168.53.52:6379
OK
192.168.53.52:6379> set key3 value3
OK
192.168.53.52:6379> get key1
-> Redirected to slot [9189] located at 192.168.53.52:6380
"value1"
192.168.53.52:6379> get key2
-> Redirected to slot [4998] located at 192.168.53.52:6379
"value2"
192.168.53.52:6379> get key3
"value3"
另外,与本题不太相关,但是我使用了scalikejdbc的LoanPattern以便使用该方法。
Jedis 2.7.2 当前的实现
尽管Java的客户端库Jedis仍在开发中,但它已支持Redis集群。
package com.zaneli.redis.jedis
import redis.clients.jedis.{HostAndPort, JedisCluster}
import scalikejdbc.using
import scala.collection.JavaConversions._
object JedisTest1 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
using(new JedisCluster(nodes)) { cluster =>
val v1 = cluster.get("key1")
val v2 = cluster.get("key2")
val v3 = cluster.get("key3")
println((v1, v2, v3))
}
}
然而,似乎不支持管道处理,JedisCluster类中没有类似于pipelined的方法。
顺便提一下,如果尝试在不支持Redis Cluster的客户端中进行管道处理,则会引发以下异常。
package com.zaneli.redis.jedis
import redis.clients.jedis.{HostAndPort, Jedis}
import scalikejdbc.using
object JedisTest2 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
using(new Jedis(nodes.head.getHost, nodes.head.getPort)) { jedis =>
val p = jedis.pipelined()
val v1 = p.get("key1")
val v2 = p.get("key2")
val v3 = p.get("key3")
p.sync()
println((v1.get, v2.get, v3.get))
}
}
Exception in thread "main" redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 9189 192.168.53.52:6380
at redis.clients.jedis.Protocol.processError(Protocol.java:108)
at redis.clients.jedis.Protocol.process(Protocol.java:142)
at redis.clients.jedis.Protocol.read(Protocol.java:196)
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:288)
at redis.clients.jedis.Connection.getAll(Connection.java:258)
at redis.clients.jedis.Connection.getAll(Connection.java:250)
at redis.clients.jedis.Pipeline.sync(Pipeline.java:85)
at com.zaneli.redis.jedis.JedisTest2$$anonfun$2.apply(JedisTest2.scala:16)
顺便说一下,在本例中,由于 key2 和 key3 偶然被写在同一个节点上,所以如果不包括上述的 val v1 = p.get(“key1”) 和 v1.get,读取操作就会成功。
JedisPipelinedClusterを自作してがんばる
在参考现有的JedisCluster实现的基础上,尽管有点强制性,但仍试图进行应对。
虽然这个组讨论了这个问题,但是基本上,在Redis Cluster环境中,管道处理也只能在同一个节点中进行,所以与JedisTest2.scala相比,只有稍微改进节点的特定方法才是一个折中的方案。
package com.zaneli.redis.jedis
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisSlotBasedConnectionHandler, Pipeline, Protocol}
import redis.clients.jedis.exceptions.{JedisClusterMaxRedirectionsException, JedisMovedDataException}
import redis.clients.util.JedisClusterCRC16
import scalikejdbc.using
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.{Failure, Try}
class JedisPipelinedCluster(
nodes: Set[HostAndPort],
config: GenericObjectPoolConfig = new GenericObjectPoolConfig(),
timeout: Int = Protocol.DEFAULT_TIMEOUT,
maxRedirections: Int = 5) {
private[this] val handler = new JedisSlotBasedConnectionHandler(nodes, config, timeout)
def pipelined[A](key: String)(cmd: Pipeline => A): Try[A] =
pipelined(key, cmd, maxRedirections)
@tailrec
private[this] def pipelined[A](key: String, cmd: Pipeline => A, redirections: Int): Try[A] = {
val result = Try {
val slot = JedisClusterCRC16.getSlot(key)
using(handler.getConnectionFromSlot(slot)) { jedis =>
val pipeline = jedis.pipelined()
cmd(pipeline)
}
}
result match {
case Failure(e: JedisMovedDataException) =>
if (redirections <= 0) {
Failure(new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"))
} else {
pipelined(key, cmd, redirections - 1)
}
case r => r
}
}
}
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
import scala.util.{Failure, Success}
object JedisTest3 extends App {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
val cluster = new JedisPipelinedCluster(nodes)
val result = cluster.pipelined("key2") { p =>
val v2 = p.get("key2")
val v3 = p.get("key3")
p.sync()
(v2.get, v3.get)
}
result match {
case Success((v2, v3)) => println((v2, v3))
case Failure(e) => e.printStackTrace()
}
}
嗯,最终还是很遗憾需要单独传递节点特定的键值…
如果传递 cluster.pipelined(“key1”) 然后执行 p.get(“key2”),最终还是会出现 JedisClusterMaxRedirectionsException 异常。
再多一点实用
Redis Cluster的插槽可以将关注点放在{…}中而不是整个键上。
换句话说,如果想要将多个键写到同一个节点,可以将键的格式设置为{…},并使其中的字符串匹配即可。
有这样的case class,想要将每个值都写入Redis。
package com.zaneli.redis.jedis
import scala.collection.JavaConversions._
import scala.util.Try
case class SuperWrestler(id: Int, name: String, power: Int, favoriteHolds: List[String], family: Map[String, String])
object SuperWrestler extends RedisAccessor {
override def prefix(id: Int): String = s"sw$id"
def read(id: Int)(implicit cluster: JedisPipelinedCluster): Try[SuperWrestler] = {
cluster.pipelined(prefix(id)) { p =>
val name = p.get(key(id, "name"))
val power = p.get(key(id, "power"))
val favoriteHolds = p.lrange(key(id, "favoriteHolds"), 0, -1)
val family = p.hgetAll(key(id, "family"))
p.sync()
SuperWrestler(id, name.get, power.get.toInt, favoriteHolds.get.toList, family.get.toMap)
}
}
def write(sw: SuperWrestler)(implicit cluster: JedisPipelinedCluster): Boolean = {
cluster.pipelined(prefix(sw.id)) { p =>
p.set(key(sw.id, "name"), sw.name)
p.set(key(sw.id, "power"), sw.power.toString)
p.del(key(sw.id, "favoriteHolds"), key(sw.id, "family"))
sw.favoriteHolds.map(p.rpush(key(sw.id, "favoriteHolds"), _))
sw.family.map { case (k, v) => p.hset(key(sw.id, "family"), k, v) }
p.sync()
}.isSuccess
}
}
trait RedisAccessor {
def prefix(id: Int): String
def key(id: Int, name: String): String = s"{${prefix(id)}}$name"
}
package com.zaneli.redis.jedis
import redis.clients.jedis.HostAndPort
object SuperWrestlerTest {
def main(args: Array[String]) {
val nodes = Set(6379, 6380, 6381).map { port =>
new HostAndPort("192.168.53.52", port)
}
implicit val cluster = new JedisPipelinedCluster(nodes)
val kinnikuman = SuperWrestler(
1, "キン肉マン", 95, List("キン肉バスター", "キン肉ドライバー", "マッスルスパーク"), Map("父" -> "キン肉真弓", "母" -> "キン肉小百合"))
val terryman = SuperWrestler(
2, "テリーマン", 95, List("スピニングトゥホールド", "カーフブランディング", "テキサスコンドルキック"), Map("父" -> "ドリーマン"))
val robinMask = SuperWrestler(
3, "ロビンマスク", 96, List("タワーブリッジ", "ロビンスペシャル"), Map("父" -> "ロビンナイト"))
Seq(kinnikuman, terryman, robinMask) foreach SuperWrestler.write
(1 to 3) foreach (i => println(SuperWrestler.read(i)))
}
}
使用SuperWrestler时,请固定在SuperWrestler内的前缀,并将所有SuperWrestler的信息写入同一个节点。
如果是这种情况,使用我们创建的JedisPipelinedCluster可能会在SuperWrestler的读写过程中更加便利吧…?
顺便提一下,这个例子中金肉人使用了6381端口,特瑞曼使用了6379端口,罗宾面具使用了6380端口。 🙂