
Commit 70132fb

Add Venice support (#78)
* Add Venice support
* Fix insert to exclude non-specified fields for Venice
* Fix target column inserts
* Address feedback, rebase to work with structured Key type, add tests
* Address feedback
Parent: 64b7b73

File tree

38 files changed (+836, -118 lines)


.gitignore (+1)

@@ -1,5 +1,6 @@
 .gradle
 .idea/
+/build
 */build/
 */*.iml
 ./models/external/

Makefile (+50, -18)

@@ -12,50 +12,63 @@ build:
 
 bounce: build undeploy deploy deploy-samples deploy-config deploy-demo
 
-# Integration tests expect K8s and Kafka to be running
-integration-tests: deploy-dev-environment deploy-samples
-	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
-	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
-	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
-	kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
-	./gradlew intTest || kill `cat port-forward.pid`
-	kill `cat port-forward.pid`
-
 clean:
 	./gradlew clean
 
 deploy-config:
 	kubectl create configmap hoptimator-configmap --from-file=model.yaml=test-model.yaml --dry-run=client -o yaml | kubectl apply -f -
 
+undeploy-config:
+	kubectl delete configmap hoptimator-configmap || echo "skipping"
+
 deploy: deploy-config
 	kubectl apply -f ./hoptimator-k8s/src/main/resources/
 	kubectl apply -f ./deploy
 
+undeploy: undeploy-config
+	kubectl delete -f ./deploy || echo "skipping"
+	kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
+
 quickstart: build deploy
 
 deploy-demo: deploy
 	kubectl apply -f ./deploy/samples/demodb.yaml
 
+undeploy-demo: undeploy
+	kubectl delete -f ./deploy/samples/demodb.yaml
+
 deploy-samples: deploy
 	kubectl wait --for=condition=Established=True \
 		crds/subscriptions.hoptimator.linkedin.com \
 		crds/kafkatopics.hoptimator.linkedin.com \
 		crds/sqljobs.hoptimator.linkedin.com
 	kubectl apply -f ./deploy/samples
 
-deploy-dev-environment: deploy-config
-	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
-	kubectl create namespace kafka || echo "skipping"
+undeploy-samples: undeploy
+	kubectl delete -f ./deploy/samples || echo "skipping"
+
+deploy-flink:
 	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
+
+undeploy-flink:
+	kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
+	kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
+	kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
+	kubectl delete crd flinksessionjobs.flink.apache.org || echo "skipping"
+	helm uninstall flink-kubernetes-operator || echo "skipping"
+
+deploy-kafka: deploy deploy-flink
+	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
+	kubectl create namespace kafka || echo "skipping"
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
 	kubectl apply -f ./hoptimator-k8s/src/main/resources/
 	kubectl apply -f ./deploy/dev
 	kubectl apply -f ./deploy/samples/demodb.yaml
 	kubectl apply -f ./deploy/samples/kafkadb.yaml
 
-undeploy-dev-environment:
+undeploy-kafka:
 	kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
 	kubectl delete strimzi -n kafka --all || echo "skipping"
 	kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping"
@@ -65,12 +78,31 @@ undeploy-dev-environment:
 	kubectl delete -f ./deploy/dev || echo "skipping"
 	kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
 	kubectl delete namespace kafka || echo "skipping"
-	helm uninstall flink-kubernetes-operator || echo "skipping"
 	kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 
-undeploy: undeploy-dev-environment
-	kubectl delete -f ./deploy || echo "skipping"
-	kubectl delete configmap hoptimator-configmap || echo "skipping"
+# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
+deploy-venice: deploy deploy-flink
+	docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
+	docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
+	docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
+	kubectl apply -f ./deploy/samples/venicedb.yaml
+
+undeploy-venice:
+	kubectl delete -f ./deploy/samples/venicedb.yaml || echo "skipping"
+	docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml down
+
+deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice
+
+undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy
+
+# Integration tests expect K8s, Kafka, and Venice to be running
+integration-tests: deploy-dev-environment deploy-samples
+	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
+	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
+	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
+	kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
+	./gradlew intTest || kill `cat port-forward.pid`
+	kill `cat port-forward.pid`
 
 generate-models:
 	./generate-models.sh
@@ -80,4 +112,4 @@ release:
 	test -n "$(VERSION)" # MISSING ARG: $$VERSION
 	./gradlew publish
 
-.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
+.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release

README.md (+1, -1)

@@ -54,7 +54,7 @@ The below setup will install a Kafka and Flink cluster within Kubernetes.
 
 ```
 $ make install # build and install SQL CLI
-$ make deploy-dev-environment # start local Kafka & Flink setups
+$ make deploy-dev-environment # start all local dev setups
 $ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI
 $ ./hoptimator # start the SQL CLI
 > !intro
> !intro
deploy/docker/docker-compose-single-dc-setup.yaml (+84)

@@ -0,0 +1,84 @@
+services:
+  zookeeper:
+    image: venicedb/apache-zookeeper:3.9.0
+    container_name: zookeeper
+    hostname: zookeeper
+    healthcheck:
+      test: ["CMD-SHELL", "echo ruok | nc zookeeper 2181"]
+      start_period: 10s
+      interval: 5s
+      timeout: 5s
+      retries: 5
+
+  kafka:
+    image: venicedb/apache-kafka:3.3.1
+    container_name: kafka
+    hostname: kafka
+    environment:
+      - ZOOKEEPER_ADDRESS=zookeeper:2181
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD-SHELL", "bash -x bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
+      start_period: 60s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-controller:
+    image: venicedb/venice-controller:0.4.340
+    container_name: venice-controller
+    hostname: venice-controller
+    depends_on:
+      kafka:
+        condition: service_healthy
+    ports:
+      - 5555:5555
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 5"]
+      start_period: 20s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-server:
+    image: venicedb/venice-server:0.4.340
+    container_name: venice-server
+    hostname: venice-server
+    depends_on:
+      venice-controller:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 5"]
+      start_period: 20s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-router:
+    image: venicedb/venice-router:0.4.340
+    container_name: venice-router
+    hostname: venice-router
+    depends_on:
+      venice-server:
+        condition: service_healthy
+    ports:
+      - 7777:7777
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 5"]
+      start_period: 20s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-client:
+    image: venicedb/venice-client:0.4.340
+    container_name: venice-client
+    hostname: venice-client
+    tty: true
+    volumes:
+      - ./venice:/opt/venice/schemas
+    depends_on:
+      venice-router:
+        condition: service_healthy

deploy/docker/venice/keySchema.avsc (+11)

@@ -0,0 +1,11 @@
+{
+  "type": "record",
+  "name": "SampleTableKey",
+  "doc": "SampleTableKey",
+  "fields": [
+    {
+      "name": "id",
+      "type": "int"
+    }
+  ]
+}

deploy/docker/venice/valueSchema.avsc (+23)

@@ -0,0 +1,23 @@
+{
+  "type": "record",
+  "name": "SampleTableValue",
+  "doc": "SampleTableValue",
+  "fields": [
+    {
+      "name": "intField",
+      "type": [
+        "null",
+        "int"
+      ],
+      "default": null
+    },
+    {
+      "name": "stringField",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null
+    }
+  ]
+}
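
These two schemas define the sample stores that create-store.sh registers: a required int key field (id) and two nullable value fields. Below is a minimal sketch of a conforming key/value pair using plain Avro GenericRecords — illustrative only; the schema file paths are an assumption based on this repo's layout. The nullable-with-default value fields are presumably what makes partial-update-mode (enabled in the table template below) workable, since unwritten fields can fall back to their defaults.

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SampleRecords {
  public static void main(String[] args) throws IOException {
    // Parse the schemas shipped under deploy/docker/venice/ (paths assumed).
    Schema keySchema = new Schema.Parser().parse(new File("deploy/docker/venice/keySchema.avsc"));
    Schema valueSchema = new Schema.Parser().parse(new File("deploy/docker/venice/valueSchema.avsc"));

    // A key record: the single required int field.
    GenericRecord key = new GenericData.Record(keySchema);
    key.put("id", 1);

    // A value record: both fields are nullable unions with default null,
    // so either may be left unset or populated explicitly.
    GenericRecord value = new GenericData.Record(valueSchema);
    value.put("intField", 42);
    value.put("stringField", "hello");

    System.out.println(key + " -> " + value);
  }
}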

deploy/samples/venicedb.yaml (+25)

@@ -0,0 +1,25 @@
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: Database
+metadata:
+  name: venice-cluster0
+spec:
+  schema: VENICE-CLUSTER0
+  url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777
+  dialect: Calcite
+
+---
+
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: TableTemplate
+metadata:
+  name: venice-template-cluster0
+spec:
+  databases:
+  - venice-cluster0
+  connector: |
+    connector = venice
+    storeName = {{table}}
+    partial-update-mode = true
+    key.fields-prefix = KEY_
+    key.fields = {{keys}}
+    value.fields-include: EXCEPT_KEY
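
The Database CR registers the cluster as the VENICE-CLUSTER0 schema, and the TableTemplate stamps out a connector config per store, substituting {{table}} and {{keys}}; key fields get the KEY_ prefix so they don't collide with value fields, and value.fields-include: EXCEPT_KEY keeps them out of the value payload. Independently of Hoptimator, a store can be sanity-checked with the venice-thin-client artifact added in this commit — a rough sketch, assuming the router is reachable on localhost:7777 after `make deploy-venice`:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;

public class VeniceSmokeTest {
  public static void main(String[] args) throws Exception {
    // Router URL matches the docker-compose port mapping above.
    ClientConfig config = ClientConfig.defaultGenericClientConfig("test-store")
        .setVeniceURL("http://localhost:7777");
    AvroGenericStoreClient<GenericRecord, GenericRecord> client =
        ClientFactory.getAndStartGenericAvroClient(config);
    try {
      // Keys are records of SampleTableKey (see keySchema.avsc above).
      Schema keySchema = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"SampleTableKey\","
              + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
      GenericRecord key = new GenericData.Record(keySchema);
      key.put("id", 1);
      GenericRecord value = client.get(key).get();  // null if the key is absent
      System.out.println(value);
    } finally {
      client.close();
    }
  }
}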

gradle/libs.versions.toml (+2)

@@ -29,3 +29,5 @@ slf4j-api = "org.slf4j:slf4j-api:1.7.30"
 sqlline = "sqlline:sqlline:1.12.0"
 commons-cli = "commons-cli:commons-cli:1.4"
 quidem = "net.hydromatic:quidem:0.11"
+venice = "com.linkedin.venice:venice-common:0.4.376"
+venice-client = "com.linkedin.venice:venice-thin-client:0.4.376"

hoptimator-avro/build.gradle (+2, -2)

@@ -42,8 +42,8 @@ publishing {
       license {
         name = 'BSD 2-Clause'
         url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
-        }
       }
+      }
       scm {
         connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
         developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
@@ -52,4 +52,4 @@ publishing {
       }
     }
   }
-  }
+}

hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java (+7)

@@ -20,6 +20,9 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Represents something required by a Table.
@@ -37,6 +40,7 @@
  * for informational/debugging purposes.
  */
 public abstract class Resource {
+  private static final Logger log = LoggerFactory.getLogger(Resource.class);
   private final String template;
   private final SortedMap<String, Supplier<String>> properties = new TreeMap<>();
   private final List<Resource> inputs = new ArrayList<>();
@@ -345,6 +349,9 @@ private static String applyTransform(String value, String transform) {
       case "concat":
         res = res.replace("\n", "");
         break;
+      default:
+        log.warn("Transformation function '{}' not found", f);
+        break;
     }
   }
   return res;
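
The new default branch closes a silent-failure gap: a typo in a template transform name previously left the value untouched with no indication why. Here is a self-contained sketch of the behavior; the tokenization and the set of other transform cases are assumptions, since only "concat" appears in this diff:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransformDemo {
  private static final Logger log = LoggerFactory.getLogger(TransformDemo.class);

  // Mirrors the shape of Resource.applyTransform: apply each named transform
  // in order (other transform cases elided).
  static String applyTransform(String value, String transform) {
    String res = value;
    for (String f : transform.split("\\s+")) {
      switch (f) {
        case "concat":
          res = res.replace("\n", "");  // collapse a multi-line value to one line
          break;
        default:
          // New in this commit: warn on unknown transform names
          // instead of silently ignoring them.
          log.warn("Transformation function '{}' not found", f);
          break;
      }
    }
    return res;
  }

  public static void main(String[] args) {
    System.out.println(applyTransform("a\nb", "concat"));   // "ab"
    System.out.println(applyTransform("a\nb", "concatt"));  // warns, value unchanged
  }
}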

hoptimator-cli/build.gradle (+1)

@@ -12,6 +12,7 @@ dependencies {
   implementation project(':hoptimator-demodb')
   implementation project(':hoptimator-jdbc')
   implementation project(':hoptimator-kafka')
+  implementation project(':hoptimator-venice')
   implementation project(':hoptimator-k8s')
   implementation project(':hoptimator-util')
   implementation libs.calcite.core

hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java (+5, -5)

@@ -8,7 +8,7 @@
 import java.util.Scanner;
 
 import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
 import org.jline.reader.Completer;
 
 import com.linkedin.hoptimator.SqlDialect;
@@ -87,8 +87,8 @@ public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = split[1];
       CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
       try {
-        RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
-        PipelineRel.Implementor plan = DeploymentService.plan(rel);
+        RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
+        PipelineRel.Implementor plan = DeploymentService.plan(root);
         sqlline.output(plan.sql().apply(SqlDialect.ANSI));
       } catch (SQLException e) {
         sqlline.error(e);
@@ -155,9 +155,9 @@ public void execute(String line, DispatchCallback dispatchCallback) {
       }
       String sql = split[1];
       CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
-      RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
+      RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
       try {
-        List<String> specs = DeploymentService.plan(rel).pipeline().specify();
+        List<String> specs = DeploymentService.plan(root).pipeline().specify();
         specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
       } catch (SQLException e) {
         sqlline.error(e);
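
Passing the whole RelRoot to DeploymentService.plan(...), rather than the unwrapped RelNode, preserves Calcite's top-level field mapping, which the commit message ties to fixing target-column inserts. A rough illustration of what the bare node loses, using standard Calcite RelRoot members (not code from this commit):

import java.util.List;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.util.Pair;

class RelRootDemo {
  // Illustrative only: RelRoot pairs the plan with its output-field mapping.
  static void describe(RelRoot root) {
    RelNode rel = root.rel;                            // the physical plan alone
    List<Pair<Integer, String>> fields = root.fields;  // plan column -> declared output field
    // For INSERT INTO t (a, b) SELECT ..., this mapping records which target
    // column each plan column feeds; planning from rel alone would drop it.
    System.out.println(RelOptUtil.toString(rel) + "fields=" + fields);
  }
}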

hoptimator-cli/src/main/resources/intro.txt (+1, -1)

@@ -4,7 +4,7 @@ Try:
 > !tables
 > !schemas
 > create view foo as select * from ads.ad_clicks natural join profile.members;
-> !yaml select * from foo
+> !specify select * from foo
 > !pipeline select * from foo
 
 