1
1
package io .apicurio .registry .storage .impl .kafkasql .serde ;
2
2
3
- import java .util .Optional ;
4
-
3
+ import com .fasterxml .jackson .databind .DeserializationFeature ;
4
+ import com .fasterxml .jackson .databind .ObjectMapper ;
5
+ import io .apicurio .registry .storage .impl .kafkasql .KafkaSqlMessage ;
5
6
import org .apache .kafka .common .header .Header ;
6
7
import org .apache .kafka .common .header .Headers ;
7
8
import org .apache .kafka .common .serialization .Deserializer ;
8
9
import org .slf4j .Logger ;
9
10
import org .slf4j .LoggerFactory ;
10
11
11
- import com .fasterxml .jackson .databind .DeserializationFeature ;
12
- import com .fasterxml .jackson .databind .ObjectMapper ;
13
-
14
- import io .apicurio .registry .storage .impl .kafkasql .KafkaSqlMessage ;
12
+ import java .util .Optional ;
15
13
16
14
/**
17
15
* Kafka deserializer responsible for deserializing the value of a KSQL Kafka message.
@@ -31,7 +29,7 @@ public class KafkaSqlValueDeserializer implements Deserializer<KafkaSqlMessage>
31
29
*/
32
30
@ Override
33
31
public KafkaSqlMessage deserialize (String topic , byte [] data ) {
34
- // Not supported - must deserializer with headers.
32
+ // Not supported - must deserialize with headers.
35
33
return null ;
36
34
}
37
35
@@ -48,13 +46,13 @@ public KafkaSqlMessage deserialize(String topic, Headers headers, byte[] data) {
48
46
try {
49
47
String messageType = extractMessageType (headers );
50
48
if (messageType == null ) {
51
- log .error ("Message missing required header: mt" );
49
+ log .error ("Message missing required message type header: mt" );
52
50
return null ;
53
51
}
54
52
55
53
Class <? extends KafkaSqlMessage > msgClass = KafkaSqlMessageIndex .lookup (messageType );
56
54
if (msgClass == null ) {
57
- throw new Exception ("Unknown KafkaSql message class: " + msgClass );
55
+ throw new Exception ("Unknown KafkaSql message class for ' " + messageType + "'" );
58
56
}
59
57
KafkaSqlMessage message = mapper .readValue (data , msgClass );
60
58
return message ;
@@ -66,8 +64,6 @@ public KafkaSqlMessage deserialize(String topic, Headers headers, byte[] data) {
66
64
67
65
/**
68
66
* Extracts the UUID from the message. The UUID should be found in a message header.
69
- *
70
- * @param record
71
67
*/
72
68
private static String extractMessageType (Headers headers ) {
73
69
return Optional .ofNullable (headers .headers ("mt" ))
0 commit comments