 import io.apicurio.registry.model.GAV;
 import io.apicurio.registry.storage.RegistryStorage;
 import io.apicurio.registry.storage.dto.ArtifactReferenceDto;
+import io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto;
 import io.apicurio.registry.storage.error.InvalidArtifactTypeException;
 import io.apicurio.registry.storage.error.VersionAlreadyExistsException;
 import io.apicurio.registry.storage.impexp.EntityInputStream;
@@ -61,6 +62,12 @@ public class SqlDataUpgrader extends AbstractDataImporter {
     protected final Map<Long, Long> globalIdMapping = new HashMap<>();
     protected final Map<Long, Long> contentIdMapping = new HashMap<>();
 
+    // Collection of content waiting for required references. A given content cannot be imported unless its
+    // expected references are present.
+    // TODO do a second round on this, since it currently means enforcing the integrity rule. (Maybe try a
+    // first round and, if there are no artifacts remaining, just import the orphaned content).
+    protected final Map<ContentEntity, Set<GAV>> waitingForReference = new HashMap<>();
+
     // To keep track of which versions have been imported
     private final Set<GAV> gavDone = new HashSet<>();
 
@@ -126,7 +133,7 @@ public void importArtifactVersion(ArtifactVersionEntity entity) {
                     .groupId(entity.groupId).build();
 
             // If the version being imported is the first one, we have to create the artifact first
-            if (entity.versionId == 1) {
+            if (!storage.isArtifactExists(entity.groupId, entity.artifactId)) {
                 ArtifactEntity artifactEntity = ArtifactEntity.builder().artifactId(entity.artifactId)
                         .artifactType(entity.artifactType).createdOn(entity.createdOn)
                         .description(entity.description).groupId(entity.groupId).labels(artifactVersionLabels)
@@ -135,6 +142,16 @@ public void importArtifactVersion(ArtifactVersionEntity entity) {
                 storage.importArtifact(artifactEntity);
             }
 
+            if (entity.isLatest) {
+                // If this version is the latest, update the artifact metadata with this version's metadata
+                EditableArtifactMetaDataDto editableArtifactMetaDataDto = EditableArtifactMetaDataDto
+                        .builder().name(newEntity.name).owner(newEntity.owner)
+                        .description(newEntity.description).labels(newEntity.labels).build();
+
+                storage.updateArtifactMetaData(newEntity.groupId, newEntity.artifactId,
+                        editableArtifactMetaDataDto);
+            }
+
             storage.importArtifactVersion(newEntity);
             log.debug("Artifact version imported successfully: {}", entity);
             globalIdMapping.put(oldGlobalId, entity.globalId);
@@ -149,6 +166,20 @@ public void importArtifactVersion(ArtifactVersionEntity entity) {
             }
             waitingForVersion.removeAll(commentsToImport);
 
+            // Once the artifact version is processed, check if there is some content waiting for it as its
+            // reference.
+            // For each content waiting for the version we just inserted, remove it from the list.
+            waitingForReference.values().forEach(waitingReferences -> waitingReferences.remove(gav));
+
+            // Finally, once the list of required deps is updated, if it was the last reference needed, import
+            // the content.
+            waitingForReference.keySet().stream()
+                    .filter(content -> waitingForReference.get(content).isEmpty())
+                    .forEach(contentToImport -> {
+                        if (!contentIdMapping.containsKey(contentToImport.contentId)) {
+                            importContent(contentToImport);
+                        }
+                    });
         } catch (VersionAlreadyExistsException ex) {
             if (ex.getGlobalId() != null) {
                 log.warn("Duplicate globalId {} detected, skipping import of artifact version: {}",
@@ -167,6 +198,27 @@ public void importContent(ContentEntity entity) {
             List<ArtifactReferenceDto> references = SqlUtil
                     .deserializeReferences(entity.serializedReferences);
 
+            Set<GAV> referencesGavs = references
+                    .stream().map(referenceDto -> new GAV(referenceDto.getGroupId(),
+                            referenceDto.getArtifactId(), referenceDto.getVersion()))
+                    .collect(Collectors.toSet());
+
+            Set<ArtifactReferenceDto> requiredReferences = new HashSet<>();
+
+            // If there are references and they've not been imported yet, add them to the waiting collection
+            if (!references.isEmpty() && !gavDone.containsAll(referencesGavs)) {
+                waitingForReference.put(entity, new HashSet<>());
+
+                // For each artifact reference, if it has not been imported yet, add it to the waiting list
+                // for this content.
+                referencesGavs.stream()
+                        .filter(artifactReference -> !gavDone.contains(artifactReference))
+                        .forEach(artifactReference -> waitingForReference.get(entity).add(artifactReference));
+
+                // This content cannot be imported until all the references are imported.
+                return;
+            }
+
             TypedContent typedContent = TypedContent.create(ContentHandle.create(entity.contentBytes), null);
             Map<String, TypedContent> resolvedReferences = storage.resolveReferences(references);
             entity.artifactType = utils.determineArtifactType(typedContent, null, resolvedReferences);
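To make the deferred-import flow introduced by this diff easier to follow in isolation, here is a minimal, self-contained sketch of the same pattern, assuming Java 16+. It is not the Apicurio Registry code: `Gav`, `Content`, `DeferredContentImporter` and their members are hypothetical placeholders standing in for `GAV`, `ContentEntity`, `gavDone`, `contentIdMapping`, `waitingForReference` and the storage calls used by `SqlDataUpgrader`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Minimal sketch of the deferred-import idea: content that references
 * versions not yet imported is parked, and released once every referenced
 * version has been imported.
 */
public class DeferredContentImporter {

    /** Simplified stand-in for io.apicurio.registry.model.GAV. */
    record Gav(String groupId, String artifactId, String version) {}

    /** Simplified stand-in for the importer's ContentEntity. */
    record Content(long contentId, List<Gav> references) {}

    // Versions already imported (plays the role of the upgrader's gavDone set).
    private final Set<Gav> importedVersions = new HashSet<>();

    // Content whose import is deferred until every referenced version exists.
    private final Map<Content, Set<Gav>> waitingForReference = new HashMap<>();

    // Content ids already imported (plays the role of contentIdMapping's key set).
    private final Set<Long> importedContentIds = new HashSet<>();

    /** Import content right away if all references are satisfied; otherwise park it. */
    public void importContent(Content content) {
        Set<Gav> missing = new HashSet<>(content.references());
        missing.removeAll(importedVersions);
        if (!missing.isEmpty()) {
            waitingForReference.put(content, missing);
            return; // deferred until the missing referenced versions arrive
        }
        doImport(content);
    }

    /** Called after each version import: release content that was only waiting for this version. */
    public void importVersion(Gav gav) {
        importedVersions.add(gav);
        waitingForReference.values().forEach(pending -> pending.remove(gav));
        waitingForReference.entrySet().stream()
                .filter(e -> e.getValue().isEmpty())
                .map(Map.Entry::getKey)
                .filter(c -> !importedContentIds.contains(c.contentId()))
                .toList() // snapshot first so doImport can safely mutate the map
                .forEach(this::doImport);
    }

    private void doImport(Content content) {
        waitingForReference.remove(content);
        importedContentIds.add(content.contentId());
        System.out.println("Imported content " + content.contentId());
    }

    public static void main(String[] args) {
        DeferredContentImporter importer = new DeferredContentImporter();
        Gav ref = new Gav("default", "address", "1");
        importer.importContent(new Content(42L, List.of(ref))); // parked: reference not imported yet
        importer.importVersion(ref);                            // releases and imports content 42
    }
}
```

Snapshotting the ready entries with `toList()` before importing is one way to avoid mutating the waiting map while still iterating over it; the sketch otherwise mirrors the two touch points in the diff, `importContent` (park or import) and the post-version-import sweep.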