Skip to content

Commit 975ff9c

Browse files
authored
Register materialized views with the planner (#115)
1 parent c76604a commit 975ff9c

File tree

19 files changed

+215
-171
lines changed

19 files changed

+215
-171
lines changed

hoptimator

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ $BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \
77
-Dorg.slf4j.simpleLogger.showLogName=false \
88
sqlline.SqlLine \
99
-ac sqlline.HoptimatorAppConfig \
10-
-u jdbc:hoptimator://fun=mysql -n "" -p "" -nn "Hoptimator" $@
10+
--verbose \
11+
-u "jdbc:hoptimator://fun=mysql" -n "" -p "" -nn "Hoptimator" $@
1112

Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package com.linkedin.hoptimator;
22

3+
import java.sql.Connection;
34
import java.sql.SQLException;
45
import java.sql.Wrapper;
5-
import java.util.Properties;
66

77

88
/** Registers a set of tables, possibly within schemas and sub-schemas. */
@@ -12,5 +12,5 @@ public interface Catalog {
1212

1313
String description();
1414

15-
void register(Wrapper parentSchema, Properties connectionProperties) throws SQLException;
15+
void register(Wrapper wrapper) throws SQLException;
1616
}

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
9393
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
9494
try {
9595
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
96-
PipelineRel.Implementor plan = DeploymentService.plan(root);
96+
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations());
9797
sqlline.output(plan.sql(conn.connectionProperties()).apply(SqlDialect.ANSI));
9898
} catch (SQLException e) {
9999
sqlline.error(e);
@@ -163,7 +163,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
163163
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
164164
try {
165165
Properties connectionProperties = conn.connectionProperties();
166-
Pipeline pipeline = DeploymentService.plan(root).pipeline("sink", connectionProperties);
166+
Pipeline pipeline = DeploymentService.plan(root, conn.materializations()).pipeline("sink", connectionProperties);
167167
List<String> specs = new ArrayList<>();
168168
for (Source source : pipeline.sources()) {
169169
specs.addAll(DeploymentService.specify(source, connectionProperties));

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java

+21
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22

33
import java.sql.SQLException;
44
import java.sql.Statement;
5+
import java.util.ArrayList;
6+
import java.util.List;
57
import java.util.Properties;
8+
import java.util.stream.Collectors;
69

710
import org.apache.calcite.jdbc.CalciteConnection;
811
import org.apache.calcite.jdbc.CalcitePrepare;
12+
import org.apache.calcite.plan.RelOptMaterialization;
13+
import org.apache.calcite.rel.RelNode;
914

1015
import com.linkedin.hoptimator.util.DelegatingConnection;
1116

@@ -14,6 +19,7 @@ public class HoptimatorConnection extends DelegatingConnection {
1419

1520
private final CalciteConnection connection;
1621
private final Properties connectionProperties;
22+
private final List<RelOptMaterialization> materializations = new ArrayList<>();
1723

1824
public HoptimatorConnection(CalciteConnection connection, Properties connectionProperties) {
1925
super(connection);
@@ -37,4 +43,19 @@ public CalcitePrepare.Context createPrepareContext() {
3743
public CalciteConnection calciteConnection() {
3844
return connection;
3945
}
46+
47+
public void registerMaterialization(List<String> viewPath, String querySql) {
48+
String tableSql = "SELECT * FROM " + viewPath.stream().map(x -> "\"" + x + "\"").collect(Collectors.joining("."));
49+
RelNode tableRel = HoptimatorDriver.convert(this, tableSql).root.rel;
50+
RelNode queryRel = HoptimatorDriver.convert(this, querySql).root.rel;
51+
registerMaterialization(viewPath, tableRel, queryRel);
52+
}
53+
54+
private void registerMaterialization(List<String> viewPath, RelNode tableRel, RelNode queryRel) {
55+
materializations.add(new RelOptMaterialization(tableRel, queryRel, null, viewPath));
56+
}
57+
58+
public List<RelOptMaterialization> materializations() {
59+
return materializations;
60+
}
4061
}

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131
import org.apache.calcite.jdbc.CalciteSchema;
3232
import org.apache.calcite.rel.RelRoot;
3333
import org.apache.calcite.rel.type.RelDataType;
34+
import org.apache.calcite.rel.type.RelDataTypeImpl;
3435
import org.apache.calcite.rel.type.RelDataTypeFactory;
3536
import org.apache.calcite.rel.type.RelDataTypeSystem;
37+
import org.apache.calcite.rel.type.RelProtoDataType;
3638
import org.apache.calcite.schema.Function;
3739
import org.apache.calcite.schema.SchemaPlus;
3840
import org.apache.calcite.schema.Table;
@@ -65,7 +67,6 @@
6567
import com.linkedin.hoptimator.MaterializedView;
6668
import com.linkedin.hoptimator.Pipeline;
6769
import com.linkedin.hoptimator.View;
68-
import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;
6970
import com.linkedin.hoptimator.util.DeploymentService;
7071
import com.linkedin.hoptimator.util.planner.PipelineRel;
7172

@@ -74,10 +75,12 @@
7475

7576
public final class HoptimatorDdlExecutor extends ServerDdlExecutor {
7677

78+
private final HoptimatorConnection connection;
7779
private final Properties connectionProperties;
7880

79-
public HoptimatorDdlExecutor(Properties connectionProperties) {
80-
this.connectionProperties = connectionProperties;
81+
public HoptimatorDdlExecutor(HoptimatorConnection connection) {
82+
this.connection = connection;
83+
this.connectionProperties = connection.connectionProperties();
8184
}
8285

8386
@SuppressWarnings("unused") // used via reflection
@@ -118,9 +121,9 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
118121
List<String> viewPath = new ArrayList<>();
119122
viewPath.addAll(schemaPath);
120123
viewPath.add(viewName);
121-
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(schemaPlus),
122-
sql, schemaPath, viewPath, false);
123-
ViewTable viewTable = (ViewTable) viewTableMacro.apply(Collections.singletonList(connectionProperties));
124+
CalcitePrepare.AnalyzeViewResult analyzed = HoptimatorDriver.analyzeView(connection, sql);
125+
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
126+
ViewTable viewTable = new ViewTable(Object.class, protoType, sql, schemaPath, viewPath);
124127
View view = new View(viewPath, sql);
125128
try {
126129
ValidationService.validateOrThrow(viewTable);
@@ -177,9 +180,10 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
177180

178181
// Table does not exist. Create it.
179182
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
180-
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(schemaPlus),
181-
sql, schemaPath, viewPath, false);
182-
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro, connectionProperties);
183+
CalcitePrepare.AnalyzeViewResult analyzed = HoptimatorDriver.analyzeView(connection, sql);
184+
RelProtoDataType protoType = RelDataTypeImpl.proto(analyzed.rowType);
185+
ViewTable viewTable = new ViewTable(Object.class, protoType, sql, schemaPath, viewPath);
186+
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTable);
183187
RelDataType viewRowType = materializedViewTable.getRowType(typeFactory);
184188

185189
// Support "partial views", i.e. CREATE VIEW FOO$BAR, where the view name
@@ -200,9 +204,8 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
200204
}
201205

202206
// Plan a pipeline to materialize the view.
203-
RelRoot root = new HoptimatorDriver.Prepare(connectionProperties)
204-
.convert(context, sql).root;
205-
PipelineRel.Implementor plan = DeploymentService.plan(root);
207+
RelRoot root = new HoptimatorDriver.Prepare(connection).convert(context, sql).root;
208+
PipelineRel.Implementor plan = DeploymentService.plan(root, connection.materializations());
206209
plan.setSink(database, sinkPath, rowType, Collections.emptyMap());
207210
Pipeline pipeline = plan.pipeline(viewName, connectionProperties);
208211

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDriver.java

+74-38
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
import java.io.IOException;
44
import java.sql.Connection;
55
import java.sql.SQLException;
6+
import java.sql.DriverManager;
7+
import java.sql.DriverPropertyInfo;
68
import java.util.Properties;
79
import java.util.function.Supplier;
810

911
import org.apache.calcite.avatica.ConnectStringParser;
10-
import org.apache.calcite.avatica.DriverVersion;
1112
import org.apache.calcite.jdbc.CalciteConnection;
1213
import org.apache.calcite.jdbc.CalcitePrepare;
1314
import org.apache.calcite.jdbc.Driver;
@@ -17,102 +18,137 @@
1718
import org.apache.calcite.sql.SqlNode;
1819
import org.apache.calcite.sql.parser.SqlParser;
1920

21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import java.util.logging.LogManager;
24+
2025
import com.linkedin.hoptimator.Catalog;
21-
import com.linkedin.hoptimator.util.WrappedSchemaPlus;
2226

2327

2428
/** Driver for :jdbc:hoptimator:// connections. */
25-
public class HoptimatorDriver extends Driver {
29+
public class HoptimatorDriver implements java.sql.Driver {
30+
private static final Logger logger = LoggerFactory.getLogger(HoptimatorDriver.class);
31+
private static final HoptimatorDriver INSTANCE = new HoptimatorDriver();
32+
33+
public static final String CONNECTION_PREFIX = "jdbc:hoptimator://";
34+
35+
static {{
36+
try {
37+
DriverManager.registerDriver(INSTANCE);
38+
} catch (SQLException e) {
39+
throw new RuntimeException("Failed to register Hoptimator driver.", e);
40+
}
41+
}}
2642

27-
public HoptimatorDriver() {
28-
super();
43+
public static CalcitePrepare.ConvertResult convert(HoptimatorConnection conn, String sql) {
44+
CalcitePrepare.Context context = conn.createPrepareContext();
45+
return new Prepare(conn).convert(context, sql);
2946
}
3047

31-
private HoptimatorDriver(Supplier<CalcitePrepare> prepareFactory) {
32-
super(prepareFactory);
48+
public static CalcitePrepare.AnalyzeViewResult analyzeView(HoptimatorConnection conn, String sql) {
49+
CalcitePrepare.Context context = conn.createPrepareContext();
50+
return new Prepare(conn).analyzeView(context, sql, false);
3351
}
3452

35-
static {
36-
new HoptimatorDriver().register();
53+
@Override
54+
public boolean acceptsURL(String url) {
55+
return url.startsWith(CONNECTION_PREFIX);
3756
}
3857

39-
public static CalcitePrepare.ConvertResult convert(HoptimatorConnection conn, String sql) {
40-
CalcitePrepare.Context context = conn.createPrepareContext();
41-
return new Prepare(conn.connectionProperties()).convert(context, sql);
58+
@Override
59+
public int getMajorVersion() {
60+
return 0;
4261
}
4362

4463
@Override
45-
protected String getConnectStringPrefix() {
46-
return "jdbc:hoptimator://";
64+
public int getMinorVersion() {
65+
return 1;
4766
}
4867

4968
@Override
50-
protected DriverVersion createDriverVersion() {
51-
return DriverVersion.load(this.getClass(), "hoptimator.properties", "hoptimator", "0", "hoptimator", "0");
69+
public java.util.logging.Logger getParentLogger() {
70+
return LogManager.getLogManager().getLogger(logger.getName());
71+
}
72+
73+
@Override
74+
public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
75+
return new DriverPropertyInfo[0];
76+
}
77+
78+
@Override
79+
public boolean jdbcCompliant() {
80+
return false;
5281
}
5382

5483
@Override
5584
public Connection connect(String url, Properties props) throws SQLException {
56-
if (!url.startsWith(getConnectStringPrefix())) {
85+
if (!acceptsURL(url)) {
5786
return null;
5887
}
5988
try {
6089
// Load properties from the URL and from getConnection()'s properties.
6190
// URL properties take precedence.
6291
Properties properties = new Properties();
6392
properties.putAll(props); // via getConnection()
64-
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));
65-
66-
if (prepareFactory == null) {
67-
// funky way of extending Driver with a custom Prepare:
68-
return withPrepareFactory(() -> new Prepare(properties))
69-
.connect(url, properties);
70-
}
71-
Connection connection = super.connect(url, properties);
93+
properties.putAll(ConnectStringParser.parse(url.substring(CONNECTION_PREFIX.length())));
94+
95+
// For [Calcite]Driver.connect() to work, we need [Calcite]Driver.createPrepare()
96+
// to return our Prepare. But our Prepare requires a HoptimatorConnection, which
97+
// we cannot construct yet.
98+
ConnectionHolder holder = new ConnectionHolder();
99+
Connection connection = new Driver().withPrepareFactory(() -> new Prepare(holder))
100+
.connect("jdbc:calcite:", properties);
72101
if (connection == null) {
73-
throw new IOException("Could not connect to " + url);
102+
throw new IOException("Could not connect to " + url + ": Could not create Calcite connection.");
74103
}
75-
connection.setAutoCommit(true); // to prevent rollback()
76104
CalciteConnection calciteConnection = (CalciteConnection) connection;
105+
calciteConnection.setAutoCommit(true); // to prevent rollback()
77106
SchemaPlus rootSchema = calciteConnection.getRootSchema();
78107

79108
// built-in schemas
80109
rootSchema.add("DEFAULT", new AbstractSchema());
81110

82111
calciteConnection.setSchema("DEFAULT");
112+
113+
HoptimatorConnection hoptimatorConnection = new HoptimatorConnection(calciteConnection, properties);
114+
holder.connection = hoptimatorConnection;
83115

84-
WrappedSchemaPlus wrappedRootSchema = new WrappedSchemaPlus(rootSchema);
116+
Wrapped wrapped = new Wrapped(hoptimatorConnection, rootSchema);
85117
String[] catalogs = properties.getProperty("catalogs", "").split(",");
86118

87119
if (catalogs.length == 0 || catalogs[0].length() == 0) {
88120
// load all catalogs (typical usage)
89121
for (Catalog catalog : CatalogService.catalogs()) {
90-
catalog.register(wrappedRootSchema, properties);
122+
catalog.register(wrapped);
91123
}
92124
} else {
93125
// load specific catalogs when loaded as `jdbc:hoptimator://catalogs=foo,bar`
94126
for (String catalog : catalogs) {
95-
CatalogService.catalog(catalog).register(wrappedRootSchema, properties);
127+
CatalogService.catalog(catalog).register(wrapped);
96128
}
97129
}
98130

99-
return new HoptimatorConnection(calciteConnection, properties);
131+
return hoptimatorConnection;
100132
} catch (Exception e) {
101133
throw new SQLException("Problem loading " + url, e);
102134
}
103135
}
104136

105-
@Override
106-
public Driver withPrepareFactory(Supplier<CalcitePrepare> prepareFactory) {
107-
return new HoptimatorDriver(prepareFactory);
137+
private static class ConnectionHolder {
138+
HoptimatorConnection connection;
108139
}
109140

110141
public static class Prepare extends CalcitePrepareImpl {
111142

112-
private final Properties connectionProperties;
143+
private final ConnectionHolder holder;
144+
145+
Prepare(ConnectionHolder holder) {
146+
this.holder = holder;
147+
}
113148

114-
Prepare(Properties connectionProperties) {
115-
this.connectionProperties = connectionProperties;
149+
Prepare(HoptimatorConnection connection) {
150+
this.holder = new ConnectionHolder();
151+
this.holder.connection = connection;
116152
}
117153

118154
@Override
@@ -122,7 +158,7 @@ protected SqlParser.Config parserConfig() {
122158

123159
@Override
124160
public void executeDdl(Context context, SqlNode node) {
125-
new HoptimatorDdlExecutor(connectionProperties).executeDdl(context, node);
161+
new HoptimatorDdlExecutor(holder.connection).executeDdl(context, node);
126162
}
127163
}
128164
}

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/MaterializedViewTable.java

-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.Collections;
44
import java.util.List;
5-
import java.util.Properties;
65

76
import org.apache.calcite.plan.RelOptTable;
87
import org.apache.calcite.rel.RelNode;
@@ -13,8 +12,6 @@
1312
import org.apache.calcite.schema.impl.AbstractTable;
1413
import org.apache.calcite.schema.impl.ViewTable;
1514

16-
import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;
17-
1815

1916
public class MaterializedViewTable extends AbstractTable implements TranslatableTable {
2017

@@ -24,10 +21,6 @@ public MaterializedViewTable(ViewTable viewTable) {
2421
this.viewTable = viewTable;
2522
}
2623

27-
public MaterializedViewTable(HoptimatorViewTableMacro viewTableMacro, Properties connectionProperties) {
28-
this((ViewTable) viewTableMacro.apply(Collections.singletonList(connectionProperties)));
29-
}
30-
3124
public ViewTable viewTable() {
3225
return viewTable;
3326
}

0 commit comments

Comments
 (0)