Skip to content

Commit c5614e9

Browse files
committed
[hotfix] Supports specifying routing fields during write operations. Parameter name: sink.partition-routing.fields
1 parent 7ada6f3 commit c5614e9

File tree

9 files changed

+186
-26
lines changed

9 files changed

+186
-26
lines changed

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java

+4
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ public Optional<String> getPathPrefix() {
150150
return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
151151
}
152152

153+
public String getPartitionRoutingFields() {
154+
return config.get(ElasticsearchConnectorOptions.PARTITION_ROUTING_FIELDS);
155+
}
156+
153157
@Override
154158
public boolean equals(Object o) {
155159
if (this == o) {

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java

+6
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ public class ElasticsearchConnectorOptions {
152152
"The format must produce a valid JSON document. "
153153
+ "Please refer to the documentation on formats for more details.");
154154

155+
public static final ConfigOption<String> PARTITION_ROUTING_FIELDS =
156+
ConfigOptions.key("sink.partition-routing.fields")
157+
.stringType()
158+
.noDefaultValue()
159+
.withDescription("Route field names list, multiple separated by commas.");
160+
155161
// --------------------------------------------------------------------------------------------
156162
// Enums
157163
// --------------------------------------------------------------------------------------------

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java

+37
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,19 @@
2424
import org.apache.flink.table.data.RowData;
2525
import org.apache.flink.table.types.logical.DistinctType;
2626
import org.apache.flink.table.types.logical.LogicalType;
27+
import org.apache.flink.util.StringUtils;
2728

2829
import java.io.Serializable;
2930
import java.time.Duration;
3031
import java.time.LocalDate;
3132
import java.time.LocalTime;
3233
import java.time.Period;
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
3336
import java.util.HashMap;
3437
import java.util.List;
3538
import java.util.Map;
39+
import java.util.Optional;
3640
import java.util.function.Function;
3741

3842
/** An extractor for a Elasticsearch key from a {@link RowData}. */
@@ -109,6 +113,39 @@ public static Function<RowData, String> createKeyExtractor(
109113
.orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
110114
}
111115

116+
public static Function<RowData, String> createColumnExtractor(
117+
TableSchema schema, String keyDelimiter, String columns) {
118+
List<String> cols = null;
119+
if (StringUtils.isNullOrWhitespaceOnly(columns)) {
120+
cols = new ArrayList<>(0);
121+
} else {
122+
cols = Arrays.asList(columns.split(","));
123+
}
124+
return createColumnExtractor(schema, keyDelimiter, cols);
125+
}
126+
127+
public static Function<RowData, String> createColumnExtractor(
128+
TableSchema schema, String keyDelimiter, List<String> columns) {
129+
Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
130+
List<TableColumn> tableColumns = schema.getTableColumns();
131+
for (int i = 0; i < schema.getFieldCount(); i++) {
132+
TableColumn column = tableColumns.get(i);
133+
namesToColumns.put(column.getName(), new ColumnWithIndex(column, i));
134+
}
135+
136+
FieldFormatter[] fieldFormatters = columns == null || columns.isEmpty() ? new FieldFormatter[0] :
137+
columns.stream()
138+
.map(namesToColumns::get)
139+
.map(
140+
column ->
141+
toFormatter(
142+
column.index, column.getType()))
143+
.toArray(FieldFormatter[]::new);
144+
145+
Function<RowData, String> extractor = new KeyExtractor(fieldFormatters, keyDelimiter);
146+
return Optional.of(extractor).orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
147+
}
148+
112149
private static FieldFormatter toFormatter(int index, LogicalType type) {
113150
switch (type.getTypeRoot()) {
114151
case DATE:

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,17 @@ interface RequestFactory extends Serializable {
3636
* Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
3737
* been deprecated since Elasticsearch 7.x and it would not take any effort.
3838
*/
39-
UpdateRequest createUpdateRequest(
40-
String index, String docType, String key, XContentType contentType, byte[] document);
39+
UpdateRequest createUpdateRequest(String index, String docType, String key, String routing, XContentType contentType, byte[] document);
4140

4241
/**
4342
* Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
4443
* been deprecated since Elasticsearch 7.x and it would not take any effort.
4544
*/
46-
IndexRequest createIndexRequest(
47-
String index, String docType, String key, XContentType contentType, byte[] document);
45+
IndexRequest createIndexRequest(String index, String docType, String key, String routing, XContentType contentType, byte[] document);
4846

4947
/**
5048
* Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
5149
* been deprecated since Elasticsearch 7.x and it would not take any effort.
5250
*/
53-
DeleteRequest createDeleteRequest(String index, String docType, String key);
51+
DeleteRequest createDeleteRequest(String index, String docType, String key, String routing);
5452
}

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,23 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData>
5151
private final XContentType contentType;
5252
private final RequestFactory requestFactory;
5353
private final Function<RowData, String> createKey;
54+
private final Function<RowData, String> routingKey;
5455

5556
public RowElasticsearchSinkFunction(
5657
IndexGenerator indexGenerator,
5758
@Nullable String docType, // this is deprecated in es 7+
5859
SerializationSchema<RowData> serializationSchema,
5960
XContentType contentType,
6061
RequestFactory requestFactory,
61-
Function<RowData, String> createKey) {
62+
Function<RowData, String> createKey,
63+
Function<RowData, String> routingKey) {
6264
this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
6365
this.docType = docType;
6466
this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
6567
this.contentType = Preconditions.checkNotNull(contentType);
6668
this.requestFactory = Preconditions.checkNotNull(requestFactory);
6769
this.createKey = Preconditions.checkNotNull(createKey);
70+
this.routingKey = routingKey;
6871
}
6972

7073
@Override
@@ -96,20 +99,20 @@ private void processUpsert(RowData row, RequestIndexer indexer) {
9699
if (key != null) {
97100
final UpdateRequest updateRequest =
98101
requestFactory.createUpdateRequest(
99-
indexGenerator.generate(row), docType, key, contentType, document);
102+
indexGenerator.generate(row), docType, key, routingKey.apply(row), contentType, document);
100103
indexer.add(updateRequest);
101104
} else {
102105
final IndexRequest indexRequest =
103106
requestFactory.createIndexRequest(
104-
indexGenerator.generate(row), docType, key, contentType, document);
107+
indexGenerator.generate(row), docType, key, routingKey.apply(row), contentType, document);
105108
indexer.add(indexRequest);
106109
}
107110
}
108111

109112
private void processDelete(RowData row, RequestIndexer indexer) {
110113
final String key = createKey.apply(row);
111114
final DeleteRequest deleteRequest =
112-
requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
115+
requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key, routingKey.apply(row));
113116
indexer.add(deleteRequest);
114117
}
115118

@@ -127,7 +130,8 @@ public boolean equals(Object o) {
127130
&& Objects.equals(serializationSchema, that.serializationSchema)
128131
&& contentType == that.contentType
129132
&& Objects.equals(requestFactory, that.requestFactory)
130-
&& Objects.equals(createKey, that.createKey);
133+
&& Objects.equals(createKey, that.createKey)
134+
&& Objects.equals(routingKey, that.routingKey);
131135
}
132136

133137
@Override
@@ -138,6 +142,7 @@ public int hashCode() {
138142
serializationSchema,
139143
contentType,
140144
requestFactory,
141-
createKey);
145+
createKey,
146+
routingKey);
142147
}
143148
}

flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java

+80
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.time.LocalDate;
3232
import java.time.LocalDateTime;
3333
import java.time.LocalTime;
34+
import java.util.Arrays;
35+
import java.util.List;
3436
import java.util.function.Function;
3537

3638
import static org.assertj.core.api.Assertions.assertThat;
@@ -129,4 +131,82 @@ public void testAllTypesKey() {
129131
.isEqualTo(
130132
"1_2_3_4_true_1.0_2.0_ABCD_2012-12-12T12:12:12_2013-01-13T13:13:13_14:14:14_2015-05-15");
131133
}
134+
135+
@Test
136+
public void testStringColumnsExtractor() {
137+
TableSchema schema =
138+
TableSchema.builder()
139+
.field("a", DataTypes.BIGINT().notNull())
140+
.field("b", DataTypes.STRING())
141+
.primaryKey("a")
142+
.build();
143+
144+
Function<RowData, String> keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", "a,b");
145+
146+
String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
147+
assertThat(key).isEqualTo("12_ABCD");
148+
}
149+
150+
@Test
151+
public void testListColumnsExtractor() {
152+
TableSchema schema =
153+
TableSchema.builder()
154+
.field("a", DataTypes.BIGINT().notNull())
155+
.field("b", DataTypes.STRING())
156+
.primaryKey("a")
157+
.build();
158+
159+
Function<RowData, String> keyExtractor = KeyExtractor.createColumnExtractor(schema, "_",
160+
Arrays.asList("a", "b"));
161+
162+
String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
163+
assertThat(key).isEqualTo("12_ABCD");
164+
}
165+
166+
@Test
167+
public void testEmptyColumnsExtractor() {
168+
TableSchema schema =
169+
TableSchema.builder()
170+
.field("a", DataTypes.BIGINT().notNull())
171+
.field("b", DataTypes.STRING())
172+
.primaryKey("a")
173+
.build();
174+
175+
String columns = null;
176+
Function<RowData, String> keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", columns);
177+
178+
String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
179+
assertThat(key).isEqualTo("");
180+
}
181+
182+
@Test
183+
public void testBlankColumnsExtractor() {
184+
TableSchema schema =
185+
TableSchema.builder()
186+
.field("a", DataTypes.BIGINT().notNull())
187+
.field("b", DataTypes.STRING())
188+
.primaryKey("a")
189+
.build();
190+
191+
Function<RowData, String> keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", "");
192+
193+
String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
194+
assertThat(key).isEqualTo("");
195+
}
196+
197+
@Test
198+
public void testNullColumnsExtractor() {
199+
TableSchema schema =
200+
TableSchema.builder()
201+
.field("a", DataTypes.BIGINT().notNull())
202+
.field("b", DataTypes.STRING())
203+
.primaryKey("a")
204+
.build();
205+
206+
List<String> columns = null;
207+
Function<RowData, String> keyExtractor = KeyExtractor.createColumnExtractor(schema, "_", columns);
208+
209+
String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
210+
assertThat(key).isEqualTo("");
211+
}
132212
}

flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
148148
format,
149149
XContentType.JSON,
150150
REQUEST_FACTORY,
151-
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
151+
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
152+
KeyExtractor.createColumnExtractor(schema, config.getKeyDelimiter(), config.getPartitionRoutingFields()));
152153

153154
final ElasticsearchSink.Builder<RowData> builder =
154155
builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -295,26 +296,39 @@ public UpdateRequest createUpdateRequest(
295296
String index,
296297
String docType,
297298
String key,
299+
String routing,
298300
XContentType contentType,
299301
byte[] document) {
300-
return new UpdateRequest(index, docType, key)
301-
.doc(document, contentType)
302-
.upsert(document, contentType);
302+
UpdateRequest req = new UpdateRequest(index, docType, key)
303+
.doc(document, contentType);
304+
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
305+
req = req.routing(routing);
306+
}
307+
return req.upsert(document, contentType);
303308
}
304309

305310
@Override
306311
public IndexRequest createIndexRequest(
307312
String index,
308313
String docType,
309314
String key,
315+
String routing,
310316
XContentType contentType,
311317
byte[] document) {
312-
return new IndexRequest(index, docType, key).source(document, contentType);
318+
IndexRequest req = new IndexRequest(index, docType, key);
319+
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
320+
req = req.routing(routing);
321+
}
322+
return req.source(document, contentType);
313323
}
314324

315325
@Override
316-
public DeleteRequest createDeleteRequest(String index, String docType, String key) {
317-
return new DeleteRequest(index, docType, key);
326+
public DeleteRequest createDeleteRequest(String index, String docType, String key, String routing) {
327+
DeleteRequest req = new DeleteRequest(index, docType, key);
328+
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
329+
req = req.routing(routing);
330+
}
331+
return req;
318332
}
319333
}
320334

flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java

+21-7
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
143143
format,
144144
XContentType.JSON,
145145
REQUEST_FACTORY,
146-
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
146+
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
147+
KeyExtractor.createColumnExtractor(schema, config.getKeyDelimiter(), config.getPartitionRoutingFields()));
147148

148149
final ElasticsearchSink.Builder<RowData> builder =
149150
builderProvider.createBuilder(config.getHosts(), upsertFunction);
@@ -290,26 +291,39 @@ public UpdateRequest createUpdateRequest(
290291
String index,
291292
String docType,
292293
String key,
294+
String routing,
293295
XContentType contentType,
294296
byte[] document) {
295-
return new UpdateRequest(index, key)
296-
.doc(document, contentType)
297-
.upsert(document, contentType);
297+
UpdateRequest req = new UpdateRequest(index, key)
298+
.doc(document, contentType);
299+
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
300+
req = req.routing(routing);
301+
}
302+
return req.upsert(document, contentType);
298303
}
299304

300305
@Override
301306
public IndexRequest createIndexRequest(
302307
String index,
303308
String docType,
304309
String key,
310+
String routing,
305311
XContentType contentType,
306312
byte[] document) {
307-
return new IndexRequest(index).id(key).source(document, contentType);
313+
IndexRequest req = new IndexRequest(index).id(key);
314+
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
315+
req = req.routing(routing);
316+
}
317+
return req.source(document, contentType);
308318
}
309319

310320
@Override
311-
public DeleteRequest createDeleteRequest(String index, String docType, String key) {
312-
return new DeleteRequest(index, key);
321+
public DeleteRequest createDeleteRequest(String index, String docType, String key, String routing) {
322+
DeleteRequest req = new DeleteRequest(index, key);
323+
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
324+
req = req.routing(routing);
325+
}
326+
return req;
313327
}
314328
}
315329

flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
6767
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
6868
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
69+
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PARTITION_ROUTING_FIELDS;
6970
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
7071
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
7172
import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
@@ -104,7 +105,8 @@ public class Elasticsearch7DynamicTableFactory
104105
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
105106
PARTIAL_CACHE_MAX_ROWS,
106107
PARTIAL_CACHE_CACHE_MISSING_KEY,
107-
MAX_RETRIES)
108+
MAX_RETRIES,
109+
PARTITION_ROUTING_FIELDS)
108110
.collect(Collectors.toSet());
109111

110112
@Override

0 commit comments

Comments
 (0)