Skip to content

Commit 51a6e16

Browse files
authored
Allow venice driver to load multiple clusters under the same schema (#107)
1 parent bf2d446 commit 51a6e16

File tree

6 files changed

+39
-42
lines changed

6 files changed

+39
-42
lines changed

deploy/samples/venicedb.yaml

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
apiVersion: hoptimator.linkedin.com/v1alpha1
22
kind: Database
33
metadata:
4-
name: venice-cluster0
4+
name: venice
55
spec:
6-
schema: VENICE-CLUSTER0
7-
url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777
6+
schema: VENICE
7+
url: jdbc:venice://clusters=venice-cluster0;router.url=http://localhost:7777
88
dialect: Calcite
99

1010
---
1111

1212
apiVersion: hoptimator.linkedin.com/v1alpha1
1313
kind: TableTemplate
1414
metadata:
15-
name: venice-template-cluster0
15+
name: venice-template
1616
spec:
1717
databases:
18-
- venice-cluster0
18+
- venice
1919
connector: |
2020
connector = venice
2121
storeName = {{table}}

hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.linkedin.hoptimator.venice;
22

33
import java.io.IOException;
4+
import java.util.Arrays;
45
import java.util.HashMap;
6+
import java.util.List;
57
import java.util.Map;
68
import java.util.Optional;
79
import java.util.Properties;
@@ -35,8 +37,10 @@ public ClusterSchema(Properties properties) {
3537

3638
public void populate() throws InterruptedException, ExecutionException, IOException {
3739
tableMap.clear();
38-
String cluster = properties.getProperty("cluster");
39-
log.info("Loading Venice stores for cluster {}", cluster);
40+
String clusterStr = properties.getProperty("clusters");
41+
List<String> clusters = Arrays.asList(clusterStr.split(","));
42+
43+
log.info("Loading Venice stores for cluster {}", clusters);
4044

4145
String sslConfigPath = properties.getProperty("ssl-config-path");
4246
Optional<SSLFactory> sslFactory = Optional.empty();
@@ -47,12 +51,14 @@ public void populate() throws InterruptedException, ExecutionException, IOExcept
4751
sslFactory = Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
4852
}
4953

50-
try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) {
51-
String[] stores = controllerClient.queryStoreList(false).getStores();
52-
log.info("Loaded {} Venice stores.", stores.length);
53-
for (String store : stores) {
54-
StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
55-
tableMap.put(store, createVeniceStore(storeSchemaFetcher));
54+
for (String cluster : clusters) {
55+
try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) {
56+
String[] stores = controllerClient.queryStoreList(false).getStores();
57+
log.info("Loaded {} Venice stores.", stores.length);
58+
for (String store : stores) {
59+
StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
60+
tableMap.put(store, createVeniceStore(storeSchemaFetcher));
61+
}
5662
}
5763
}
5864
}

hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java

+2-11
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.io.IOException;
44
import java.sql.Connection;
55
import java.sql.SQLException;
6-
import java.util.Locale;
76
import java.util.Properties;
87

98
import org.apache.calcite.avatica.ConnectStringParser;
@@ -16,7 +15,6 @@
1615
/** JDBC driver for Venice stores. */
1716
public class VeniceDriver extends Driver {
1817
public static final String CATALOG_NAME = "VENICE";
19-
public static final String CONFIG_NAME = "venice.config";
2018

2119
static {
2220
new VeniceDriver().register();
@@ -41,14 +39,7 @@ public Connection connect(String url, Properties props) throws SQLException {
4139
Properties properties = new Properties();
4240
properties.putAll(props); // in case the driver is loaded via getConnection()
4341
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));
44-
String cluster = properties.getProperty("cluster");
45-
if (cluster == null) {
46-
throw new IllegalArgumentException("Missing required cluster property. Need: jdbc:venice://cluster=...");
47-
}
48-
cluster = cluster.toUpperCase(Locale.ROOT);
49-
if (!cluster.startsWith(CATALOG_NAME)) {
50-
cluster = CATALOG_NAME + "-" + cluster;
51-
}
42+
5243
try {
5344
Connection connection = super.connect(url, props);
5445
if (connection == null) {
@@ -60,7 +51,7 @@ public Connection connect(String url, Properties props) throws SQLException {
6051
SchemaPlus rootSchema = calciteConnection.getRootSchema();
6152
ClusterSchema schema = createClusterSchema(properties);
6253
schema.populate();
63-
rootSchema.add(cluster.toUpperCase(Locale.ROOT), schema);
54+
rootSchema.add(CATALOG_NAME, schema);
6455
return connection;
6556
} catch (Exception e) {
6657
throw new SQLException("Problem loading " + url, e);

hoptimator-venice/src/test/resources/venice-ddl-insert-all.id

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
!set outputformat mysql
22
!use k8s
33

4-
insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store";
4+
insert into "VENICE"."test-store-1" select * from "VENICE"."test-store";
55
apiVersion: flink.apache.org/v1beta1
66
kind: FlinkSessionJob
77
metadata:
8-
name: venice-cluster0-test-store-1
8+
name: venice-test-store-1
99
namespace: flink
1010
spec:
1111
deploymentName: basic-session-deployment
1212
job:
1313
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1414
args:
15-
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
16-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
17-
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
18-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19-
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
15+
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
16+
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
17+
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
18+
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19+
- INSERT INTO `VENICE`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE`.`test-store`
2020
jarURI: file:///opt/hoptimator-flink-runner.jar
2121
parallelism: 1
2222
upgradeMode: stateless

hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
!set outputformat mysql
22
!use k8s
33

4-
insert into "VENICE-CLUSTER0"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE-CLUSTER0"."test-store";
4+
insert into "VENICE"."test-store-1" ("KEY_id", "intField") select "KEY_id", "stringField" from "VENICE"."test-store";
55
apiVersion: flink.apache.org/v1beta1
66
kind: FlinkSessionJob
77
metadata:
8-
name: venice-cluster0-test-store-1
8+
name: venice-test-store-1
99
namespace: flink
1010
spec:
1111
deploymentName: basic-session-deployment
1212
job:
1313
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1414
args:
15-
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
16-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
17-
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
18-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19-
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE-CLUSTER0`.`test-store`
15+
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
16+
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
17+
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
18+
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
19+
- INSERT INTO `VENICE`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE`.`test-store`
2020
jarURI: file:///opt/hoptimator-flink-runner.jar
2121
parallelism: 1
2222
upgradeMode: stateless

hoptimator-venice/src/test/resources/venice-ddl-select.id

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
!set outputformat mysql
22
!use k8s
33

4-
select * from "VENICE-CLUSTER0"."test-store-1";
4+
select * from "VENICE"."test-store-1";
55
apiVersion: flink.apache.org/v1beta1
66
kind: FlinkSessionJob
77
metadata:
@@ -12,11 +12,11 @@ spec:
1212
job:
1313
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1414
args:
15-
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
16-
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
15+
- CREATE DATABASE IF NOT EXISTS `VENICE` WITH ()
16+
- CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
1717
- CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH ()
1818
- CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ()
19-
- INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
19+
- INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE`.`test-store-1`
2020
jarURI: file:///opt/hoptimator-flink-runner.jar
2121
parallelism: 1
2222
upgradeMode: stateless

0 commit comments

Comments
 (0)