Skip to content

Commit

Permalink
add default slot
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolysergeev committed Mar 4, 2023
1 parent fc43d27 commit a62b266
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 31 deletions.
10 changes: 3 additions & 7 deletions redis/src/main/scala/zio/redis/ClusterExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ final case class ClusterExecutor(
}

for {
key <- ZIO.attempt(command.args(1).value).orElseFail(CusterKeyError)
keySlot = Slot((key.asCRC16 % SlotsAmount).toLong)
keyOpt <- ZIO.succeed(command.args.collectFirst { case key: RespArgument.Key => key })
keySlot = keyOpt.fold(Slot.Default)(key => Slot((key.asCRC16 & (SlotsAmount - 1)).toLong))
result <- executeSafe(keySlot)
} yield result
}
Expand Down Expand Up @@ -165,12 +165,8 @@ object ClusterExecutor {
for (i <- p.slotRange.start to p.slotRange.end) yield Slot(i) -> p.master.address
}.toMap

private final val CusterKeyError =
RedisError.ProtocolError("Key doesn't found. No way to dispatch this command to Redis Cluster")
private final val CusterKeyExecutorError =
RedisError.IOError(
new IOException("Executor for key doesn't found. No way to dispatch this command to Redis Cluster")
)
RedisError.IOError(new IOException("Executor doesn't found. No way to dispatch this command to Redis Cluster"))
private final val CusterConnectionError =
RedisError.IOError(new IOException("The connection to cluster has been failed. Can't reach a single startup node."))
}
7 changes: 7 additions & 0 deletions redis/src/main/scala/zio/redis/RespArgument.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package zio.redis

import zio.Chunk
import zio.redis.RespValue.BulkString
import zio.redis.codecs.CRC16
import zio.schema.Schema
import zio.schema.codec.BinaryCodec

Expand Down Expand Up @@ -48,6 +49,12 @@ object RespArgument {

final case class Key(bytes: Chunk[Byte]) extends RespArgument {
lazy val value: BulkString = RespValue.BulkString(bytes)

lazy val asCRC16: Int = {
val betweenBraces = bytes.dropWhile(b => b != '{').drop(1).takeWhile(b => b != '}')
val key = if (betweenBraces.isEmpty) bytes else betweenBraces
CRC16.get(key)
}
}

object Key {
Expand Down
7 changes: 0 additions & 7 deletions redis/src/main/scala/zio/redis/RespValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package zio.redis

import zio._
import zio.redis.codecs.CRC16
import zio.redis.options.Cluster.Slot
import zio.stream._

Expand Down Expand Up @@ -75,12 +74,6 @@ object RespValue {
private[redis] def asString: String = decode(value)

private[redis] def asLong: Long = internal.unsafeReadLong(asString, 0)

private[redis] def asCRC16: Int = {
val betweenBraces = value.dropWhile(b => b != '{').drop(1).takeWhile(b => b != '}')
val key = if (betweenBraces.isEmpty) value else betweenBraces
CRC16.get(key)
}
}

final case class Array(values: Chunk[RespValue]) extends RespValue
Expand Down
4 changes: 4 additions & 0 deletions redis/src/main/scala/zio/redis/options/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ object Cluster {

final case class Slot(number: Long) extends AnyVal

object Slot {
val Default: Slot = Slot(1)
}

final case class Node(id: String, address: RedisUri)

final case class SlotRange(start: Long, end: Long) {
Expand Down
4 changes: 2 additions & 2 deletions redis/src/test/scala/zio/redis/ApiSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ object ApiSpec
sortedSetsSuite,
hyperLogLogSuite,
geoSuite,
streamsSuite @@ clusterExecutorUnsupported,
scriptingSpec @@ clusterExecutorUnsupported,
streamsSuite,
scriptingSpec,
clusterSpec
).provideShared(
ClusterExecutor.layer,
Expand Down
2 changes: 1 addition & 1 deletion redis/src/test/scala/zio/redis/BaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait BaseSpec extends ZIOSpecDefault {

/* TODO
* We can try to support the most unsupported commands for cluster with:
* - default connection for commands without a key and for multiple key commands with
* - [DONE] default connection for commands without a key and for multiple key commands with
* the limitation that all keys have to be in the same slot
* - fork/join approach for commands that operate on keys with different slots
*/
Expand Down
23 changes: 23 additions & 0 deletions redis/src/test/scala/zio/redis/RespArgumentSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package zio.redis

import zio.test._

object RespArgumentSpec extends BaseSpec {
def spec: Spec[Any, RedisError.ProtocolError] =
suite("RespArgument")(
suite("BulkString.asCRC16")(
test("key without braces") {
val key = RespArgument.Key("hello world")
assertTrue(15332 == key.asCRC16)
},
test("key between braces") {
val key = RespArgument.Key("hello{key1}wor}ld")
assertTrue(41957 == key.asCRC16)
},
test("empty key between braces") {
val key = RespArgument.Key("hello{}world")
assertTrue(40253 == key.asCRC16)
}
)
)
}
14 changes: 0 additions & 14 deletions redis/src/test/scala/zio/redis/RespValueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ object RespValueSpec extends BaseSpec {
.runCollect
.map(assert(_)(equalTo(values)))
}
),
suite("BulkString.asCRC16")(
test("key without braces") {
val str = RespValue.bulkString("hello world")
assertTrue(15332 == str.asCRC16)
},
test("key between braces") {
val str = RespValue.bulkString("hello{key1}wor}ld")
assertTrue(41957 == str.asCRC16)
},
test("empty key between braces") {
val str = RespValue.bulkString("hello{}world")
assertTrue(40253 == str.asCRC16)
}
)
)
}

0 comments on commit a62b266

Please sign in to comment.