10
10
import java .io .OutputStream ;
11
11
import java .net .HttpURLConnection ;
12
12
import java .net .URL ;
13
+ import java .nio .charset .StandardCharsets ;
13
14
import java .time .Instant ;
14
15
import java .util .ArrayList ;
15
16
import java .util .Collections ;
26
27
import org .apache .kafka .connect .source .SourceRecord ;
27
28
import org .apache .kafka .connect .source .SourceTaskContext ;
28
29
import org .apache .kafka .connect .storage .OffsetStorageReader ;
30
+ import org .jetbrains .annotations .NotNull ;
29
31
import org .junit .jupiter .api .AfterAll ;
30
32
import org .junit .jupiter .api .BeforeAll ;
31
33
import org .junit .jupiter .api .Test ;
32
34
import org .opensearch .testcontainers .OpensearchContainer ;
33
35
import org .testcontainers .utility .DockerImageName ;
34
36
35
- import com .github .castorm .kafka .connect .http .model .Offset ;
36
37
import com .github .castorm .kafka .connect .http .model .Partition ;
37
38
38
39
import lombok .extern .slf4j .Slf4j ;
@@ -79,6 +80,56 @@ private Map<String, String> getConf() {
79
80
return props ;
80
81
}
81
82
83
+ private String createAliases (Integer nbIndexes ) throws Exception {
84
+ String url = opensearch .getHttpHostAddress () + "/_aliases" ;
85
+ HttpURLConnection con = (HttpURLConnection ) new URL (url ).openConnection ();
86
+ con .setRequestMethod ("POST" );
87
+ con .setRequestProperty ("Content-Type" , "application/json" );
88
+ con .setRequestProperty ("Accept" , "application/json" );
89
+ con .setDoOutput (true );
90
+
91
+ ArrayList <String > aliases = new ArrayList <>();
92
+ StringBuilder input = getAliasesRequestInput (nbIndexes , aliases );
93
+
94
+ try (OutputStream os = con .getOutputStream ()) {
95
+ byte [] byteInput = input .toString ().getBytes (StandardCharsets .UTF_8 );
96
+ os .write (byteInput , 0 , byteInput .length );
97
+
98
+ assertThat (con .getResponseCode ()).isEqualTo (200 );
99
+ }
100
+
101
+ return StringUtils .join (aliases , "," );
102
+ }
103
+
104
+ @ NotNull
105
+ private static StringBuilder getAliasesRequestInput (Integer nbIndexes , ArrayList <String > aliases ) {
106
+ StringBuilder input = new StringBuilder ("{\" actions\" : [ " );
107
+ for (int i = 0 ; i < nbIndexes ; i ++) {
108
+ String alias = "alias-" + i ;
109
+ input .append ("{\" add\" : {\" index\" : \" index-" ).append (i )
110
+ .append ("\" ,\" alias\" : \" " ).append (alias ).append ("\" }}, " );
111
+ aliases .add (alias );
112
+ }
113
+ input .replace (input .length () - 2 , input .length () - 1 , "" );
114
+ input .append ("]}" );
115
+ return input ;
116
+ }
117
+
118
+ private void deleteAliases (String indexes , String aliases ) throws Exception {
119
+ String [] aliasesList = aliases .split ("," );
120
+ String [] indexesList = indexes .split ("," );
121
+ for (int i = 0 ; i < aliasesList .length ; i ++) {
122
+ String url = opensearch .getHttpHostAddress () + "/" + indexesList [i ] + "/_alias/" + aliasesList [i ];
123
+ HttpURLConnection con = (HttpURLConnection ) new URL (url ).openConnection ();
124
+ con .setRequestMethod ("DELETE" );
125
+ con .setRequestProperty ("Content-Type" , "application/json" );
126
+ con .setRequestProperty ("Accept" , "application/json" );
127
+ con .setDoOutput (true );
128
+
129
+ assertThat (con .getResponseCode ()).isEqualTo (200 );
130
+ }
131
+ }
132
+
82
133
private String loadTestData (int nbIndexes ) throws Exception {
83
134
return loadTestData (nbIndexes , 1 );
84
135
}
@@ -155,10 +206,16 @@ void testNominal() throws Exception {
155
206
156
207
@ Test
157
208
void testTimestamps () throws Exception {
158
- String endpointIncludeList = loadTestData (4 , 4 );
209
+ // before
210
+ String indexes = loadTestData (4 , 4 );
211
+ String aliases = createAliases (4 );
159
212
Map <String , String > config = getConf ();
160
- config .put (HttpSourceConnectorConfig .ENDPOINT_INCLUDE_LIST , endpointIncludeList );
213
+ config .put (HttpSourceConnectorConfig .ENDPOINT_INCLUDE_LIST , aliases );
214
+
215
+ // when
161
216
List <SourceRecord > records = runTasks (config , 4 , 4 );
217
+
218
+ // then
162
219
assertThat (records ).hasSize (16 );
163
220
assertThat (records .get (0 ).value ()).isInstanceOf (Struct .class );
164
221
assertThat (((Struct ) records .get (0 ).value ()).get ("_streamkap_value" ).toString ()).contains ("my_timestamp" );
@@ -171,6 +228,9 @@ void testTimestamps() throws Exception {
171
228
"1-0" , "1-1" , "1-2" , "1-3" ,
172
229
"2-0" , "2-1" , "2-2" , "2-3" ,
173
230
"3-0" , "3-1" , "3-2" , "3-3" );
231
+
232
+ // after
233
+ deleteAliases (indexes , aliases );
174
234
}
175
235
176
236
@ Test
0 commit comments