Skip to content

Commit

Permalink
fix infinity loop in RedisUri, add tests for ClusterPartitionOutput (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
maizy authored Aug 7, 2024
1 parent 7a7a296 commit 915f5df
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait ClusterSpec extends IntegrationSpec {
.foreach(0 to 5) { n =>
ZIO
.attempt(docker.getServiceHost(s"cluster-node$n", port))
.map(host => RedisUri(s"$host:$port"))
.map(host => RedisUri(host, port))
}
.orDie
actual = res.map(_.master.address) ++ res.flatMap(_.slaves.map(_.address))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait IntegrationSpec extends ZIOSpecDefault {
for {
docker <- ZIO.service[DockerComposeContainer]
hostAndPort <- docker.getHostAndPort(IntegrationSpec.MasterNode)(6379)
uri = RedisUri(s"${hostAndPort._1}:${hostAndPort._2}")
uri = RedisUri(hostAndPort._1, hostAndPort._2)
} yield RedisClusterConfig(Chunk(uri))
}

Expand Down
2 changes: 1 addition & 1 deletion modules/redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object Output {
val host = MultiStringOutput.unsafeDecode(values(0))
val port = LongOutput.unsafeDecode(values(1))
val nodeId = MultiStringOutput.unsafeDecode(values(2))
Node(nodeId, RedisUri(s"$host:$port"))
Node(nodeId, RedisUri(host, port.toInt))
case other => throw ProtocolError(s"$other isn't an array")
}
}
Expand Down
5 changes: 4 additions & 1 deletion modules/redis/src/main/scala/zio/redis/RedisUri.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ object RedisUri {
val splitting = hostAndPort.split(':')
val host = splitting(0)
val port = splitting(1).toInt
RedisUri(s"$host:$port")
RedisUri(host, port, ssl = false, sni = None)
}

def apply(host: String, port: Int): RedisUri =
RedisUri(host, port, ssl = false, sni = None)
}
131 changes: 131 additions & 0 deletions modules/redis/src/test/scala/zio/redis/OutputSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import zio.redis.Output._
import zio.redis.RedisError._
import zio.redis.internal.PubSub.{PushMessage, SubscriptionKey}
import zio.redis.internal.RespValue
import zio.redis.options.Cluster
import zio.test.Assertion._
import zio.test._

Expand Down Expand Up @@ -1028,6 +1029,136 @@ object OutputSpec extends BaseSpec {

assertZIO(ZIO.attempt(PushMessageOutput.unsafeDecode(input)))(equalTo(expected))
}
),
suite("ClusterPartition")(
test("3 masters cluster") {
val response =
RespValue.array(
RespValue.array(
RespValue.Integer(0L),
RespValue.Integer(5460L),
RespValue.array(
RespValue.bulkString("127.0.0.1"),
RespValue.Integer(6379L),
RespValue.bulkString("node1"),
RespValue.array()
)
),
RespValue.array(
RespValue.Integer(5461L),
RespValue.Integer(10922L),
RespValue.array(
RespValue.bulkString("127.0.0.2"),
RespValue.Integer(6379L),
RespValue.bulkString("node2"),
RespValue.array()
)
),
RespValue.array(
RespValue.Integer(10923L),
RespValue.Integer(16383L),
RespValue.array(
RespValue.bulkString("127.0.0.3"),
RespValue.Integer(6379L),
RespValue.bulkString("node3"),
RespValue.array()
)
)
)

val expected = Chunk(
Cluster.Partition(
Cluster.SlotRange(0L, 5460L),
Cluster.Node("node1", RedisUri("127.0.0.1", 6379)),
slaves = Chunk.empty
),
Cluster.Partition(
Cluster.SlotRange(5461L, 10922L),
Cluster.Node("node2", RedisUri("127.0.0.2", 6379)),
slaves = Chunk.empty
),
Cluster.Partition(
Cluster.SlotRange(10923L, 16383L),
Cluster.Node("node3", RedisUri("127.0.0.3", 6379)),
slaves = Chunk.empty
)
)

assertZIO(ZIO.attempt(ChunkOutput(ClusterPartitionOutput).unsafeDecode(response)))(hasSameElements(expected))
},
test("3 masters with 2 replicas cluster") {
val response =
RespValue.array(
RespValue.array(
RespValue.Integer(0L),
RespValue.Integer(5460L),
RespValue.array(
RespValue.bulkString("127.0.0.1"),
RespValue.Integer(6379L),
RespValue.bulkString("node1"),
RespValue.array()
),
RespValue.array(
RespValue.bulkString("127.0.1.1"),
RespValue.Integer(6379L),
RespValue.bulkString("replica1"),
RespValue.array()
)
),
RespValue.array(
RespValue.Integer(5461L),
RespValue.Integer(10922L),
RespValue.array(
RespValue.bulkString("127.0.0.2"),
RespValue.Integer(6379L),
RespValue.bulkString("node2"),
RespValue.array()
),
RespValue.array(
RespValue.bulkString("127.0.1.2"),
RespValue.Integer(6379L),
RespValue.bulkString("replica2"),
RespValue.array()
)
),
RespValue.array(
RespValue.Integer(10923L),
RespValue.Integer(16383L),
RespValue.array(
RespValue.bulkString("127.0.0.3"),
RespValue.Integer(6379L),
RespValue.bulkString("node3"),
RespValue.array()
),
RespValue.array(
RespValue.bulkString("127.0.1.3"),
RespValue.Integer(6379L),
RespValue.bulkString("replica3"),
RespValue.array()
)
)
)

val expected = Chunk(
Cluster.Partition(
Cluster.SlotRange(0L, 5460L),
Cluster.Node("node1", RedisUri("127.0.0.1", 6379)),
slaves = Chunk.single(Cluster.Node("replica1", RedisUri("127.0.1.1", 6379)))
),
Cluster.Partition(
Cluster.SlotRange(5461L, 10922L),
Cluster.Node("node2", RedisUri("127.0.0.2", 6379)),
slaves = Chunk.single(Cluster.Node("replica2", RedisUri("127.0.1.2", 6379)))
),
Cluster.Partition(
Cluster.SlotRange(10923L, 16383L),
Cluster.Node("node3", RedisUri("127.0.0.3", 6379)),
slaves = Chunk.single(Cluster.Node("replica3", RedisUri("127.0.1.3", 6379)))
)
)

assertZIO(ZIO.attempt(ChunkOutput(ClusterPartitionOutput).unsafeDecode(response)))(hasSameElements(expected))
}
)
)

Expand Down

0 comments on commit 915f5df

Please sign in to comment.