Skip to content

Commit

Permalink
add default connection for commands without a key and for multiple ke…
Browse files Browse the repository at this point in the history
…y commands with the limitation that all keys have to be in the same slot

zio#673
  • Loading branch information
anatolysergeev committed Feb 12, 2023
1 parent 636a4d7 commit dd77a80
Show file tree
Hide file tree
Showing 22 changed files with 560 additions and 427 deletions.
12 changes: 4 additions & 8 deletions redis/src/main/scala/zio/redis/ClusterExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final case class ClusterExecutor(
scope: Scope.Closeable
) extends RedisExecutor {

def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] = {
def execute(command: RespCommand): IO[RedisError, RespValue] = {

def execute(keySlot: Slot) =
for {
Expand All @@ -58,8 +58,8 @@ final case class ClusterExecutor(
}

for {
key <- ZIO.attempt(command(1)).orElseFail(CusterKeyError)
keySlot = Slot((key.asCRC16 % SlotsAmount).toLong)
keyOpt <- ZIO.succeed(command.respArgs.collectFirst { case key: RespArgument.Key => key })
keySlot = keyOpt.fold(Slot.Default)(key => Slot((key.asCRC16 % SlotsAmount).toLong))
result <- executeSafe(keySlot)
} yield result
}
Expand Down Expand Up @@ -165,12 +165,8 @@ object ClusterExecutor {
for (i <- p.slotRange.start to p.slotRange.end) yield Slot(i) -> p.master.address
}.toMap

private final val CusterKeyError =
RedisError.ProtocolError("Key doesn't found. No way to dispatch this command to Redis Cluster")
private final val CusterKeyExecutorError =
RedisError.IOError(
new IOException("Executor for key doesn't found. No way to dispatch this command to Redis Cluster")
)
RedisError.IOError(new IOException("Executor doesn't found. No way to dispatch this command to Redis Cluster"))
private final val CusterConnectionError =
RedisError.IOError(new IOException("The connection to cluster has been failed. Can't reach a single startup node."))
}
491 changes: 269 additions & 222 deletions redis/src/main/scala/zio/redis/Input.scala

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions redis/src/main/scala/zio/redis/RedisCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package zio.redis

import zio._
import zio.redis.Input.{StringInput, Varargs}
import zio.redis.Input.{CommandNameInput, Varargs}
import zio.schema.codec.BinaryCodec

final class RedisCommand[-In, +Out] private (
Expand All @@ -34,8 +34,8 @@ final class RedisCommand[-In, +Out] private (
.flatMap[Any, Throwable, Out](out => ZIO.attempt(output.unsafeDecode(out)(codec)))
.refineToOrDie[RedisError]

private[redis] def resp(in: In): Chunk[RespValue.BulkString] =
Varargs(StringInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
private[redis] def resp(in: In): RespCommand =
Varargs(CommandNameInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
}

object RedisCommand {
Expand Down
4 changes: 2 additions & 2 deletions redis/src/main/scala/zio/redis/RedisExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package zio.redis

import zio.{Chunk, IO, ZLayer}
import zio.{IO, ZLayer}

trait RedisExecutor {
def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue]
def execute(command: RespCommand): IO[RedisError, RespValue]
}

object RedisExecutor {
Expand Down
68 changes: 67 additions & 1 deletion redis/src/main/scala/zio/redis/RespValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,75 @@ import zio._
import zio.redis.codecs.CRC16
import zio.redis.options.Cluster.Slot
import zio.stream._

import java.nio.charset.StandardCharsets

import zio.redis.RespValue.BulkString
import zio.schema.Schema
import zio.schema.codec.BinaryCodec

final class RespCommand(val respArgs: Chunk[RespArgument]) {
def ++(that: RespCommand): RespCommand = RespCommand(this.respArgs ++ that.respArgs)

def transformArgs(f: RespArgument => RespArgument): RespCommand = RespCommand(respArgs.map(f(_)))
}

object RespCommand {

def empty: RespCommand = new RespCommand(Chunk.empty)

def apply(respArgs: Chunk[RespArgument]): RespCommand = new RespCommand(respArgs)

def apply(respArgs: RespArgument*): RespCommand = new RespCommand(Chunk.fromIterable(respArgs))

def apply(respArg: RespArgument): RespCommand = new RespCommand(Chunk.single(respArg))
}

sealed trait RespArgument {
def respValue: RespValue.BulkString
}

object RespArgument {

case class Unknown(bytes: Chunk[Byte]) extends RespArgument {
lazy val respValue: BulkString = RespValue.BulkString(bytes)
}

object Unknown {
def apply(str: String): Unknown = Unknown(Chunk.fromArray(str.getBytes(StandardCharsets.UTF_8)))
def apply[A](data: A)(implicit codec: BinaryCodec, schema: Schema[A]): Key = Key(codec.encode(schema)(data))
}

case class CommandName(str: String) extends RespArgument {
lazy val respValue: BulkString = RespValue.bulkString(str)
}

case class Literal(str: String) extends RespArgument {
lazy val respValue: BulkString = RespValue.bulkString(str)
}

case class Key(bytes: Chunk[Byte]) extends RespArgument {
lazy val respValue: BulkString = RespValue.BulkString(bytes)

lazy val asCRC16: Int = {
val betweenBraces = bytes.dropWhile(b => b != '{').drop(1).takeWhile(b => b != '}')
val key = if (betweenBraces.isEmpty) bytes else betweenBraces
CRC16.get(key)
}
}

object Key {
def apply[A](data: A)(implicit codec: BinaryCodec, schema: Schema[A]): Key = Key(codec.encode(schema)(data))
}

case class Value(bytes: Chunk[Byte]) extends RespArgument {
lazy val respValue: BulkString = RespValue.BulkString(bytes)
}

object Value {
def apply[A](data: A)(implicit codec: BinaryCodec, schema: Schema[A]): Value = Value(codec.encode(schema)(data))
}
}

sealed trait RespValue extends Product with Serializable { self =>
import RespValue._
import RespValue.internal.{CrLf, Headers, NullArrayEncoded, NullStringEncoded}
Expand Down
4 changes: 2 additions & 2 deletions redis/src/main/scala/zio/redis/SingleNodeExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ final class SingleNodeExecutor(
) extends RedisExecutor {

// TODO NodeExecutor doesn't throw connection errors, timeout errors, it is hanging forever
def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] =
def execute(command: RespCommand): IO[RedisError, RespValue] =
Promise
.make[RedisError, RespValue]
.flatMap(promise => reqQueue.offer(Request(command, promise)) *> promise.await)
.flatMap(promise => reqQueue.offer(Request(command.respArgs.map(_.respValue), promise)) *> promise.await)

/**
* Opens a connection to the server and launches send and receive operations. All failures are retried by opening a
Expand Down
16 changes: 8 additions & 8 deletions redis/src/main/scala/zio/redis/api/Geo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ trait Geo extends RedisEnvironment {
): IO[RedisError, Long] = {
val command = RedisCommand(
GeoAdd,
Tuple2(ArbitraryInput[K](), NonEmptyList(Tuple2(LongLatInput, ArbitraryInput[M]()))),
Tuple2(ArbitraryKeyInput[K](), NonEmptyList(Tuple2(LongLatInput, ArbitraryInput[M]()))),
LongOutput,
codec,
executor
Expand Down Expand Up @@ -74,7 +74,7 @@ trait Geo extends RedisEnvironment {
): IO[RedisError, Option[Double]] = {
val command = RedisCommand(
GeoDist,
Tuple4(ArbitraryInput[K](), ArbitraryInput[M](), ArbitraryInput[M](), OptionalInput(RadiusUnitInput)),
Tuple4(ArbitraryKeyInput[K](), ArbitraryInput[M](), ArbitraryInput[M](), OptionalInput(RadiusUnitInput)),
OptionalOutput(DoubleOutput),
codec,
executor
Expand Down Expand Up @@ -102,7 +102,7 @@ trait Geo extends RedisEnvironment {
): IO[RedisError, Chunk[Option[String]]] = {
val command = RedisCommand(
GeoHash,
Tuple2(ArbitraryInput[K](), NonEmptyList(ArbitraryInput[M]())),
Tuple2(ArbitraryKeyInput[K](), NonEmptyList(ArbitraryInput[M]())),
ChunkOutput(OptionalOutput(MultiStringOutput)),
codec,
executor
Expand All @@ -129,7 +129,7 @@ trait Geo extends RedisEnvironment {
members: M*
): IO[RedisError, Chunk[Option[LongLat]]] = {
val command =
RedisCommand(GeoPos, Tuple2(ArbitraryInput[K](), NonEmptyList(ArbitraryInput[M]())), GeoOutput, codec, executor)
RedisCommand(GeoPos, Tuple2(ArbitraryKeyInput[K](), NonEmptyList(ArbitraryInput[M]())), GeoOutput, codec, executor)
command.run((key, (member, members.toList)))
}

Expand Down Expand Up @@ -172,7 +172,7 @@ trait Geo extends RedisEnvironment {
val command = RedisCommand(
GeoRadius,
Tuple9(
ArbitraryInput[K](),
ArbitraryKeyInput[K](),
LongLatInput,
DoubleInput,
RadiusUnitInput,
Expand Down Expand Up @@ -235,7 +235,7 @@ trait Geo extends RedisEnvironment {
val command = RedisCommand(
GeoRadius,
Tuple11(
ArbitraryInput[K](),
ArbitraryKeyInput[K](),
LongLatInput,
DoubleInput,
RadiusUnitInput,
Expand Down Expand Up @@ -295,7 +295,7 @@ trait Geo extends RedisEnvironment {
val command = RedisCommand(
GeoRadiusByMember,
Tuple9(
ArbitraryInput[K](),
ArbitraryKeyInput[K](),
ArbitraryInput[M](),
DoubleInput,
RadiusUnitInput,
Expand Down Expand Up @@ -358,7 +358,7 @@ trait Geo extends RedisEnvironment {
val command = RedisCommand(
GeoRadiusByMember,
Tuple11(
ArbitraryInput[K](),
ArbitraryKeyInput[K](),
ArbitraryInput[M](),
DoubleInput,
RadiusUnitInput,
Expand Down
34 changes: 17 additions & 17 deletions redis/src/main/scala/zio/redis/api/Hashes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ trait Hashes extends RedisEnvironment {
*/
final def hDel[K: Schema, F: Schema](key: K, field: F, fields: F*): IO[RedisError, Long] = {
val command =
RedisCommand(HDel, Tuple2(ArbitraryInput[K](), NonEmptyList(ArbitraryInput[F]())), LongOutput, codec, executor)
RedisCommand(HDel, Tuple2(ArbitraryKeyInput[K](), NonEmptyList(ArbitraryInput[F]())), LongOutput, codec, executor)
command.run((key, (field, fields.toList)))
}

Expand All @@ -55,7 +55,7 @@ trait Hashes extends RedisEnvironment {
* true if the field exists, otherwise false.
*/
final def hExists[K: Schema, F: Schema](key: K, field: F): IO[RedisError, Boolean] = {
val command = RedisCommand(HExists, Tuple2(ArbitraryInput[K](), ArbitraryInput[F]()), BoolOutput, codec, executor)
val command = RedisCommand(HExists, Tuple2(ArbitraryKeyInput[K](), ArbitraryInput[F]()), BoolOutput, codec, executor)
command.run((key, field))
}

Expand All @@ -74,7 +74,7 @@ trait Hashes extends RedisEnvironment {
def returning[V: Schema]: IO[RedisError, Option[V]] =
RedisCommand(
HGet,
Tuple2(ArbitraryInput[K](), ArbitraryInput[F]()),
Tuple2(ArbitraryKeyInput[K](), ArbitraryInput[F]()),
OptionalOutput(ArbitraryOutput[V]()),
codec,
executor
Expand All @@ -95,7 +95,7 @@ trait Hashes extends RedisEnvironment {
val command =
RedisCommand(
HGetAll,
ArbitraryInput[K](),
ArbitraryKeyInput[K](),
KeyValueOutput(ArbitraryOutput[F](), ArbitraryOutput[V]()),
codec,
executor
Expand All @@ -119,7 +119,7 @@ trait Hashes extends RedisEnvironment {
*/
final def hIncrBy[K: Schema, F: Schema](key: K, field: F, increment: Long): IO[RedisError, Long] = {
val command =
RedisCommand(HIncrBy, Tuple3(ArbitraryInput[K](), ArbitraryInput[F](), LongInput), LongOutput, codec, executor)
RedisCommand(HIncrBy, Tuple3(ArbitraryKeyInput[K](), ArbitraryInput[F](), LongInput), LongOutput, codec, executor)
command.run((key, field, increment))
}

Expand All @@ -144,7 +144,7 @@ trait Hashes extends RedisEnvironment {
val command =
RedisCommand(
HIncrByFloat,
Tuple3(ArbitraryInput[K](), ArbitraryInput[F](), DoubleInput),
Tuple3(ArbitraryKeyInput[K](), ArbitraryInput[F](), DoubleInput),
DoubleOutput,
codec,
executor
Expand All @@ -163,7 +163,7 @@ trait Hashes extends RedisEnvironment {
final def hKeys[K: Schema](key: K): ResultBuilder1[Chunk] =
new ResultBuilder1[Chunk] {
def returning[F: Schema]: IO[RedisError, Chunk[F]] =
RedisCommand(HKeys, ArbitraryInput[K](), ChunkOutput(ArbitraryOutput[F]()), codec, executor).run(key)
RedisCommand(HKeys, ArbitraryKeyInput[K](), ChunkOutput(ArbitraryOutput[F]()), codec, executor).run(key)
}

/**
Expand All @@ -175,7 +175,7 @@ trait Hashes extends RedisEnvironment {
* number of fields.
*/
final def hLen[K: Schema](key: K): IO[RedisError, Long] = {
val command = RedisCommand(HLen, ArbitraryInput[K](), LongOutput, codec, executor)
val command = RedisCommand(HLen, ArbitraryKeyInput[K](), LongOutput, codec, executor)
command.run(key)
}

Expand All @@ -200,7 +200,7 @@ trait Hashes extends RedisEnvironment {
def returning[V: Schema]: IO[RedisError, Chunk[Option[V]]] = {
val command = RedisCommand(
HmGet,
Tuple2(ArbitraryInput[K](), NonEmptyList(ArbitraryInput[F]())),
Tuple2(ArbitraryKeyInput[K](), NonEmptyList(ArbitraryInput[F]())),
ChunkOutput(OptionalOutput(ArbitraryOutput[V]())),
codec,
executor
Expand Down Expand Up @@ -229,7 +229,7 @@ trait Hashes extends RedisEnvironment {
): IO[RedisError, Unit] = {
val command = RedisCommand(
HmSet,
Tuple2(ArbitraryInput[K](), NonEmptyList(Tuple2(ArbitraryInput[F](), ArbitraryInput[V]()))),
Tuple2(ArbitraryKeyInput[K](), NonEmptyList(Tuple2(ArbitraryInput[F](), ArbitraryInput[V]()))),
UnitOutput,
codec,
executor
Expand Down Expand Up @@ -261,7 +261,7 @@ trait Hashes extends RedisEnvironment {
def returning[F: Schema, V: Schema]: IO[RedisError, (Long, Chunk[(F, V)])] = {
val command = RedisCommand(
HScan,
Tuple4(ArbitraryInput[K](), LongInput, OptionalInput(PatternInput), OptionalInput(CountInput)),
Tuple4(ArbitraryKeyInput[K](), LongInput, OptionalInput(PatternInput), OptionalInput(CountInput)),
Tuple2Output(ArbitraryOutput[Long](), ChunkTuple2Output(ArbitraryOutput[F](), ArbitraryOutput[V]())),
codec,
executor
Expand Down Expand Up @@ -289,7 +289,7 @@ trait Hashes extends RedisEnvironment {
): IO[RedisError, Long] = {
val command = RedisCommand(
HSet,
Tuple2(ArbitraryInput[K](), NonEmptyList(Tuple2(ArbitraryInput[F](), ArbitraryInput[V]()))),
Tuple2(ArbitraryKeyInput[K](), NonEmptyList(Tuple2(ArbitraryInput[F](), ArbitraryInput[V]()))),
LongOutput,
codec,
executor
Expand Down Expand Up @@ -317,7 +317,7 @@ trait Hashes extends RedisEnvironment {
val command =
RedisCommand(
HSetNx,
Tuple3(ArbitraryInput[K](), ArbitraryInput[F](), ArbitraryInput[V]()),
Tuple3(ArbitraryKeyInput[K](), ArbitraryInput[F](), ArbitraryInput[V]()),
BoolOutput,
codec,
executor
Expand All @@ -336,7 +336,7 @@ trait Hashes extends RedisEnvironment {
* string length of the value in field, or zero if either field or key do not exist.
*/
final def hStrLen[K: Schema, F: Schema](key: K, field: F): IO[RedisError, Long] = {
val command = RedisCommand(HStrLen, Tuple2(ArbitraryInput[K](), ArbitraryInput[F]()), LongOutput, codec, executor)
val command = RedisCommand(HStrLen, Tuple2(ArbitraryKeyInput[K](), ArbitraryInput[F]()), LongOutput, codec, executor)
command.run((key, field))
}

Expand All @@ -351,7 +351,7 @@ trait Hashes extends RedisEnvironment {
final def hVals[K: Schema](key: K): ResultBuilder1[Chunk] =
new ResultBuilder1[Chunk] {
def returning[V: Schema]: IO[RedisError, Chunk[V]] =
RedisCommand(HVals, ArbitraryInput[K](), ChunkOutput(ArbitraryOutput[V]()), codec, executor).run(key)
RedisCommand(HVals, ArbitraryKeyInput[K](), ChunkOutput(ArbitraryOutput[V]()), codec, executor).run(key)
}

/**
Expand All @@ -365,7 +365,7 @@ trait Hashes extends RedisEnvironment {
final def hRandField[K: Schema](key: K): ResultBuilder1[Option] =
new ResultBuilder1[Option] {
def returning[V: Schema]: IO[RedisError, Option[V]] =
RedisCommand(HRandField, ArbitraryInput[K](), OptionalOutput(ArbitraryOutput[V]()), codec, executor).run(key)
RedisCommand(HRandField, ArbitraryKeyInput[K](), OptionalOutput(ArbitraryOutput[V]()), codec, executor).run(key)
}

/**
Expand All @@ -387,7 +387,7 @@ trait Hashes extends RedisEnvironment {
def returning[V: Schema]: IO[RedisError, Chunk[V]] = {
val command = RedisCommand(
HRandField,
Tuple3(ArbitraryInput[K](), LongInput, OptionalInput(StringInput)),
Tuple3(ArbitraryKeyInput[K](), LongInput, OptionalInput(StringInput)),
ChunkOutput(ArbitraryOutput[V]()),
codec,
executor
Expand Down
Loading

0 comments on commit dd77a80

Please sign in to comment.