17
17
package io .apicurio .registry .storage .impl .kafkasql .upgrade ;
18
18
19
19
import io .apicurio .common .apps .config .Info ;
20
+ import io .apicurio .common .apps .multitenancy .TenantContext ;
21
+ import io .apicurio .common .apps .multitenancy .TenantContextLoader ;
20
22
import io .apicurio .registry .exception .RuntimeAssertionFailedException ;
21
23
import io .apicurio .registry .exception .UnreachableCodeException ;
22
24
import io .apicurio .registry .storage .impl .kafkasql .KafkaSqlSubmitter ;
26
28
import io .apicurio .registry .storage .impl .kafkasql .values .ActionType ;
27
29
import io .apicurio .registry .storage .impl .kafkasql .values .MessageValue ;
28
30
import io .apicurio .registry .storage .impl .kafkasql .values .UpgraderValue ;
31
+ import io .apicurio .registry .utils .impexp .ContentEntity ;
29
32
import jakarta .enterprise .context .ApplicationScoped ;
33
+ import jakarta .enterprise .context .control .ActivateRequestContext ;
30
34
import jakarta .enterprise .inject .Instance ;
31
35
import jakarta .inject .Inject ;
32
36
import lombok .Getter ;
@@ -103,6 +107,12 @@ public class KafkaSqlUpgraderManager {
103
107
@ Inject
104
108
ThreadContext threadContext ;
105
109
110
+ @ Inject
111
+ TenantContextLoader tcl ;
112
+
113
+ @ Inject
114
+ TenantContext tctx ;
115
+
106
116
/**
107
117
* Unique ID of this upgrader, generated on each Registry start.
108
118
*/
@@ -336,10 +346,11 @@ public synchronized void read(Instant currentTimestamp, MessageKey key, MessageV
336
346
lockHeartbeat .heartbeat (); // Initialize heartbeat
337
347
if (testMode ) {
338
348
try {
339
- var c = sqlStore . getContentEntityByContentId (2 );
349
+ var c = getContentEntityByContentIdForTest (2 );
340
350
log .debug ("Content hash before: {}" , c .contentHash );
341
351
log .debug ("Canonical content hash before: {}" , c .canonicalHash );
342
- } catch (Exception ignored ) {
352
+ } catch (Exception ex ) {
353
+ log .error ("Suppressing exception in TEST MODE" , ex );
343
354
}
344
355
}
345
356
for (KafkaSqlUpgrader u : activeUpgraders ) {
@@ -357,10 +368,11 @@ public synchronized void read(Instant currentTimestamp, MessageKey key, MessageV
357
368
}
358
369
if (testMode ) {
359
370
try {
360
- var c = sqlStore . getContentEntityByContentId (2 );
371
+ var c = getContentEntityByContentIdForTest (2 );
361
372
log .debug ("Content hash after: {}" , c .contentHash );
362
373
log .debug ("Canonical content hash after: {}" , c .canonicalHash );
363
- } catch (Exception ignored ) {
374
+ } catch (Exception ex ) {
375
+ log .error ("Suppressing exception in TEST MODE" , ex );
364
376
}
365
377
}
366
378
}
@@ -527,6 +539,15 @@ private void updateLockMap(Instant timestamp, UpgraderValue value) {
527
539
}
528
540
529
541
542
+ @ ActivateRequestContext
543
+ synchronized ContentEntity getContentEntityByContentIdForTest (int contentId ) {
544
+ tctx .setContext (tcl .loadBatchJobContext (TenantContext .DEFAULT_TENANT_ID ));
545
+ var res = sqlStore .getContentEntityByContentId (contentId );
546
+ tctx .clearContext ();
547
+ return res ;
548
+ }
549
+
550
+
530
551
private enum State {
531
552
WAIT_FOR_BOOTSTRAP ,
532
553
WAIT ,
0 commit comments