-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory to server module #19390
Conversation
…module Signed-off-by: PoAn Yang <payang@apache.org>
Failed test OffloadAndTxnConsumeFromLeaderTest#executeTieredStorageTest in CI is not related to this PR. |
Should these classes go to the |
…module Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. I left a few suggestions.
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue} | ||
import org.junit.jupiter.api.Test | ||
|
||
import java.util.concurrent.{CompletableFuture, ExecutionException} | ||
import java.util.concurrent.atomic.AtomicInteger | ||
import scala.jdk.CollectionConverters.CollectionHasAsScala | ||
import scala.jdk.CollectionConverters._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep the explicit import?
@@ -16,22 +16,22 @@ | |||
*/ | |||
package integration.kafka.server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also convert and move this file to server
*/ | ||
@Override | ||
public void onComplete() { | ||
List<CompletableFuture<T>> pendingFutures = futures.stream().filter(future -> !future.isDone()).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use toList()
instead of collect(Collectors.toList())
log.trace("All futures have been completed or have errors, completing the delayed operation"); | ||
return forceComplete(); | ||
} else { | ||
log.trace(pending + " future still pending, not completing the delayed operation"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use templating {}
instead of concatenation in Logger methods.
0, | ||
TimeUnit.MILLISECONDS, | ||
new LinkedBlockingQueue<>(), | ||
new ThreadFactory() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use a lambda to make the anonymous class a bit less verbose
return new KafkaThread("DelayedExecutor-" + purgatoryName, r, true); | ||
} | ||
}); | ||
this.purgatoryKey = new DelayedOperationKey() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
DelayedFuture<T> delayedFuture = new DelayedFuture<>(timeoutMs, futures, responseCallback); | ||
boolean done = purgatory.tryCompleteElseWatch(delayedFuture, List.of(purgatoryKey)); | ||
if (!done) { | ||
BiConsumer<Void, Throwable> callbackAction = new BiConsumer<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Signed-off-by: PoAn Yang <payang@apache.org>
Signed-off-by: PoAn Yang <payang@apache.org>
@mimaison Thanks for the review. I address all comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Rewrite these classes in Java and move them to the server module
Reviewers: Mickael Maison mickael.maison@gmail.com