Skip to content

Commit 48aeb54

Browse files
committed
added extension ObjectMapKvSourceRecordMapper
1 parent baa91bc commit 48aeb54

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.github.castorm.kafka.connect.http.record;
2+
3+
/*-
4+
* #%L
5+
* Kafka Connect HTTP
6+
* %%
7+
* Copyright (C) 2020 - 2024 Cástor Rodríguez
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.fasterxml.jackson.core.type.TypeReference;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import com.github.castorm.kafka.connect.http.model.Offset;
26+
import com.github.castorm.kafka.connect.http.record.model.KvRecord;
27+
import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper;
28+
import lombok.RequiredArgsConstructor;
29+
import org.apache.kafka.connect.source.SourceRecord;
30+
31+
import java.time.Instant;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.function.Function;
35+
36+
import static java.util.Collections.emptyMap;
37+
38+
@RequiredArgsConstructor
39+
public class ObjectMapKvSourceRecordMapper implements KvSourceRecordMapper {
40+
41+
private static final String KEY_FIELD_NAME = "key";
42+
private static final String TIMESTAMP_FIELD_NAME = "timestamp";
43+
44+
private static Map<String, ?> sourcePartition = emptyMap();
45+
46+
private final Function<Map<String, ?>, SourceRecordMapperConfig> configFactory;
47+
48+
private SourceRecordMapperConfig config;
49+
50+
// Jackson ObjectMapper for deserialization
51+
private static final ObjectMapper objectMapper = new ObjectMapper();
52+
53+
public ObjectMapKvSourceRecordMapper() {
54+
this(SourceRecordMapperConfig::new);
55+
}
56+
57+
@Override
58+
public void configure(Map<String, ?> settings) {
59+
config = configFactory.apply(settings);
60+
}
61+
62+
@Override
63+
public SourceRecord map(KvRecord record) {
64+
65+
Offset offset = record.getOffset();
66+
Long timestamp = offset.getTimestamp().map(Instant::toEpochMilli).orElseGet(System::currentTimeMillis);
67+
68+
String key = record.getKey();
69+
70+
Map<String, Object> deserializedValue;
71+
try {
72+
deserializedValue = objectMapper.readValue(record.getValue().toString(), new TypeReference<Map<String, Object>>() {});
73+
} catch (Exception e) {
74+
throw new RuntimeException("Failed to deserialize record value", e);
75+
}
76+
77+
deserializedValue.put(KEY_FIELD_NAME, key);
78+
deserializedValue.put(TIMESTAMP_FIELD_NAME, timestamp);
79+
80+
return new SourceRecord(
81+
sourcePartition,
82+
offset.toMap(),
83+
config.getTopic(),
84+
null,
85+
null,
86+
key,
87+
null,
88+
deserializedValue,
89+
timestamp);
90+
}
91+
}

0 commit comments

Comments
 (0)