Commit 8f5f195

Support hints via jdbc connection properties (#116)
* Support hints via jdbc connection properties
* Fix PipelineReconciler deletion errors
* checkstyle
* fixes
* more checkstyle
1 parent 975ff9c commit 8f5f195

30 files changed: +134 -74 lines changed

README.md

+62-4
Original file line number | Diff line number | Diff line change
@@ -111,21 +111,79 @@ To push a Flink job directly to the Flink deployment created above, `kubectl app
111111

112112
## The SQL CLI
113113

114-
The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`. The CLI includes some additional commands. See `!intro`.
114+
The `./hoptimator` script launches the [sqlline](https://github.com/julianhyde/sqlline) SQL CLI pre-configured to connect to `jdbc:hoptimator://`.
115+
The CLI includes some additional commands. See `!intro`.
115116

116117
## The JDBC Driver
117118

118119
To use Hoptimator from Java code, or from anything that supports JDBC, use the `jdbc:hoptimator://` JDBC driver.
119120

120121
## The Operator
121122

122-
`hoptimator-operator` turns materialized views into real data pipelines.
123+
`hoptimator-operator` turns materialized views into real data pipelines. The name "operator" comes from the Kubernetes Operator pattern.
124+
`PipelineOperatorApp` is intended as the entry point for a long-running application that watches and reconciles the resources the K8s Deployers create in Kubernetes.
125+
See [hoptimator-operator-deployment.yaml](deploy/hoptimator-operator-deployment.yaml) for K8s pod deployment of the operator.
123126

124127
## Extending Hoptimator
125128

126-
Hoptimator can be extended via `TableTemplates`:
129+
Hoptimator is extensible via `hoptimator-api`, which provides hooks for deploying, validating, and configuring the elements of a pipeline,
130+
including external objects (e.g. Kafka topics) and data plane connectors (e.g. Flink connectors).
131+
To deploy a source or sink, implement `Deployer<Source>`.
132+
To deploy a job, implement `Deployer<Job>`.
127133

134+
In addition, the `k8s` catalog is itself highly extensible via `TableTemplates` and `JobTemplates`.
135+
Generally, you can get Hoptimator to do what you want without writing any new code.
136+
137+
### Table Templates
138+
139+
`TableTemplates` let you specify how sources and sinks should be included in a pipeline. For example, see [kafkadb.yaml](deploy/samples/kafkadb.yaml).
140+
141+
In this case, any tables within `kafka-database` will get deployed as `KafkaTopics` and use `kafka` connectors.
142+
143+
### Job Templates
144+
145+
`JobTemplates` are similar, but can embed SQL. For example, see [flink-template.yaml](deploy/samples/flink-template.yaml).
146+
147+
In this case, any jobs created with this template will get deployed as `FlinkSessionJobs` within the `flink` namespace.
148+
149+
### Configuration
150+
151+
The ``{{ }}`` sections you see in the templates are variable placeholders that will be filled in by the Deployer.
152+
See [Template.java](hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java) for how to specify templates.
153+
154+
While Deployers are extensible, the primary deployment target today is Kubernetes. The Kubernetes deployers
155+
[K8sSourceDeployer](hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sSourceDeployer.java) (for table-templates)
156+
and [K8sJobDeployer](hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java) (for job-templates)
157+
provide a few template defaults that you can choose to include in your templates:
158+
159+
K8sSourceDeployer: `name, database, schema, table`
160+
161+
K8sJobDeployer: `name, database, schema, table, sql, flinksql, flinkconfigs`
162+
163+
However, you will often want to pass additional information to the templates during Source or Job creation.
164+
There are two mechanisms to achieve this:
165+
166+
#### ConfigProvider
167+
168+
The ConfigProvider interface allows you to load additional configuration information that will be used during Source or Job creation.
169+
170+
The [K8sConfigProvider](hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConfigProvider.java) is the default one used in the K8s deployers.
171+
K8sConfigProvider can read configuration from a Kubernetes configmap, `hoptimator-configmap`.
172+
See [hoptimator-configmap.yaml](deploy/config/hoptimator-configmap.yaml) for information on how to use configmaps.
173+
174+
Configmaps are meant for static configuration that applies to the whole namespace `hoptimator-configmap` belongs to.
175+
176+
#### Hints
177+
178+
Users may want to provide additional information for their Job or Sink creation at runtime.
179+
This can be done by adding hints as JDBC properties.
180+
181+
Each hint is a key-value pair joined by an equals sign. Multiple hints are separated by commas.
182+
183+
For example, to specify the number of Kafka partitions and the Flink parallelism, you could add the following hints to the JDBC connection URL:
128184
```
129-
$ kubectl apply -f my-table-template.yaml
185+
jdbc:hoptimator://hints=kafka.partitions=4,flink.parallelism=2
130186
```
187+
These fields can then be added to templates as `{{kafka.partitions}}` or `{{flink.parallelism}}` where applicable.
131188

189+
Note that hints are only recommendations: if the planner plans a different pipeline, they will be ignored.
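
The hint format described in the README above (comma-separated `key=value` pairs) can be illustrated with a small standalone parser. This is only a sketch under stated assumptions: `HintParser` is a hypothetical class, not part of Hoptimator, and the real driver may parse hints differently (for instance, it may handle whitespace or malformed entries in another way).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a hypothetical helper, not Hoptimator's actual hint parsing. */
final class HintParser {

  private HintParser() {
  }

  /** Parses "k1=v1,k2=v2" into an insertion-ordered map; blank entries are skipped. */
  static Map<String, String> parse(String hints) {
    Map<String, String> result = new LinkedHashMap<>();
    if (hints == null || hints.trim().isEmpty()) {
      return result;
    }
    for (String pair : hints.split(",")) {
      String trimmed = pair.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      // Split on the first '=' only, so values may themselves contain '='.
      int eq = trimmed.indexOf('=');
      if (eq <= 0) {
        throw new IllegalArgumentException("Malformed hint (expected key=value): " + trimmed);
      }
      result.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
    }
    return result;
  }

  public static void main(String[] args) {
    // The hints value from the README example, without the "hints=" property name.
    Map<String, String> hints = parse("kafka.partitions=4,flink.parallelism=2");
    System.out.println(hints);
  }
}
```

Rejecting an entry with no `=` is a choice made for this sketch; silently ignoring it would be an equally plausible behavior.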

deploy/samples/flink-template.yaml

+1-1
@@ -17,6 +17,6 @@ spec:
1717
args:
1818
- {{flinksql}}
1919
jarURI: file:///opt/hoptimator-flink-runner.jar
20-
parallelism: 1
20+
parallelism: {{flink.parallelism:1}}
2121
upgradeMode: stateless
2222
state: running

deploy/samples/kafkadb.yaml

+1-1
@@ -25,7 +25,7 @@ spec:
2525
strimzi.io/cluster: one
2626
spec:
2727
topicName: {{table}}
28-
partitions: 1
28+
partitions: {{kafka.partitions:1}}
2929
replicas: 1
3030
config:
3131
retention.ms: 7200000
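
Both sample templates above now use the `{{name:default}}` form, so a hint such as `kafka.partitions` overrides the value and the default applies otherwise. The following is a minimal sketch of that substitution, not the logic in `Template.java`: the class name `PlaceholderSketch`, the regular expression, and the decision to throw when a variable has neither a value nor a default are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: a hypothetical renderer, not Hoptimator's Template implementation. */
final class PlaceholderSketch {

  // Matches {{ name }} or {{ name:default }}.
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\{\\{\\s*([^:}\\s]+)\\s*(?::([^}]*))?\\}\\}");

  private PlaceholderSketch() {
  }

  /** Replaces each placeholder with its value, falling back to the inline default. */
  static String render(String template, Map<String, String> values) {
    Matcher m = PLACEHOLDER.matcher(template);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String key = m.group(1);
      String fallback = m.group(2);
      String value = values.get(key);
      if (value == null) {
        if (fallback == null) {
          // Assumption for this sketch: a missing variable without a default is an error.
          throw new IllegalArgumentException("No value or default for " + key);
        }
        value = fallback.trim();
      }
      m.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(render("partitions: {{kafka.partitions:1}}",
        java.util.Collections.singletonMap("kafka.partitions", "4")));
  }
}
```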

hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java

-1
@@ -1,6 +1,5 @@
11
package com.linkedin.hoptimator;
22

3-
import java.sql.Connection;
43
import java.sql.SQLException;
54
import java.sql.Wrapper;
65

hoptimator-api/src/main/java/com/linkedin/hoptimator/MaterializedView.java

-1
@@ -2,7 +2,6 @@
22

33
import java.util.List;
44
import java.util.function.Function;
5-
import java.util.stream.Collectors;
65

76

87
public class MaterializedView extends View implements Deployable {

hoptimator-api/src/main/java/com/linkedin/hoptimator/Pipeline.java

+3-4
@@ -1,6 +1,5 @@
11
package com.linkedin.hoptimator;
22

3-
import java.sql.SQLException;
43
import java.util.Collection;
54

65

@@ -9,9 +8,9 @@
98
*/
109
public class Pipeline {
1110

12-
private Collection<Source> sources;
13-
private Sink sink;
14-
private Job job;
11+
private final Collection<Source> sources;
12+
private final Sink sink;
13+
private final Job job;
1514

1615
public Pipeline(Collection<Source> sources, Sink sink, Job job) {
1716
this.sources = sources;

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java

+2-4
@@ -11,9 +11,7 @@
1111
import org.apache.calcite.rel.RelRoot;
1212
import org.jline.reader.Completer;
1313

14-
import com.linkedin.hoptimator.Job;
1514
import com.linkedin.hoptimator.Pipeline;
16-
import com.linkedin.hoptimator.Sink;
1715
import com.linkedin.hoptimator.Source;
1816
import com.linkedin.hoptimator.SqlDialect;
1917
import com.linkedin.hoptimator.jdbc.HoptimatorConnection;
@@ -93,7 +91,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
9391
HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection();
9492
try {
9593
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
96-
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations());
94+
PipelineRel.Implementor plan = DeploymentService.plan(root, conn.materializations(), conn.connectionProperties());
9795
sqlline.output(plan.sql(conn.connectionProperties()).apply(SqlDialect.ANSI));
9896
} catch (SQLException e) {
9997
sqlline.error(e);
@@ -163,7 +161,7 @@ public void execute(String line, DispatchCallback dispatchCallback) {
163161
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
164162
try {
165163
Properties connectionProperties = conn.connectionProperties();
166-
Pipeline pipeline = DeploymentService.plan(root, conn.materializations()).pipeline("sink", connectionProperties);
164+
Pipeline pipeline = DeploymentService.plan(root, conn.materializations(), conn.connectionProperties()).pipeline("sink", connectionProperties);
167165
List<String> specs = new ArrayList<>();
168166
for (Source source : pipeline.sources()) {
169167
specs.addAll(DeploymentService.specify(source, connectionProperties));

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java

+5-6
@@ -31,14 +31,13 @@
3131
import org.apache.calcite.jdbc.CalciteSchema;
3232
import org.apache.calcite.rel.RelRoot;
3333
import org.apache.calcite.rel.type.RelDataType;
34-
import org.apache.calcite.rel.type.RelDataTypeImpl;
3534
import org.apache.calcite.rel.type.RelDataTypeFactory;
35+
import org.apache.calcite.rel.type.RelDataTypeImpl;
3636
import org.apache.calcite.rel.type.RelDataTypeSystem;
3737
import org.apache.calcite.rel.type.RelProtoDataType;
3838
import org.apache.calcite.schema.Function;
3939
import org.apache.calcite.schema.SchemaPlus;
4040
import org.apache.calcite.schema.Table;
41-
import org.apache.calcite.schema.TranslatableTable;
4241
import org.apache.calcite.schema.impl.ViewTable;
4342
import org.apache.calcite.server.DdlExecutor;
4443
import org.apache.calcite.server.ServerDdlExecutor;
@@ -205,10 +204,10 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
205204

206205
// Plan a pipeline to materialize the view.
207206
RelRoot root = new HoptimatorDriver.Prepare(connection).convert(context, sql).root;
208-
PipelineRel.Implementor plan = DeploymentService.plan(root, connection.materializations());
207+
PipelineRel.Implementor plan = DeploymentService.plan(root, connection.materializations(), connectionProperties);
209208
plan.setSink(database, sinkPath, rowType, Collections.emptyMap());
210209
Pipeline pipeline = plan.pipeline(viewName, connectionProperties);
211-
210+
212211
MaterializedView hook = new MaterializedView(database, viewPath, sql, plan.sql(connectionProperties), pipeline);
213212
// TODO support CREATE ... WITH (options...)
214213
ValidationService.validateOrThrow(hook);
@@ -217,8 +216,8 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
217216
DeploymentService.update(hook, connectionProperties);
218217
} else {
219218
DeploymentService.create(hook, connectionProperties);
220-
}
221-
219+
}
220+
222221
schemaPlus.add(viewName, materializedViewTable);
223222
} catch (Exception e) {
224223
throw new RuntimeException("Cannot CREATE MATERIALIZED VIEW in " + schemaName + ": " + e.getMessage(), e);

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDriver.java

+12-12
@@ -2,11 +2,11 @@
22

33
import java.io.IOException;
44
import java.sql.Connection;
5-
import java.sql.SQLException;
65
import java.sql.DriverManager;
76
import java.sql.DriverPropertyInfo;
7+
import java.sql.SQLException;
88
import java.util.Properties;
9-
import java.util.function.Supplier;
9+
import java.util.logging.LogManager;
1010

1111
import org.apache.calcite.avatica.ConnectStringParser;
1212
import org.apache.calcite.jdbc.CalciteConnection;
@@ -17,10 +17,8 @@
1717
import org.apache.calcite.schema.impl.AbstractSchema;
1818
import org.apache.calcite.sql.SqlNode;
1919
import org.apache.calcite.sql.parser.SqlParser;
20-
2120
import org.slf4j.Logger;
2221
import org.slf4j.LoggerFactory;
23-
import java.util.logging.LogManager;
2422

2523
import com.linkedin.hoptimator.Catalog;
2624

@@ -32,13 +30,15 @@ public class HoptimatorDriver implements java.sql.Driver {
3230

3331
public static final String CONNECTION_PREFIX = "jdbc:hoptimator://";
3432

35-
static {{
36-
try {
37-
DriverManager.registerDriver(INSTANCE);
38-
} catch (SQLException e) {
39-
throw new RuntimeException("Failed to register Hoptimator driver.", e);
33+
static {
34+
{
35+
try {
36+
DriverManager.registerDriver(INSTANCE);
37+
} catch (SQLException e) {
38+
throw new RuntimeException("Failed to register Hoptimator driver.", e);
39+
}
4040
}
41-
}}
41+
}
4242

4343
public static CalcitePrepare.ConvertResult convert(HoptimatorConnection conn, String sql) {
4444
CalcitePrepare.Context context = conn.createPrepareContext();
@@ -77,7 +77,7 @@ public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws
7777

7878
@Override
7979
public boolean jdbcCompliant() {
80-
return false;
80+
return false;
8181
}
8282

8383
@Override
@@ -109,7 +109,7 @@ public Connection connect(String url, Properties props) throws SQLException {
109109
rootSchema.add("DEFAULT", new AbstractSchema());
110110

111111
calciteConnection.setSchema("DEFAULT");
112-
112+
113113
HoptimatorConnection hoptimatorConnection = new HoptimatorConnection(calciteConnection, properties);
114114
holder.connection = hoptimatorConnection;
115115

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/MaterializedViewTable.java

-1
@@ -1,6 +1,5 @@
11
package com.linkedin.hoptimator.jdbc;
22

3-
import java.util.Collections;
43
import java.util.List;
54

65
import org.apache.calcite.plan.RelOptTable;

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ValidationService.java

-1
@@ -12,7 +12,6 @@
1212
import org.apache.calcite.schema.SchemaPlus;
1313
import org.apache.calcite.schema.Table;
1414

15-
import com.linkedin.hoptimator.Validated;
1615
import com.linkedin.hoptimator.Validator;
1716
import com.linkedin.hoptimator.ValidatorProvider;
1817

hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/schema/UtilityCatalog.java

-2
@@ -1,11 +1,9 @@
11
package com.linkedin.hoptimator.jdbc.schema;
22

3-
import java.sql.Connection;
43
import java.sql.SQLException;
54
import java.sql.Wrapper;
65
import java.util.LinkedHashMap;
76
import java.util.Map;
8-
import java.util.Properties;
97

108
import org.apache.calcite.schema.SchemaPlus;
119
import org.apache.calcite.schema.Table;

hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java

+2-2
@@ -49,7 +49,7 @@ protected void run(URI resource, String jdbcProperties) throws IOException {
4949
Quidem.Config config = Quidem.configBuilder()
5050
.withReader(r)
5151
.withWriter(w)
52-
.withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://catalogs=" + x + jdbcProperties))
52+
.withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://catalogs=" + x + ";" + jdbcProperties))
5353
.withCommandHandler(new CustomCommandHandler())
5454
.build();
5555
new Quidem(config).execute();
@@ -82,7 +82,7 @@ public void execute(Context context, boolean execute) throws Exception {
8282
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
8383
String []parts = line.split(" ", 2);
8484
String pipelineName = parts.length == 2 ? parts[1] : "test";
85-
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList())
85+
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), conn.connectionProperties())
8686
.pipeline(pipelineName, conn.connectionProperties());
8787
List<String> specs = new ArrayList<>();
8888
for (Source source : pipeline.sources()) {
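
The first hunk above inserts a `;` between the `catalogs` property and any extra JDBC properties. Without it, the extra properties would be glued onto the catalog name. A minimal sketch of assembling such a URL follows. `ConnectionUrlSketch` is a hypothetical helper and the catalog name `example` is a placeholder; only the `jdbc:hoptimator://` prefix and the `;` separator are taken from the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative only: a hypothetical helper for building a semicolon-separated JDBC URL. */
final class ConnectionUrlSketch {

  static final String CONNECTION_PREFIX = "jdbc:hoptimator://";

  private ConnectionUrlSketch() {
  }

  /** Joins name=value pairs with ';' after the driver prefix. */
  static String url(Map<String, String> properties) {
    StringJoiner joined = new StringJoiner(";", CONNECTION_PREFIX, "");
    for (Map.Entry<String, String> e : properties.entrySet()) {
      joined.add(e.getKey() + "=" + e.getValue());
    }
    return joined.toString();
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("catalogs", "example"); // placeholder catalog name
    props.put("hints", "kafka.partitions=4,flink.parallelism=2");
    System.out.println(url(props));
  }
}
```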

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java

+1-3
@@ -1,11 +1,9 @@
11
package com.linkedin.hoptimator.k8s;
22

33
import java.sql.SQLException;
4-
import java.util.ArrayList;
54
import java.util.Collection;
65
import java.util.Collections;
76
import java.util.HashMap;
8-
import java.util.List;
97
import java.util.Map;
108

119
import io.kubernetes.client.common.KubernetesListObject;
@@ -158,7 +156,7 @@ public void updateStatus(T obj, Object status) throws SQLException {
158156

159157
private void checkResponse(KubernetesApiResponse<?> resp) throws SQLException {
160158
if (!resp.isSuccess()) {
161-
throw new SQLException(resp.getStatus().getMessage() + ": " + context);
159+
throw new SQLException(resp.getStatus().getMessage() + ": " + context, null, resp.getHttpStatusCode());
162160
}
163161
}
164162
}
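
The `checkResponse` change above passes the HTTP status code as the `vendorCode` argument of `SQLException`, which callers can read back with `getErrorCode()`. The sketch below shows how a caller might use that, for example to treat a 404 during deletion as "already gone". The class `VendorCodeSketch` and the `isNotFound` check are hypothetical; the diff shown here does not include the reconciler code, so how it actually consumes the code is an assumption.

```java
import java.sql.SQLException;

/** Illustrative only: hypothetical helpers showing SQLException's vendor code in use. */
final class VendorCodeSketch {

  static final int HTTP_NOT_FOUND = 404;

  private VendorCodeSketch() {
  }

  /** Same constructor shape as the patched checkResponse: null SQLState, status as vendor code. */
  static SQLException failure(String message, String context, int httpStatusCode) {
    return new SQLException(message + ": " + context, null, httpStatusCode);
  }

  /** Hypothetical caller-side check, e.g. to ignore "not found" when deleting a resource. */
  static boolean isNotFound(SQLException e) {
    return e.getErrorCode() == HTTP_NOT_FOUND;
  }

  public static void main(String[] args) {
    SQLException e = failure("Not Found", "example-context", HTTP_NOT_FOUND);
    System.out.println(e.getMessage() + " notFound=" + isNotFound(e));
  }
}
```

An `SQLException` built without a vendor code reports `0` from `getErrorCode()`, so a check like this is safe against exceptions raised elsewhere.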

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sCatalog.java

-1
@@ -2,7 +2,6 @@
22

33
import java.sql.SQLException;
44
import java.sql.Wrapper;
5-
import java.util.Properties;
65

76
import org.apache.calcite.schema.SchemaPlus;
87
import org.slf4j.Logger;

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDatabaseTable.java

+2-1
@@ -52,7 +52,8 @@ public K8sDatabaseTable(K8sContext context, K8sEngineTable engines) {
5252
public void addDatabases(SchemaPlus parentSchema, Properties connectionProperties) {
5353
for (Row row : rows()) {
5454
parentSchema.add(schemaName(row),
55-
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row, connectionProperties), parentSchema, dialect(row), engines.forDatabase(row.NAME), connectionProperties));
55+
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row, connectionProperties), parentSchema,
56+
dialect(row), engines.forDatabase(row.NAME), connectionProperties));
5657
}
5758
}
5859

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployer.java

+1-2
@@ -6,9 +6,8 @@
66

77
import io.kubernetes.client.common.KubernetesListObject;
88
import io.kubernetes.client.common.KubernetesObject;
9-
import io.kubernetes.client.util.Yaml;
109
import io.kubernetes.client.openapi.models.V1OwnerReference;
11-
10+
import io.kubernetes.client.util.Yaml;
1211

1312
import com.linkedin.hoptimator.Deployer;
1413

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java

+1-1
@@ -25,7 +25,7 @@ public <T extends Deployable> Collection<Deployer> deployers(T obj, Properties c
2525
} else if (obj instanceof View) {
2626
list.add(new K8sViewDeployer((View) obj, false, context));
2727
} else if (obj instanceof Job) {
28-
list.add(new K8sJobDeployer((Job) obj, context));
28+
list.add(new K8sJobDeployer((Job) obj, context, connectionProperties));
2929
} else if (obj instanceof Source) {
3030
list.add(new K8sSourceDeployer((Source) obj, context));
3131
}

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java

+4-2
@@ -19,18 +19,20 @@ class K8sJobDeployer extends K8sYamlDeployer {
1919

2020
private static final String FLINK_CONFIG = "flink.config";
2121

22+
private final Properties connectionProperties;
2223
private final Job job;
2324
private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;
2425

25-
K8sJobDeployer(Job job, K8sContext context) {
26+
K8sJobDeployer(Job job, K8sContext context, Properties connectionProperties) {
2627
super(context);
28+
this.connectionProperties = connectionProperties;
2729
this.job = job;
2830
this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES);
2931
}
3032

3133
@Override
3234
public List<String> specify() throws SQLException {
33-
Properties properties = ConfigService.config(null, false, FLINK_CONFIG);
35+
Properties properties = ConfigService.config(this.connectionProperties, false, FLINK_CONFIG);
3436
properties.putAll(job.sink().options());
3537
Function<SqlDialect, String> sql = job.sql();
3638
String name = K8sUtils.canonicalizeName(job.sink().database(), job.name());
