27
27
import io .apicurio .registry .storage .impl .sql .RegistryStorageContentUtils ;
28
28
import io .apicurio .registry .storage .impl .sql .SqlRegistryStorage ;
29
29
import io .apicurio .registry .storage .importing .DataImporter ;
30
- import io .apicurio .registry .storage .importing .SqlDataImporter ;
30
+ import io .apicurio .registry .storage .importing .v2 .SqlDataUpgrader ;
31
+ import io .apicurio .registry .storage .importing .v3 .SqlDataImporter ;
31
32
import io .apicurio .registry .types .RuleType ;
32
33
import io .apicurio .registry .utils .ConcurrentUtil ;
33
- import io .apicurio .registry .utils .impexp .ArtifactEntity ;
34
- import io .apicurio .registry .utils .impexp .ArtifactRuleEntity ;
35
- import io .apicurio .registry .utils .impexp .ArtifactVersionEntity ;
36
- import io .apicurio .registry .utils .impexp .BranchEntity ;
37
- import io .apicurio .registry .utils .impexp .CommentEntity ;
38
- import io .apicurio .registry .utils .impexp .ContentEntity ;
39
- import io .apicurio .registry .utils .impexp .GlobalRuleEntity ;
40
- import io .apicurio .registry .utils .impexp .GroupEntity ;
41
- import io .apicurio .registry .utils .impexp .GroupRuleEntity ;
34
+ import io .apicurio .registry .utils .impexp .v3 . ArtifactEntity ;
35
+ import io .apicurio .registry .utils .impexp .v3 . ArtifactRuleEntity ;
36
+ import io .apicurio .registry .utils .impexp .v3 . ArtifactVersionEntity ;
37
+ import io .apicurio .registry .utils .impexp .v3 . BranchEntity ;
38
+ import io .apicurio .registry .utils .impexp .v3 . CommentEntity ;
39
+ import io .apicurio .registry .utils .impexp .v3 . ContentEntity ;
40
+ import io .apicurio .registry .utils .impexp .v3 . GlobalRuleEntity ;
41
+ import io .apicurio .registry .utils .impexp .v3 . GroupEntity ;
42
+ import io .apicurio .registry .utils .impexp .v3 . GroupRuleEntity ;
42
43
import io .apicurio .registry .utils .kafka .KafkaUtil ;
43
44
import io .apicurio .registry .utils .kafka .ProducerActions ;
44
45
import jakarta .annotation .PreDestroy ;
@@ -634,6 +635,30 @@ public void importData(EntityInputStream entities, boolean preserveGlobalId, boo
634
635
});
635
636
}
636
637
638
+ /**
639
+ * @see io.apicurio.registry.storage.RegistryStorage#importData(io.apicurio.registry.storage.impexp.EntityInputStream,
640
+ * boolean, boolean)
641
+ */
642
+ @ Override
643
+ public void upgradeData (EntityInputStream entities , boolean preserveGlobalId , boolean preserveContentId )
644
+ throws RegistryStorageException {
645
+ DataImporter dataImporter = new SqlDataUpgrader (log , utils , this , preserveGlobalId ,
646
+ preserveContentId );
647
+ dataImporter .importData (entities , () -> {
648
+ // Because importing just pushes a bunch of Kafka messages, we may need to
649
+ // wait for a few seconds before we send the reset messages. Due to partitioning,
650
+ // we can't guarantee ordering of these next two messages, and we NEED them to
651
+ // be consumed after all the import messages.
652
+ // TODO We can wait until the last message is read (a specific one),
653
+ // or create a new message type for this purpose (a sync message).
654
+ try {
655
+ Thread .sleep (2000 );
656
+ } catch (Exception e ) {
657
+ // Noop
658
+ }
659
+ });
660
+ }
661
+
637
662
/**
638
663
* @see io.apicurio.registry.storage.RegistryStorage#createRoleMapping(java.lang.String, java.lang.String,
639
664
* java.lang.String)
@@ -804,7 +829,7 @@ public long nextCommentId() {
804
829
}
805
830
806
831
/**
807
- * @see io.apicurio.registry.storage.RegistryStorage#importComment(io.apicurio.registry.utils.impexp. CommentEntity)
832
+ * @see io.apicurio.registry.storage.RegistryStorage#importComment(CommentEntity)
808
833
*/
809
834
@ Override
810
835
public void importComment (CommentEntity entity ) {
@@ -814,7 +839,7 @@ public void importComment(CommentEntity entity) {
814
839
}
815
840
816
841
/**
817
- * @see io.apicurio.registry.storage.RegistryStorage#importGroup(io.apicurio.registry.utils.impexp. GroupEntity)
842
+ * @see io.apicurio.registry.storage.RegistryStorage#importGroup(GroupEntity)
818
843
*/
819
844
@ Override
820
845
public void importGroup (GroupEntity entity ) {
@@ -824,7 +849,7 @@ public void importGroup(GroupEntity entity) {
824
849
}
825
850
826
851
/**
827
- * @see io.apicurio.registry.storage.RegistryStorage#importGlobalRule(io.apicurio.registry.utils.impexp. GlobalRuleEntity)
852
+ * @see io.apicurio.registry.storage.RegistryStorage#importGlobalRule(GlobalRuleEntity)
828
853
*/
829
854
@ Override
830
855
public void importGlobalRule (GlobalRuleEntity entity ) {
@@ -834,7 +859,7 @@ public void importGlobalRule(GlobalRuleEntity entity) {
834
859
}
835
860
836
861
/**
837
- * @see io.apicurio.registry.storage.RegistryStorage#importContent(io.apicurio.registry.utils.impexp. ContentEntity)
862
+ * @see io.apicurio.registry.storage.RegistryStorage#importContent(ContentEntity)
838
863
*/
839
864
@ Override
840
865
public void importContent (ContentEntity entity ) {
@@ -845,7 +870,7 @@ public void importContent(ContentEntity entity) {
845
870
}
846
871
847
872
/**
848
- * @see io.apicurio.registry.storage.RegistryStorage#importArtifactVersion(io.apicurio.registry.utils.impexp. ArtifactVersionEntity)
873
+ * @see io.apicurio.registry.storage.RegistryStorage#importArtifactVersion(ArtifactVersionEntity)
849
874
*/
850
875
@ Override
851
876
public void importArtifactVersion (ArtifactVersionEntity entity ) {
@@ -862,7 +887,7 @@ public void importArtifact(ArtifactEntity entity) {
862
887
}
863
888
864
889
/**
865
- * @see io.apicurio.registry.storage.RegistryStorage#importArtifactRule(io.apicurio.registry.utils.impexp. ArtifactRuleEntity)
890
+ * @see io.apicurio.registry.storage.RegistryStorage#importArtifactRule(ArtifactRuleEntity)
866
891
*/
867
892
@ Override
868
893
public void importArtifactRule (ArtifactRuleEntity entity ) {
0 commit comments