19
19
import org .apache .calcite .util .Litmus ;
20
20
import org .apache .calcite .util .Pair ;
21
21
22
+ import com .google .common .annotations .VisibleForTesting ;
22
23
import com .google .common .collect .ImmutableList ;
23
24
24
25
import com .linkedin .hoptimator .Deployable ;
@@ -44,6 +45,8 @@ public interface PipelineRel extends RelNode {
44
45
45
46
Convention CONVENTION = new Convention .Impl ("PIPELINE" , PipelineRel .class );
46
47
String KEY_OPTION = "keys" ;
48
+ String KEY_PREFIX_OPTION = "keyPrefix" ;
49
+ String KEY_TYPE_OPTION = "keyType" ;
47
50
String KEY_PREFIX = "KEY_" ;
48
51
49
52
void implement (Implementor implementor ) throws SQLException ;
@@ -95,7 +98,8 @@ public void setSink(String database, List<String> path, RelDataType rowType, Map
95
98
this .sinkOptions = addKeysAsOption (options , rowType );
96
99
}
97
100
98
- private Map <String , String > addKeysAsOption (Map <String , String > options , RelDataType rowType ) {
101
+ @ VisibleForTesting
102
+ static Map <String , String > addKeysAsOption (Map <String , String > options , RelDataType rowType ) {
99
103
Map <String , String > newOptions = new LinkedHashMap <>(options );
100
104
101
105
RelDataType flattened = DataTypeUtils .flatten (rowType , new SqlTypeFactoryImpl (RelDataTypeSystem .DEFAULT ));
@@ -104,12 +108,15 @@ private Map<String, String> addKeysAsOption(Map<String, String> options, RelData
104
108
if (newOptions .containsKey (KEY_OPTION )) {
105
109
return newOptions ;
106
110
}
111
+
107
112
String keyString = flattened .getFieldList ().stream ()
108
113
.map (x -> x .getName ().replaceAll ("\\ $" , "_" ))
109
114
.filter (name -> name .startsWith (KEY_PREFIX ))
110
115
.collect (Collectors .joining (";" ));
111
116
if (!keyString .isEmpty ()) {
112
117
newOptions .put (KEY_OPTION , keyString );
118
+ newOptions .put (KEY_PREFIX_OPTION , KEY_PREFIX );
119
+ newOptions .put (KEY_TYPE_OPTION , "RECORD" );
113
120
}
114
121
return newOptions ;
115
122
}
0 commit comments