
Commit 7ed97cb

fix: make KafkaSqlUpgraderManager more resilient against lock timeouts
- Increase default lock timeout to 80s.
- Tune a few other time parameters in response to the higher default timeout.
- Use kafka-clock instead of wall-clock for lock timeouts.
- Configure message.timestamp.type=LogAppendTime by default when auto-creating the topic.
- Improve some log messages.
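The new 80s default can still be overridden through the registry's standard configuration. A minimal sketch, assuming the usual MicroProfile/Quarkus property and environment-variable mapping (the override value below is only an example):

    # application.properties (example override, not a recommendation)
    registry.kafkasql.upgrade-lock-timeout=120s
    # or as an environment variable:
    # REGISTRY_KAFKASQL_UPGRADE_LOCK_TIMEOUT=120s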
1 parent 3daf161 commit 7ed97cb

4 files changed: +68 -26 lines

integration-tests/src/test/java/io/apicurio/tests/kafkasql/manual/KafkaSqlStorageUpgraderManagerIT.java (+2 -2)

@@ -45,12 +45,12 @@
 @Tag(Constants.KAFKASQL_MANUAL)
 public class KafkaSqlStorageUpgraderManagerIT implements TestSeparator, Constants {
 
-    public static final int LOCK_TIMEOUT_SECONDS = 10;
+    public static final int LOCK_TIMEOUT_SECONDS = 80;
 
     private long testTimeoutMultiplier = 1;
 
     private static final BiConsumer<String, RegistryRunner> REPORTER = (line, node) -> {
-        if (line.contains("We detected a significant time difference")) {
+        if (line.contains("consider increasing 'registry.kafkasql.upgrade-lock-timeout' config value")) {
             node.getReport().put("time-slip-detected", true);
         }
         if (line.contains("State change: WAIT -> LOCKED")) {
integration-tests/src/test/resources/infra/kafka/registry-kafka-manual.yml (+2 -2)

@@ -30,7 +30,7 @@ spec:
           name: http
           protocol: TCP
         readinessProbe:
-          failureThreshold: 60 # Intentionally high
+          failureThreshold: 120 # Intentionally high
           httpGet:
             path: /health/ready
             port: 8080
@@ -40,7 +40,7 @@ spec:
           successThreshold: 1
           timeoutSeconds: 10
         livenessProbe:
-          failureThreshold: 60 # Intentionally high
+          failureThreshold: 120 # Intentionally high
           httpGet:
             path: /health/live
             port: 8080

storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java (+2 -0)

@@ -192,6 +192,8 @@ private void autoCreateTopics() {
         configuration.topicProperties().entrySet().forEach(entry -> topicProperties.put(entry.getKey().toString(), entry.getValue().toString()));
         // Use log compaction by default.
         topicProperties.putIfAbsent(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        // See KafkaSqlUpgraderManager
+        topicProperties.putIfAbsent(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime");
         Properties adminProperties = configuration.adminProperties();
         adminProperties.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.bootstrapServers());
         try {
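As the method name suggests, this putIfAbsent default only takes effect when the registry auto-creates the topic; an already existing topic keeps whatever timestamp type it was created with. A minimal sketch for checking the effective setting with the Kafka Admin client (the topic name "kafkasql-journal" is the usual default but is an assumption here, as is the local bootstrap server):

import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class CheckTimestampType {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        // "kafkasql-journal" is the typical default KafkaSQL topic name; adjust if configured differently.
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "kafkasql-journal");
        try (Admin admin = Admin.create(props)) {
            Map<ConfigResource, Config> configs = admin.describeConfigs(Set.of(topic)).all().get();
            // LogAppendTime means the broker assigns timestamps, which the upgrader manager relies on as a shared clock.
            String type = configs.get(topic).get(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG).value();
            System.out.println(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG + " = " + type);
        }
    }
}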

storage/kafkasql/src/main/java/io/apicurio/registry/storage/impl/kafkasql/upgrade/KafkaSqlUpgraderManager.java (+62 -22)

@@ -34,6 +34,7 @@
 import jakarta.enterprise.inject.Instance;
 import jakarta.inject.Inject;
 import lombok.Getter;
+import lombok.ToString;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.eclipse.microprofile.context.ManagedExecutor;
 import org.eclipse.microprofile.context.ThreadContext;
@@ -76,7 +77,7 @@ public class KafkaSqlUpgraderManager {
      * <p>
      * However, since we have heartbeat messages, this time does not have to be too long.
      */
-    @ConfigProperty(name = "registry.kafkasql.upgrade-lock-timeout", defaultValue = "10s")
+    @ConfigProperty(name = "registry.kafkasql.upgrade-lock-timeout", defaultValue = "80s")
     @Info(category = "store", description = "How long should KafkaSQL upgrader manager hold the lock before it's assumed to have failed. " +
             "There is a tradeoff between giving the upgrade process enough time and recovering from a failed upgrade. " +
             "You may need to increase this value if your Kafka cluster is very busy.", availableSince = "2.5.9.Final")
@@ -128,6 +129,8 @@
 
     private long sequence;
 
+    private Instant kafkaClock;
+
     private volatile boolean localTryLocked;
     private Instant localTryLockedTimestamp;
     private volatile boolean upgrading;
@@ -165,8 +168,9 @@ public synchronized void init() {
         // We need to keep in mind that multiple nodes might start at the same time.
         // Upgrader runs only once on startup, so this can be set once.
         localUpgraderUUID = UUID.randomUUID().toString();
+        log.debug("UUID of this upgrader is {}", localUpgraderUUID);
 
-        waitHeartbeatEmitter = new WaitHeartbeatEmitter(scale(lockTimeout, 1.1f), submitter, log, threadContext);
+        waitHeartbeatEmitter = new WaitHeartbeatEmitter(scale(lockTimeout, 0.26f), submitter, log, threadContext);
 
         // Produce a bootstrap message to know when we are up-to-date with the topic. We don't know the version yet.
         submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_BOOTSTRAP, localUpgraderUUID, null));
@@ -192,9 +196,12 @@ public synchronized void read(Instant currentTimestamp, MessageKey key, MessageV
         }
 
         if (key instanceof UpgraderKey) {
+            log.debug("Reading UpgraderKey {}", key);
+            log.debug("Reading UpgraderValue {} {}", ((UpgraderValue) value).getAction(), value);
             // Update our lock map
             var upgraderValue = (UpgraderValue) value;
-            updateLockMap(currentTimestamp, upgraderValue);
+            processMessage(currentTimestamp, upgraderValue);
+            log.debug("Lock map state: {}", lockMap);
         }
     }
 
@@ -218,8 +225,10 @@ public synchronized void read(Instant currentTimestamp, MessageKey key, MessageV
         var slip = Duration.between(initTimestamp, currentTimestamp).abs().toMillis();
         if (slip > scale(lockTimeout, 0.25f).toMillis()) {
             log.warn("We detected a significant time difference ({} ms) between a moment when a Kafka message is produced (local time), " +
-                    "and it's creation timestamp reported by Kafka at the moment it is consumed. If this causes issues during KafkaSQL storage upgrade, " +
-                    "consider increasing 'registry.kafkasql.upgrade-lock-timeout' config value (currently {} ms).", slip, lockTimeout);
+                    "and it's creation timestamp reported by Kafka at the moment it is consumed. " +
+                    "This might happen when Kafka is configured with message.timestamp.type=LogAppendTime. " +
+                    "If this causes issues during KafkaSQL storage upgrade, " +
+                    "consider increasing 'registry.kafkasql.upgrade-lock-timeout' config value (currently {}).", slip, lockTimeout);
         }
         switchState(State.WAIT);
     }
@@ -256,9 +265,8 @@ public synchronized void read(Instant currentTimestamp, MessageKey key, MessageV
             // Nobody tried to upgrade yet, or failed, we should try.
             if (localTryLocked) {
                 // We have tried to lock, eventually it might be our turn to go, but we have to check for our own timeout
-                var now = Instant.now();
-                if (lockMap.get(localUpgraderUUID).latestLockTimestamp != null && now.isAfter(lockMap.get(localUpgraderUUID).latestLockTimestamp.plus(lockTimeout)) &&
-                        localTryLockedTimestamp != null && now.isAfter(localTryLockedTimestamp.plus(scale(lockTimeout, 1.5f)))) { // We need to prevent loop here, so we keep a local timestamp as well
+                if (lockMap.get(localUpgraderUUID).latestLockTimestamp != null && kafkaClock.isAfter(lockMap.get(localUpgraderUUID).latestLockTimestamp.plus(lockTimeout)) &&
+                        localTryLockedTimestamp != null && Instant.now().isAfter(localTryLockedTimestamp.plus(scale(lockTimeout, 1.5f)))) { // We need to prevent loop here, so we keep a local timestamp as well
                     // Our own lock has timed out, we can try again
                     localTryLocked = false;
                     switchState(State.TRY_LOCK);
@@ -282,11 +290,10 @@ public synchronized void read(Instant currentTimestamp, MessageKey key, MessageV
             // We've got the lock, but we may have sent an unlock message
             if (localTryLocked) {
                 // We got the lock, but first check if we have enough time.
-                var now = Instant.now();
-                if (now.isAfter(lockMap.get(localUpgraderUUID).latestLockTimestamp.plus(scale(lockTimeout, 0.5f)))) {
+                if (kafkaClock.isAfter(lockMap.get(localUpgraderUUID).latestLockTimestamp.plus(scale(lockTimeout, 0.5f)))) {
                     // We should unlock and wait, then try again
                     log.warn("We've got the lock but we don't have enough time ({} ms remaining). Unlocking.",
-                            Duration.between(lockMap.get(localUpgraderUUID).latestLockTimestamp, now).toMillis());
+                            Duration.between(lockMap.get(localUpgraderUUID).latestLockTimestamp, kafkaClock).toMillis());
                     submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_ABORT_AND_UNLOCK, localUpgraderUUID, targetVersion));
                     localTryLocked = false;
                     // No need to send heartbeat, since we're expecting to read the unlock message
@@ -488,14 +495,46 @@ private LockRecord computeActiveLock() {
         var r = lockMap.values().stream()
                 .filter(rr -> rr.targetVersion != null &&
                         rr.targetVersion == targetVersion &&
-                        rr.tryLocked &&
-                        !rr.isTimedOut(Instant.now(), lockTimeout))
+                        rr.tryLocked)
+                .filter(rr -> {
+                    var to = rr.isTimedOut(kafkaClock, lockTimeout);
+                    if (to) {
+                        log.debug("Lock of upgrader {} has timed out.", rr.upgraderUUID);
+                    }
+                    return !to;
+                })
                 .min(Comparator.comparingLong(rr -> rr.tryLockSequence));
         return r.orElse(null);
     }
 
 
-    private void updateLockMap(Instant timestamp, UpgraderValue value) {
+    private void processMessage(Instant timestamp, UpgraderValue value) {
+        /*
+         * There are two main ways how the Kafka message gets a timestamp:
+         * - By the broker when a message is put into the log (LogAppendTime), or
+         * - By a client/producer when a message is created (CreateTime)
+         * based on topic configuration https://kafka.apache.org/documentation/#log.message.timestamp.type
+         *
+         * The first case is better for synchronisation, and we try to set it by default in
+         * io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage.autoCreateTopics.
+         *
+         * However, we have to handle the second case as well, so we have to ensure that:
+         * - the kafka clock does not go back in time, and
+         * - we use big enough lock timeout to handle potentially large round-trip times.
+         */
+        if (kafkaClock == null || timestamp.isAfter(kafkaClock)) {
+            kafkaClock = timestamp;
+        } else {
+            var slip = Duration.between(timestamp, kafkaClock).abs().toMillis();
+            if (slip > scale(lockTimeout, 0.25f).toMillis()) {
+                log.warn("Ignoring significantly antedated timestamp {}, current kafka clock is {}. " +
+                        "This might happen when Kafka is configured with message.timestamp.type=CreateTime. " +
+                        "If this causes issues during KafkaSQL storage upgrade, " +
+                        "consider increasing 'registry.kafkasql.upgrade-lock-timeout' config value (currently {}).", timestamp, kafkaClock, lockTimeout);
+            } else {
+                log.debug("Ignoring antedated timestamp {}, current kafka clock is {}.", timestamp, kafkaClock);
+            }
+        }
         if (value.getUpgraderUUID() == null) {
             return;
         }
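The kafka-clock introduced above only ever moves forward: each consumed upgrader message may advance it, while older (antedated) timestamps are ignored instead of rewinding it. A simplified, self-contained sketch of that idea (class and method names here are illustrative, not the actual implementation):

import java.time.Duration;
import java.time.Instant;

// Illustrative only: a clock driven by Kafka message timestamps that never goes backwards.
class KafkaMessageClock {
    private Instant now; // null until the first message timestamp is observed
    private final Duration warnThreshold;

    KafkaMessageClock(Duration warnThreshold) {
        this.warnThreshold = warnThreshold;
    }

    /** Advance the clock using the timestamp of a consumed message. */
    void observe(Instant messageTimestamp) {
        if (now == null || messageTimestamp.isAfter(now)) {
            now = messageTimestamp; // move forward
        } else if (Duration.between(messageTimestamp, now).compareTo(warnThreshold) > 0) {
            // Large backwards jumps typically mean CreateTime timestamps from a skewed producer clock.
            System.out.println("Ignoring significantly antedated timestamp " + messageTimestamp);
        }
    }

    Instant now() {
        return now;
    }
}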
@@ -559,6 +598,7 @@ private enum State {
     }
 
 
+    @ToString
     private static class LockRecord {
         // UUID of the upgrader
         String upgraderUUID;
@@ -608,7 +648,7 @@ private UpgraderManagerHandle(KafkaSqlSubmitter submitter, String localUpgraderU
          */
         public synchronized void heartbeat() {
             var now = Instant.now();
-            if (lastHeartbeat == null || now.isAfter(lastHeartbeat.plus(scale(lockTimeout, 0.35f)))) {
+            if (lastHeartbeat == null || now.isAfter(lastHeartbeat.plus(scale(lockTimeout, 0.25f)))) {
                 log.debug("Sending lock heartbeat.");
                 submitter.send(UpgraderKey.create(true), UpgraderValue.create(ActionType.UPGRADE_LOCK_HEARTBEAT, localUpgraderUUID, null));
                 lastHeartbeat = now;
@@ -617,7 +657,7 @@ public synchronized void heartbeat() {
 
 
         private synchronized boolean isTimedOut() {
-            return Instant.now().isAfter(lastHeartbeat.plus(scale(lockTimeout, 0.85f)));
+            return Instant.now().isAfter(lastHeartbeat.plus(scale(lockTimeout, 0.75f)));
         }
     }
 
@@ -697,13 +737,13 @@ public static Duration scale(Duration original, float scale) {
 
     /* This is the current lock timeout schematic:
      *
-     * Lock timeout:         |------------------------------|  100% = 10s (default)
-     * Wait heartbeat:       |                               . |  110% = 11s
-     * Lock heartbeat:       |          |                    .   35% = 3.5s
-     * Too late to upgrade:  |              |                .   50% = 5s
-     * Upgrader timeout*:    |                         |     .   85% = 8.5s
+     * Lock timeout:         |------------------------------|  100%
+     * Wait heartbeat:       |       |                       .   26%
+     * Lock heartbeat:       |       |                       .   25%
+     * Too late to upgrade:  |              |                .   50%
+     * Upgrader timeout*:    |                      |        .   75%
      *
      * * This is the longest time upgrader can block without sending a heartbeat,
-     *   assuming that the heartbeat Kafka message is stored within ~15% of lock timeout (1.5s by default).
+     *   assuming that the heartbeat Kafka message is stored within ~25% of lock timeout.
      */
 }
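With the new 80s default, these ratios work out to roughly: wait heartbeat 20.8s (26%), lock heartbeat 20s (25%), too-late-to-upgrade threshold 40s (50%), upgrader timeout 60s (75%), and the time-slip warning threshold 20s (25%). The hunk header above references a scale(Duration, float) helper; a plausible sketch of such a helper and the resulting default intervals (the actual implementation in the class may differ):

import java.time.Duration;

public final class Timeouts {
    private Timeouts() {
    }

    // Plausible implementation of the scale(...) helper: multiply a Duration by a float factor.
    public static Duration scale(Duration original, float scale) {
        return Duration.ofMillis((long) (original.toMillis() * scale));
    }

    public static void main(String[] args) {
        Duration lockTimeout = Duration.ofSeconds(80); // new default
        System.out.println("Lock heartbeat interval: " + scale(lockTimeout, 0.25f)); // PT20S
        System.out.println("Upgrader timeout:        " + scale(lockTimeout, 0.75f)); // PT1M
    }
}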
