19 changes: 14 additions & 5 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
@@ -1713,17 +1713,26 @@ public RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query) {
}

@Override
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count) {
return dispatch(searchCommandBuilder.ftCursorread(index, cursorId, count));
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count) {
Collaborator comment:

One reason to iterate over the aggregated result with a cursor is that the reply is too big, contains too many results, etc. Using AggregationReply as an argument to ftCursorread defeats that original purpose. A much cleaner approach would be to introduce a Cursor abstraction that encapsulates the cursorId and nodeId, and pass it to the ftCursor commands.
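For illustration, a rough sketch of the kind of Cursor abstraction being suggested; the class name, fields and factory method are hypothetical and not part of this PR, and it assumes AggregationReply exposes getCursorId() and an Optional nodeId as used elsewhere in this change:

    import java.util.Optional;

    import io.lettuce.core.search.AggregationReply;

    // Hypothetical value object, not part of this PR: carries only the state the
    // FT.CURSOR commands need, instead of the whole AggregationReply.
    public final class AggregationCursor {

        private final long cursorId;

        private final String nodeId; // only set in cluster mode, where routing matters

        private AggregationCursor(long cursorId, String nodeId) {
            this.cursorId = cursorId;
            this.nodeId = nodeId;
        }

        public static <K, V> AggregationCursor of(AggregationReply<K, V> reply) {
            return new AggregationCursor(reply.getCursorId(), reply.getNodeId().orElse(null));
        }

        public long getCursorId() {
            return cursorId;
        }

        public Optional<String> getNodeId() {
            return Optional.ofNullable(nodeId);
        }

        public boolean isFinished() {
            return cursorId <= 0;
        }

    }

The ftCursorread and ftCursordel methods could then accept such a cursor handle without forcing callers to hold on to the full reply.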

if (aggregateReply == null)
throw new IllegalArgumentException("aggregateReply must not be null");
return dispatch(searchCommandBuilder.ftCursorread(index, aggregateReply.getCursorId(), count));
}

@Override
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId) {
return dispatch(searchCommandBuilder.ftCursorread(index, cursorId, -1));
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply) {
return ftCursorread(index, aggregateReply, -1);
}

@Override
public RedisFuture<String> ftCursordel(K index, long cursorId) {
public RedisFuture<String> ftCursordel(K index, AggregationReply<K, V> aggregateReply) {
if (aggregateReply == null)
throw new IllegalArgumentException("aggregateReply must not be null");
long cursorId = aggregateReply.getCursorId();
if (cursorId <= 0) {
// idempotent OK for non-existent/finished cursor
return dispatch(searchCommandBuilder.ftCursordel(index, 0));
}
return dispatch(searchCommandBuilder.ftCursordel(index, cursorId));
}

22 changes: 16 additions & 6 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
@@ -1748,8 +1748,13 @@ public Mono<String> ftAlter(K index, List<FieldArgs<K>> fieldArgs) {
}

@Override
public Mono<String> ftCursordel(K index, long cursorId) {
return createMono(() -> searchCommandBuilder.ftCursordel(index, cursorId));
public Mono<String> ftCursordel(K index, AggregationReply<K, V> aggregateReply) {
return createMono(() -> {
if (aggregateReply == null)
throw new IllegalArgumentException("aggregateReply must not be null");
long cursorId = aggregateReply.getCursorId();
return searchCommandBuilder.ftCursordel(index, cursorId > 0 ? cursorId : 0);
});
}

@Override
@@ -1783,13 +1788,18 @@ public Mono<AggregationReply<K, V>> ftAggregate(K index, V query) {
}

@Override
public Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count) {
return createMono(() -> searchCommandBuilder.ftCursorread(index, cursorId, count));
public Mono<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count) {
return createMono(() -> {
if (aggregateReply == null)
throw new IllegalArgumentException("aggregateReply must not be null");
long cursorId = aggregateReply.getCursorId();
return searchCommandBuilder.ftCursorread(index, cursorId > 0 ? cursorId : 0, count);
});
}

@Override
public Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId) {
return createMono(() -> searchCommandBuilder.ftCursorread(index, cursorId, -1));
public Mono<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply) {
return ftCursorread(index, aggregateReply, -1);
}

@Override
@@ -1157,7 +1157,7 @@ public interface RediSearchAsyncCommands<K, V> {
* @see #ftAggregate(Object, Object, AggregateArgs)
*/
@Experimental
RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count);
RedisFuture<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count);

/**
* Read next results from an existing cursor using the default batch size.
@@ -1190,7 +1190,7 @@ public interface RediSearchAsyncCommands<K, V> {
* @see #ftAggregate(Object, Object, AggregateArgs)
*/
@Experimental
RedisFuture<AggregationReply<K, V>> ftCursorread(K index, long cursorId);
RedisFuture<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply);

/**
* Delete a cursor and free its associated resources.
@@ -1229,6 +1229,6 @@ public interface RediSearchAsyncCommands<K, V> {
* @see #ftCursorread(Object, long, int)
*/
@Experimental
RedisFuture<String> ftCursordel(K index, long cursorId);
RedisFuture<String> ftCursordel(K index, AggregationReply<K, V> aggregateReply);

}
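As an aside, a hedged sketch of driving the reworked asynchronous signatures end to end. The helper class, the Consumer parameter and the import path of RediSearchAsyncCommands are assumptions; only the methods shown in this diff plus RedisFuture's CompletionStage contract are relied on:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.function.Consumer;

    import io.lettuce.core.api.async.RediSearchAsyncCommands; // assumed package location
    import io.lettuce.core.search.AggregationReply;
    import io.lettuce.core.search.arguments.AggregateArgs;

    final class AsyncCursorExample {

        // Runs FT.AGGREGATE and then chains FT.CURSOR READ calls until the server reports cursorId == 0.
        static <K, V> CompletionStage<Void> drainAll(RediSearchAsyncCommands<K, V> commands, K index, V query,
                AggregateArgs<K, V> args, Consumer<AggregationReply<K, V>> consumer) {
            return commands.ftAggregate(index, query, args).thenCompose(reply -> drain(commands, index, reply, consumer));
        }

        private static <K, V> CompletionStage<Void> drain(RediSearchAsyncCommands<K, V> commands, K index,
                AggregationReply<K, V> reply, Consumer<AggregationReply<K, V>> consumer) {
            consumer.accept(reply);
            if (reply.getCursorId() <= 0) {
                return CompletableFuture.completedFuture(null); // cursor exhausted
            }
            // The reply itself carries the cursor state, so it is passed back verbatim.
            return commands.ftCursorread(index, reply).thenCompose(next -> drain(commands, index, next, consumer));
        }

    }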
@@ -1159,7 +1159,7 @@ public interface RediSearchReactiveCommands<K, V> {
* @see #ftAggregate(Object, Object, AggregateArgs)
*/
@Experimental
Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId, int count);
Mono<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count);

/**
* Read next results from an existing cursor using the default batch size.
@@ -1192,7 +1192,7 @@ public interface RediSearchReactiveCommands<K, V> {
* @see #ftAggregate(Object, Object, AggregateArgs)
*/
@Experimental
Mono<AggregationReply<K, V>> ftCursorread(K index, long cursorId);
Mono<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply);

/**
* Delete a cursor and free its associated resources.
@@ -1231,6 +1231,6 @@ public interface RediSearchReactiveCommands<K, V> {
* @see #ftCursorread(Object, long, int)
*/
@Experimental
Mono<String> ftCursordel(K index, long cursorId);
Mono<String> ftCursordel(K index, AggregationReply<K, V> aggregateReply);

}
@@ -1157,7 +1157,7 @@ public interface RediSearchCommands<K, V> {
* @see #ftAggregate(Object, Object, AggregateArgs)
*/
@Experimental
AggregationReply<K, V> ftCursorread(K index, long cursorId, int count);
AggregationReply<K, V> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count);

/**
* Read next results from an existing cursor using the default batch size.
@@ -1190,7 +1190,7 @@ public interface RediSearchCommands<K, V> {
* @see #ftAggregate(Object, Object, AggregateArgs)
*/
@Experimental
AggregationReply<K, V> ftCursorread(K index, long cursorId);
AggregationReply<K, V> ftCursorread(K index, AggregationReply<K, V> aggregateReply);

/**
* Delete a cursor and free its associated resources.
@@ -1229,6 +1229,6 @@ public interface RediSearchCommands<K, V> {
* @see #ftCursorread(Object, long, int)
*/
@Experimental
String ftCursordel(K index, long cursorId);
String ftCursordel(K index, AggregationReply<K, V> aggregateReply);

}
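For orientation, a minimal usage sketch of how the reworked synchronous signatures compose. The helper class, the Consumer parameter and the import path of RediSearchCommands are illustrative assumptions; ftAggregate, ftCursorread, ftCursordel and AggregationReply#getCursorId() come from this PR:

    import java.util.function.Consumer;

    import io.lettuce.core.api.sync.RediSearchCommands; // assumed package location
    import io.lettuce.core.search.AggregationReply;
    import io.lettuce.core.search.arguments.AggregateArgs;

    final class SyncCursorExample {

        // Consumes every batch of an aggregation, then frees the cursor.
        static <K, V> void drain(RediSearchCommands<K, V> commands, K index, V query, AggregateArgs<K, V> args,
                Consumer<AggregationReply<K, V>> consumer) {
            AggregationReply<K, V> reply = commands.ftAggregate(index, query, args);
            consumer.accept(reply); // the first batch arrives with the FT.AGGREGATE reply
            while (reply.getCursorId() > 0) {
                reply = commands.ftCursorread(index, reply); // the reply itself carries the cursor state
                consumer.accept(reply);
            }
            commands.ftCursordel(index, reply); // this PR treats deleting a finished cursor as idempotent
        }

    }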
@@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
@@ -64,6 +65,8 @@
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ConnectionIntent;
import io.lettuce.core.search.AggregationReply;
import io.lettuce.core.search.arguments.AggregateArgs;

/**
* An advanced asynchronous and thread-safe API for a Redis Cluster connection.
@@ -197,10 +200,107 @@ public RedisFuture<List<K>> clusterGetKeysInSlot(int slot, int count) {
}

@Override
public RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args) {
// Route by index slot to ensure we know the creating node; stamp nodeId if a cursor is created
int slot = SlotHash.getSlot(codec.encodeKey(index));
RedisClusterNode node = getStatefulConnection().getPartitions().getPartitionBySlot(slot);
if (node == null) {
return super.ftAggregate(index, query, args);
}
String nodeId = node.getNodeId();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
RedisFuture<AggregationReply<K, V>> f = byNode.async().ftAggregate(index, query, args);
CompletableFuture<AggregationReply<K, V>> mapped = new CompletableFuture<>();
f.whenComplete((reply, err) -> {
if (err != null) {
mapped.completeExceptionally(err);
return;
}
if (reply != null && reply.getCursorId() > 0) {
AggregationReply.stampNodeId(reply, nodeId);
}
mapped.complete(reply);
});
return new PipelinedRedisFuture<>(mapped);
}

@Override
public RedisFuture<AggregationReply<K, V>> ftAggregate(K index, V query) {
return ftAggregate(index, query, null);
}

public RedisFuture<Long> dbsize() {
return MultiNodeExecution.aggregateAsync(executeOnUpstream(RedisServerAsyncCommands::dbsize));
}

@Override
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count) {
if (aggregateReply == null) {
CompletableFuture<AggregationReply<K, V>> failed = new CompletableFuture<>();
failed.completeExceptionally(new IllegalArgumentException("aggregateReply must not be null"));
return new PipelinedRedisFuture<>(failed);
}
long cursorId = aggregateReply.getCursorId();
if (cursorId <= 0) {
// terminal: return empty reply
CompletableFuture<AggregationReply<K, V>> done = new CompletableFuture<>();
done.complete(new AggregationReply<>());
return new PipelinedRedisFuture<>(done);
}
Optional<String> nodeIdOpt = aggregateReply.getNodeId();
if (!nodeIdOpt.isPresent()) {
CompletableFuture<AggregationReply<K, V>> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IllegalArgumentException("AggregationReply missing nodeId; cannot route cursor READ in cluster mode"));
return new PipelinedRedisFuture<>(failed);
}
String nodeId = nodeIdOpt.get();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
RedisFuture<AggregationReply<K, V>> f = byNode.async().ftCursorread(index, aggregateReply, count);
CompletableFuture<AggregationReply<K, V>> mapped = new CompletableFuture<>();
f.whenComplete((reply, err) -> {
if (err != null) {
mapped.completeExceptionally(err);
return;
}
if (reply != null) {
AggregationReply.stampNodeId(reply, nodeId);
}
mapped.complete(reply);
});
return new PipelinedRedisFuture<>(mapped);
}

@Override
public RedisFuture<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply) {
return ftCursorread(index, aggregateReply, -1);
}

@Override
public RedisFuture<String> ftCursordel(K index, AggregationReply<K, V> aggregateReply) {
if (aggregateReply == null) {
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(new IllegalArgumentException("aggregateReply must not be null"));
return new PipelinedRedisFuture<>(failed);
}
long cursorId = aggregateReply.getCursorId();
if (cursorId <= 0) {
CompletableFuture<String> done = new CompletableFuture<>();
done.complete("OK");
return new PipelinedRedisFuture<>(done);
}
Optional<String> nodeIdOpt = aggregateReply.getNodeId();
if (!nodeIdOpt.isPresent()) {
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IllegalArgumentException("AggregationReply missing nodeId; cannot route cursor DEL in cluster mode"));
return new PipelinedRedisFuture<>(failed);
}
String nodeId = nodeIdOpt.get();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
return byNode.async().ftCursordel(index, aggregateReply);
}

@Override
public RedisFuture<Long> del(K... keys) {
return del(Arrays.asList(keys));
@@ -54,8 +54,14 @@
import io.lettuce.core.output.KeyValueStreamingChannel;
import io.lettuce.core.protocol.ConnectionIntent;
import reactor.core.publisher.Flux;
import io.lettuce.core.search.AggregationReply;

import io.lettuce.core.search.arguments.AggregateArgs;

import reactor.core.publisher.Mono;

import java.util.Optional;

/**
* An advanced reactive and thread-safe API to a Redis Cluster connection.
*
@@ -384,12 +390,80 @@ public Mono<String> scriptLoad(byte[] script) {
return Flux.merge(publishers.values()).last();
}

@Override
public Mono<AggregationReply<K, V>> ftAggregate(K index, V query, AggregateArgs<K, V> args) {
int slot = SlotHash.getSlot(codec.encodeKey(index));
RedisClusterNode node = getStatefulConnection().getPartitions().getPartitionBySlot(slot);
if (node == null) {
return super.ftAggregate(index, query, args);
}
String nodeId = node.getNodeId();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
return byNode.reactive().ftAggregate(index, query, args).mapNotNull(reply -> {
if (reply != null && reply.getCursorId() > 0) {
AggregationReply.stampNodeId(reply, nodeId);
}
return reply;
});
}

@Override
public Mono<AggregationReply<K, V>> ftAggregate(K index, V query) {
return ftAggregate(index, query, null);
}

@Override
public Mono<Void> shutdown(boolean save) {
Map<String, Publisher<Void>> publishers = executeOnNodes(commands -> commands.shutdown(save), ALL_NODES);
return Flux.merge(publishers.values()).then();
}

@Override
public Mono<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply, int count) {
if (aggregateReply == null) {
return Mono.error(new IllegalArgumentException("aggregateReply must not be null"));
}
long cursorId = aggregateReply.getCursorId();
if (cursorId <= 0) {
return Mono.just(new AggregationReply<>());
}
Optional<String> nodeIdOpt = aggregateReply.getNodeId();
if (!nodeIdOpt.isPresent()) {
return Mono.error(
new IllegalArgumentException("AggregationReply missing nodeId; cannot route cursor READ in cluster mode"));
}
String nodeId = nodeIdOpt.get();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
return byNode.reactive().ftCursorread(index, aggregateReply, count).map(reply -> {
AggregationReply.stampNodeId(reply, nodeId);
return reply;
});
}

@Override
public Mono<AggregationReply<K, V>> ftCursorread(K index, AggregationReply<K, V> aggregateReply) {
return ftCursorread(index, aggregateReply, -1);
}

@Override
public Mono<String> ftCursordel(K index, AggregationReply<K, V> aggregateReply) {
if (aggregateReply == null) {
return Mono.error(new IllegalArgumentException("aggregateReply must not be null"));
}
long cursorId = aggregateReply.getCursorId();
if (cursorId <= 0) {
return Mono.just("OK");
}
Optional<String> nodeIdOpt = aggregateReply.getNodeId();
if (!nodeIdOpt.isPresent()) {
return Mono.error(
new IllegalArgumentException("AggregationReply missing nodeId; cannot route cursor DEL in cluster mode"));
}
String nodeId = nodeIdOpt.get();
StatefulRedisConnection<K, V> byNode = getStatefulConnection().getConnection(nodeId, ConnectionIntent.WRITE);
return byNode.reactive().ftCursordel(index, aggregateReply);
}

@Override
public Mono<Long> touch(K... keys) {
return touch(Arrays.asList(keys));
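Finally, a hedged sketch of how a caller could consume the cluster-routed cursor reactively: because each reply is stamped with the nodeId of the node that created the cursor, feeding the previous reply back into ftCursorread keeps the iteration on the right node. The helper class and the import path of RediSearchReactiveCommands are assumptions; Mono#expand is standard Reactor:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import io.lettuce.core.api.reactive.RediSearchReactiveCommands; // assumed package location
    import io.lettuce.core.search.AggregationReply;
    import io.lettuce.core.search.arguments.AggregateArgs;

    final class ReactiveCursorExample {

        // Emits the initial FT.AGGREGATE reply and then every FT.CURSOR READ batch
        // until the server reports cursorId == 0.
        static <K, V> Flux<AggregationReply<K, V>> batches(RediSearchReactiveCommands<K, V> commands, K index, V query,
                AggregateArgs<K, V> args) {
            return commands.ftAggregate(index, query, args)
                    .expand(reply -> reply.getCursorId() > 0
                            ? commands.ftCursorread(index, reply) // in cluster mode, routed via the stamped nodeId
                            : Mono.<AggregationReply<K, V>> empty());
        }

    }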