Skip to content

Commit 4db1a7b

Browse files
committed
HTTP-99 address review feedback
Signed-off-by: davidradl <david_radley@uk.ibm.com>
1 parent 6a2255c commit 4db1a7b

File tree

4 files changed

+111
-101
lines changed

4 files changed

+111
-101
lines changed

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
/*
2-
* © Copyright IBM Corp. 2025
3-
*/
4-
51
package com.getindata.connectors.http.internal.table.lookup.querycreators;
62

73
import java.io.IOException;
@@ -17,12 +13,7 @@
1713
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
1814
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
1915
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
20-
import org.apache.flink.table.api.DataTypes.Field;
21-
import org.apache.flink.table.data.GenericRowData;
2216
import org.apache.flink.table.data.RowData;
23-
import org.apache.flink.table.types.DataType;
24-
import org.apache.flink.table.types.FieldsDataType;
25-
import org.apache.flink.types.Row;
2617
import org.apache.flink.util.FlinkRuntimeException;
2718
import org.apache.flink.util.Preconditions;
2819

@@ -33,25 +24,21 @@
3324
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
3425

3526
/**
36-
* Generic JSON and URL query creator; in addition to be able to map columns to json requests,
37-
* it allows url inserts to be mapped to column names using templating.
38-
* <br>
39-
* For GETs, column names are mapped to query parameters. e.g. for
27+
* <p>Generic JSON and URL query creator; in addition to be able to map columns to json requests,
28+
* it allows url inserts to be mapped to column names using templating.</p>
29+
* <p>For GETs, column names are mapped to query parameters. e.g. for
4030
* <code>GenericJsonAndUrlQueryCreator.REQUEST_PARAM_FIELDS</code> = "id1;id2"
4131
* and url of http://base. At lookup time with values of id1=1 and id2=2 a call of
42-
* http/base?id1=1&amp;id2=2 will be issued.
43-
* <br>
44-
* For PUT and POST, parameters are mapped to the json body e.g. for
32+
* http/base?id1=1&amp;id2=2 will be issued.</p>
33+
* <p>For PUT and POST, parameters are mapped to the json body e.g. for
4534
* REQUEST_PARAM_FIELDS = "id1;id2" and url of http://base. At lookup time with values of id1=1 and
46-
* id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}
47-
* <br>
48-
* For all http methods, url segments can be used to include lookup up values. Using the map from
35+
* id2=2 as call of http/base will be issued with a json payload of {"id1":1,"id2":2}</p>
36+
* <p>For all http methods, url segments can be used to include lookup up values. Using the map from
4937
* <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of the insert and the
5038
* value of the associated column.
5139
* e.g. for <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> = "key1":"col1"
5240
* and url of http://base/{key1}. At lookup time with values of col1="aaaa" a call of
53-
* http/base/aaaa will be issued.
54-
*
41+
* http/base/aaaa will be issued.</p>
5542
*/
5643
@Slf4j
5744
@Builder
@@ -120,28 +107,6 @@ public LookupQueryInfo createLookupQuery(final RowData lookupDataRow) {
120107

121108
}
122109

123-
/**
124-
* Create a Row from a RowData and DataType
125-
* @param lookupRowData the lookup RowData
126-
* @param rowType the datatype
127-
* @return row return row
128-
*/
129-
@VisibleForTesting
130-
static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
131-
Preconditions.checkNotNull(lookupRowData);
132-
Preconditions.checkNotNull(rowType);
133-
134-
final Row row = Row.withNames();
135-
final List<Field> rowFields = FieldsDataType.getFields(rowType);
136-
137-
for (int idx = 0; idx < rowFields.size(); idx++) {
138-
final String fieldName = rowFields.get(idx).getName();
139-
final Object fieldValue = ((GenericRowData) lookupRowData).getField(idx);
140-
row.setField(fieldName, fieldValue);
141-
}
142-
return row;
143-
}
144-
145110
/**
146111
* Create map of the json key to the lookup argument
147112
* value. This is used for body based content.
@@ -150,7 +115,7 @@ static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
150115
* @return map of field content to the lookup argument value.
151116
*/
152117
private Map<String, String> createBodyBasedParams(final Collection<LookupArg> args,
153-
ObjectNode objectNode ) {
118+
ObjectNode objectNode ) {
154119
Map<String, String> mapOfJsonKeyToLookupArg = new LinkedHashMap<>();
155120
Iterator<Map.Entry<String, JsonNode>> iterator = objectNode.fields();
156121
iterator.forEachRemaining(field -> {
@@ -172,7 +137,8 @@ private Map<String, String> createBodyBasedParams(final Collection<LookupArg> ar
172137
* @return map of field content to the lookup argument value.
173138
*/
174139
private Map<String, String> createURLPathBasedParams(final Collection<LookupArg> args,
175-
Map<String, String> urlMap ) {
140+
Map<String,
141+
String> urlMap ) {
176142
Map<String, String> mapOfinsertKeyToLookupArg = new LinkedHashMap<>();
177143
if (urlMap != null) {
178144
for (String key: urlMap.keySet()) {

src/main/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class GenericJsonAndUrlQueryCreatorFactory implements LookupQueryCreatorF
7070
+ "as url segments. Parses a string as a map of strings. "
7171
+ "<br>"
7272
+ "For example if there are table columns called customerId"
73-
+ " and orderId, then specifying value customerId:cid1,orderID:oid"
73+
+ " and orderId, then specifying value customerId:cid,orderID:oid"
7474
+ " and a url of https://myendpoint/customers/{cid}/orders/{oid}"
7575
+ " will mean that the url used for the lookup query will"
7676
+ " dynamically pickup the values for customerId, orderId"

src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactoryTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,6 @@ public void lookupQueryInfoTestRequiredConfig() {
7171
null,
7272
null);
7373
});
74-
// do not specify REQUEST_ARG_PATHS_CONFIG
75-
assertThrows(RuntimeException.class, () -> {
76-
genericJsonAndUrlQueryCreatorFactory.createLookupQueryCreator(config,
77-
null,
78-
null);
79-
});
8074
}
8175

8276
private void createUsingFactory(boolean async) {
@@ -116,6 +110,7 @@ void optionsTests() {
116110
assertThat(factory.optionalOptions()).contains(REQUEST_BODY_FIELDS);
117111
assertThat(factory.optionalOptions()).contains(REQUEST_URL_MAP);
118112
}
113+
119114
public static DynamicTableFactory.Context getTableContext(Configuration config,
120115
ResolvedSchema resolvedSchema) {
121116
return new FactoryUtil.DefaultDynamicTableContext(

src/test/java/com/getindata/connectors/http/internal/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java

Lines changed: 98 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,54 +18,43 @@
1818
import org.apache.flink.table.data.RowData;
1919
import org.apache.flink.table.data.StringData;
2020
import org.apache.flink.table.types.DataType;
21+
import org.apache.flink.table.types.FieldsDataType;
2122
import org.apache.flink.types.Row;
23+
import org.apache.flink.util.Preconditions;
24+
import org.jetbrains.annotations.NotNull;
2225
import org.junit.jupiter.api.Test;
2326
import org.junit.jupiter.params.ParameterizedTest;
2427
import org.junit.jupiter.params.provider.ValueSource;
2528
import static org.assertj.core.api.Assertions.assertThat;
2629
import static org.junit.jupiter.api.Assertions.assertThrows;
2730

31+
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
2832
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
2933
import com.getindata.connectors.http.internal.table.lookup.RowDataSingleValueLookupSchemaEntry;
3034
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.LOOKUP_METHOD;
3135
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row;
3236
import static com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonAndUrlQueryCreatorFactoryTest.getTableContext;
3337

3438
class GenericJsonAndUrlQueryCreatorTest {
35-
39+
private static final String KEY = "key1";
40+
private static final String VALUE = "val1";
41+
// for GET this is the minimum config
42+
private static final List<String> QUERY_PARAMS = List.of(KEY);
43+
// Path param ArgPath required a stringified json object. As we have PersonBean
44+
// we can use that.
45+
private static final Map<String, String> URL_PARAMS = Map.of(KEY, KEY);
46+
private static final DataType dataType = row(List.of(
47+
DataTypes.FIELD(KEY, DataTypes.STRING())
48+
));
49+
private static final ResolvedSchema resolvedSchema = ResolvedSchema.of(Column.physical(KEY,
50+
DataTypes.STRING()));
51+
private static final RowData ROWDATA = getRowData(1, VALUE);
3652
@ParameterizedTest
3753
@ValueSource(strings = {"GET", "PUT", "POST" })
3854
public void createLookupQueryTestStrAllOps(String operation) {
39-
String key = "key1";
40-
String value = "val1";
41-
// for GET this is the minimum config
42-
List<String> query_params = List.of(key);
43-
// Path param ArgPath required a stringified json object. As we have PersonBean
44-
// we can use that.
45-
Map<String, String> url_params = Map.of(key, key);
46-
LookupRow lookupRow = new LookupRow()
47-
.addLookupEntry(
48-
new RowDataSingleValueLookupSchemaEntry(
49-
key,
50-
RowData.createFieldGetter(
51-
DataTypes.STRING().getLogicalType(), 0)
52-
));
53-
DataType dataType = row(List.of(
54-
DataTypes.FIELD(key, DataTypes.STRING())
55-
));
56-
lookupRow.setLookupPhysicalRowDataType(dataType);
57-
ResolvedSchema resolvedSchema = ResolvedSchema.of(Column.physical(key,
58-
DataTypes.STRING()));
59-
Configuration config = new Configuration();
60-
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS,
61-
query_params);
62-
if (!operation.equals("GET")) {
63-
// add the body content for PUT and POST
64-
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
65-
query_params);
66-
}
67-
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, url_params);
68-
config.setString(LOOKUP_METHOD, operation);
55+
// WHEN
56+
LookupRow lookupRow = getLookupRow();
57+
Configuration config = getConfiguration(operation);
6958
GenericJsonAndUrlQueryCreator universalJsonQueryCreator =
7059
(GenericJsonAndUrlQueryCreator) new GenericJsonAndUrlQueryCreatorFactory()
7160
.createLookupQueryCreator(
@@ -74,26 +63,67 @@ public void createLookupQueryTestStrAllOps(String operation) {
7463
getTableContext(config,
7564
resolvedSchema)
7665
);
77-
var row = new GenericRowData(1);
78-
row.setField(0, StringData.fromString(value));
79-
var createdQuery = universalJsonQueryCreator.createLookupQuery(row);
66+
var createdQuery = universalJsonQueryCreator.createLookupQuery(ROWDATA);
8067
// THEN
8168
if (operation.equals("GET")) {
82-
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty();
83-
assertThat(createdQuery.getLookupQuery()).isEqualTo(key + "=" + value);
69+
validateCreatedQueryForGet(createdQuery);
8470
} else {
85-
assertThat(createdQuery
86-
.getBodyBasedUrlQueryParameters())
87-
.isEqualTo(key + "=" + value);
88-
assertThat(createdQuery.getLookupQuery()).isEqualTo(
89-
"{\""
90-
+ key
91-
+ "\":\"" + value
92-
+ "\"}");
71+
validateCreatedQueryForPutAndPost(createdQuery);
9372
}
73+
// validate url based parameters
9474
assertThat(createdQuery.getPathBasedUrlParameters().size() == 1).isTrue();
95-
assertThat(createdQuery.getPathBasedUrlParameters().get(key)).isEqualTo(value);
75+
assertThat(createdQuery.getPathBasedUrlParameters().get(KEY)).isEqualTo(VALUE);
76+
}
77+
78+
private static void validateCreatedQueryForGet( LookupQueryInfo createdQuery) {
79+
// check there is no body params and we have the expected lookup query
80+
assertThat(createdQuery.getBodyBasedUrlQueryParameters()).isEmpty();
81+
assertThat(createdQuery.getLookupQuery()).isEqualTo(KEY + "=" + VALUE);
9682
}
83+
private static void validateCreatedQueryForPutAndPost(LookupQueryInfo createdQuery) {
84+
// check we have the expected body params and lookup query
85+
assertThat(createdQuery
86+
.getBodyBasedUrlQueryParameters())
87+
.isEqualTo(KEY + "=" + VALUE);
88+
assertThat(createdQuery.getLookupQuery()).isEqualTo(
89+
"{\""
90+
+ KEY
91+
+ "\":\"" + VALUE
92+
+ "\"}");
93+
}
94+
95+
private static @NotNull GenericRowData getRowData(int arity, String value) {
96+
var row = new GenericRowData(arity);
97+
row.setField(0, StringData.fromString(value));
98+
return row;
99+
}
100+
101+
private static @NotNull Configuration getConfiguration(String operation) {
102+
Configuration config = new Configuration();
103+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_QUERY_PARAM_FIELDS,
104+
QUERY_PARAMS);
105+
if (!operation.equals("GET")) {
106+
// add the body content for PUT and POST
107+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_BODY_FIELDS,
108+
QUERY_PARAMS);
109+
}
110+
config.set(GenericJsonAndUrlQueryCreatorFactory.REQUEST_URL_MAP, URL_PARAMS);
111+
config.setString(LOOKUP_METHOD, operation);
112+
return config;
113+
}
114+
115+
private static @NotNull LookupRow getLookupRow() {
116+
LookupRow lookupRow = new LookupRow()
117+
.addLookupEntry(
118+
new RowDataSingleValueLookupSchemaEntry(
119+
KEY,
120+
RowData.createFieldGetter(
121+
DataTypes.STRING().getLogicalType(), 0)
122+
));
123+
lookupRow.setLookupPhysicalRowDataType(dataType);
124+
return lookupRow;
125+
}
126+
97127
@Test
98128
public void createLookupQueryTest() {
99129
List<String> query_params = List.of("key1", "key2");
@@ -137,8 +167,7 @@ public void createLookupQueryTest() {
137167
getTableContext(config,
138168
resolvedSchema)
139169
);
140-
var row = new GenericRowData(2);
141-
row.setField(0, StringData.fromString(value));
170+
var row = getRowData(2, value);
142171
row.setField(1, StringData.fromString(value));
143172
var createdQuery = genericJsonAndUrlQueryCreator.createLookupQuery(row);
144173
// THEN
@@ -148,7 +177,7 @@ public void createLookupQueryTest() {
148177
+ "&" + key2 + "=" + value);
149178
}
150179
@Test
151-
public void failserializationOpenTest() {
180+
public void failSerializationOpenTest() {
152181
List<String> paths_config =List.of("key1");
153182
final String operation = "GET";
154183
final String key = "key1";
@@ -226,10 +255,30 @@ public byte[] serialize(RowData element) {
226255
DataTypes.FIELD(key3, DataTypes.TIMESTAMP_LTZ())
227256
));
228257
// WHEN
229-
Row row = GenericJsonAndUrlQueryCreator.rowDataToRow(rowData, dataType);
258+
Row row = rowDataToRow(rowData, dataType);
230259
// THEN
231260
assertThat(row.getField(key1).equals(value));
232261
assertThat(row.getField(key2).equals("1970-01-01T00:00:00.010"));
233262
assertThat(row.getField(key3).equals("1970-01-01T00:00:00.010Z"));
234263
}
264+
/**
265+
* Create a Row from a RowData and DataType
266+
* @param lookupRowData the lookup RowData
267+
* @param rowType the datatype
268+
* @return row return row
269+
*/
270+
private static Row rowDataToRow(final RowData lookupRowData, final DataType rowType) {
271+
Preconditions.checkNotNull(lookupRowData);
272+
Preconditions.checkNotNull(rowType);
273+
274+
final Row row = Row.withNames();
275+
final List<DataTypes.Field> rowFields = FieldsDataType.getFields(rowType);
276+
277+
for (int idx = 0; idx < rowFields.size(); idx++) {
278+
final String fieldName = rowFields.get(idx).getName();
279+
final Object fieldValue = ((GenericRowData) lookupRowData).getField(idx);
280+
row.setField(fieldName, fieldValue);
281+
}
282+
return row;
283+
}
235284
}

0 commit comments

Comments
 (0)