1
1
package io .apicurio .registry .storage .impl .kafkasql ;
2
2
3
3
import io .apicurio .registry .AbstractResourceTestBase ;
4
- import io .apicurio .registry .rest .client .models .ArtifactContent ;
4
+ import io .apicurio .registry .rest .client .models .CreateArtifact ;
5
5
import io .apicurio .registry .rest .client .models .Rule ;
6
6
import io .apicurio .registry .rest .client .models .RuleType ;
7
+ import io .apicurio .registry .types .ArtifactType ;
8
+ import io .apicurio .registry .types .ContentTypes ;
7
9
import io .apicurio .registry .utils .kafka .KafkaUtil ;
8
10
import io .apicurio .registry .utils .tests .KafkasqlSnapshotTestProfile ;
11
+ import io .apicurio .registry .utils .tests .TestUtils ;
9
12
import io .quarkus .test .common .QuarkusTestResource ;
10
13
import io .quarkus .test .common .QuarkusTestResourceLifecycleManager ;
11
14
import io .quarkus .test .junit .QuarkusTest ;
@@ -54,11 +57,9 @@ public void init() {
54
57
for (int idx = 0 ; idx < 1000 ; idx ++) {
55
58
System .out .println ("Iteration: " + idx );
56
59
String artifactId = UUID .randomUUID ().toString ();
57
- ArtifactContent content = new ArtifactContent ();
58
- content .setContent (simpleAvro );
59
- clientV3 .groups ().byGroupId (NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID ).artifacts ().post (content , config -> {
60
- config .headers .add ("X-Registry-ArtifactId" , artifactId );
61
- });
60
+ CreateArtifact createArtifact = TestUtils .clientCreateArtifact (artifactId , ArtifactType .AVRO , simpleAvro ,
61
+ ContentTypes .APPLICATION_JSON );
62
+ clientV3 .groups ().byGroupId (NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID ).artifacts ().post (createArtifact , config -> config .headers .add ("X-Registry-ArtifactId" , artifactId ));
62
63
Rule rule = new Rule ();
63
64
rule .setType (RuleType .VALIDITY );
64
65
rule .setConfig ("SYNTAX_ONLY" );
@@ -77,7 +78,7 @@ public void testSnapshotCreation() throws IOException {
77
78
@ Test
78
79
public void testRecoverFromSnapshot () throws InterruptedException {
79
80
//We expect 4001 artifacts coming from the snapshot
80
- Assertions .assertEquals (4001 , clientV3 .groups ().byGroupId ("default" ).artifacts ().get ().getCount ());
81
+ Assertions .assertEquals (1000 , clientV3 .groups ().byGroupId ("default" ).artifacts ().get ().getCount ());
81
82
//We expect another 1000 artifacts coming added on top of the snapshot
82
83
Assertions .assertEquals (1000 , clientV3 .groups ().byGroupId (NEW_ARTIFACTS_SNAPSHOT_TEST_GROUP_ID ).artifacts ().get ().getCount ());
83
84
}
@@ -114,10 +115,10 @@ private void prepareSnapshotMarker(Properties props) throws ExecutionException,
114
115
RecordHeader messageTypeHeader = new RecordHeader ("mt" , "CreateSnapshot1Message" .getBytes (StandardCharsets .UTF_8 ));
115
116
ProducerRecord <String , String > snapshotMarkerRecord = new ProducerRecord <>("kafkasql-journal" , 0 ,
116
117
"{\" uuid\" :\" 1345b402-c707-457e-af76-10c1045e68e8\" ,\" messageType\" :\" CreateSnapshot1Message\" ,\" partitionKey\" :\" __GLOBAL_PARTITION__\" }" , "{\n "
117
- + " \" snapshotLocation\" : \" /io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8 .sql\" ,\n "
118
- + " \" snapshotId\" : \" 1302b402-c707-457e-af76-10c1045e68e8 \" ,\n "
118
+ + " \" snapshotLocation\" : \" /io/apicurio/registry/storage/impl/kafkasql/943e6945-5aef-4ca0-a3cd-31af380840ea .sql\" ,\n "
119
+ + " \" snapshotId\" : \" 943e6945-5aef-4ca0-a3cd-31af380840ea \" ,\n "
119
120
+ " \" key\" : {\n "
120
- + " \" uuid\" : \" 1302b402-c707-457e-af76-10c1045e68e8 \" ,\n "
121
+ + " \" uuid\" : \" 943e6945-5aef-4ca0-a3cd-31af380840ea \" ,\n "
121
122
+ " \" messageType\" : \" CreateSnapshot1Message\" ,\n "
122
123
+ " \" partitionKey\" : \" __GLOBAL_PARTITION__\" \n "
123
124
+ " }\n "
@@ -131,14 +132,14 @@ private void prepareSnapshotMessages(Properties props) throws URISyntaxException
131
132
StringSerializer keySerializer = new StringSerializer ();
132
133
StringSerializer valueSerializer = new StringSerializer ();
133
134
134
- URL resource = getClass ().getResource ("/io/apicurio/registry/storage/impl/kafkasql/1302b402-c707-457e-af76-10c1045e68e8 .sql" );
135
+ URL resource = getClass ().getResource ("/io/apicurio/registry/storage/impl/kafkasql/943e6945-5aef-4ca0-a3cd-31af380840ea .sql" );
135
136
String snapshotLocation = Paths .get (resource .toURI ()).toFile ().getAbsolutePath ();
136
137
137
138
//Send three messages to the snapshots topic, two invalid, and one valid. Only the latest valid one must be processed.
138
139
ProducerRecord <String , String > olderInvalidSnapshot = new ProducerRecord <>("kafkasql-snapshots" , 0 , "1312b402-c707-457e-af76-10c1045e68e8" ,
139
140
"snapshotLocation" ,
140
141
Collections .emptyList ());
141
- ProducerRecord <String , String > record = new ProducerRecord <>("kafkasql-snapshots" , 0 , "1302b402-c707-457e-af76-10c1045e68e8 " , snapshotLocation ,
142
+ ProducerRecord <String , String > record = new ProducerRecord <>("kafkasql-snapshots" , 0 , "943e6945-5aef-4ca0-a3cd-31af380840ea " , snapshotLocation ,
142
143
Collections .emptyList ());
143
144
ProducerRecord <String , String > newerInvalidSnaphot = new ProducerRecord <>("kafkasql-snapshots" , 0 , "1322b402-c707-457e-af76-10c1045e68e8" , "" ,
144
145
Collections .emptyList ());
0 commit comments