Apply some Zeppelin fixes for kafka #110

Merged · 2 commits · Feb 14, 2025
Changes from 1 commit
1 change: 1 addition & 0 deletions Makefile
@@ -134,6 +134,7 @@ build-zeppelin: build
 # attaches to terminal (not run as daemon)
 run-zeppelin: build-zeppelin
 	kubectl apply -f deploy/docker/zeppelin/zeppelin-flink-engine.yaml
+	kubectl apply -f deploy/docker/zeppelin/zeppelin-kafkadb.yaml
 	docker run --rm -p 8080:8080 \
 		--volume=${HOME}/.kube/config:/opt/zeppelin/.kube/config \
 		--add-host=docker-for-desktop:host-gateway \
1 change: 1 addition & 0 deletions deploy/dev/kafka.yaml
@@ -45,6 +45,7 @@ spec:
         nodePort: 31092
       brokers:
       - broker: 0
+        # advertisedHost: host.docker.internal # swap these lines to enable Zeppelin, TODO: figure out a way around this
         advertisedHost: 127.0.0.1
         nodePort: 31234
   config:
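For context: Kafka clients use the bootstrap address only for the initial metadata request, then reconnect to each broker's advertised address. `advertisedHost` therefore has to resolve from wherever the client runs (`host.docker.internal` inside the Zeppelin container, `127.0.0.1` for processes on the host), which is why these two lines currently have to be swapped by hand. A minimal sketch of a consumer that exercises this, assuming the bootstrap NodePort 31092 above is reachable on localhost and a hypothetical `test-topic` exists:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AdvertisedHostCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Bootstrap address; only used for the initial metadata fetch.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:31092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "advertised-host-check");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("test-topic")); // hypothetical topic
      // After bootstrap, fetches go to advertisedHost:nodePort (127.0.0.1:31234
      // here); if that address is wrong for this network, this poll stalls
      // even though the bootstrap connection succeeded.
      System.out.println(consumer.poll(Duration.ofSeconds(5)).count() + " records");
    }
  }
}
```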
8 changes: 8 additions & 0 deletions deploy/docker/zeppelin/zeppelin-kafkadb.yaml
@@ -0,0 +1,8 @@
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: Database
+metadata:
+  name: kafka-database
+spec:
+  schema: KAFKA
+  url: jdbc:kafka://bootstrap.servers=host.docker.internal:9092
+  dialect: Calcite
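This new Database resource registers the Kafka cluster with Hoptimator under the KAFKA schema; `host.docker.internal` resolves to the Docker host, so the URL works from inside the Zeppelin container started by `run-zeppelin`. A rough sketch of connecting with that URL, assuming the Hoptimator Kafka JDBC driver is on the classpath (the table name is hypothetical):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class KafkaDatabaseCheck {
  public static void main(String[] args) throws Exception {
    // Same URL as the Database spec; host.docker.internal resolves to the
    // Docker host, so this is meant to run inside a container.
    String url = "jdbc:kafka://bootstrap.servers=host.docker.internal:9092";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         // "some-topic" is a placeholder for a topic-backed table.
         ResultSet rs = stmt.executeQuery("SELECT * FROM KAFKA.\"some-topic\" LIMIT 1")) {
      System.out.println(rs.next() ? "read a row" : "connected, no rows");
    }
  }
}
```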
4 changes: 2 additions & 2 deletions deploy/rbac.yaml
@@ -5,10 +5,10 @@ metadata:
   name: hoptimator-operator
 rules:
 - apiGroups: ["hoptimator.linkedin.com"]
-  resources: ["acls", "kafkatopics", "subscriptions", "sqljobs", "pipelines"]
+  resources: ["acls", "databases", "engines", "jobtemplates", "kafkatopics", "pipelines", "sqljobs", "subscriptions", "tabletemplates", "views"]
   verbs: ["get", "watch", "list", "update", "create"]
 - apiGroups: ["hoptimator.linkedin.com"]
-  resources: ["kafkatopics/status", "subscriptions/status", "acls/status", "sqljobs/status", "pipelines/status"]
+  resources: ["acls/status", "kafkatopics/status", "pipelines/status", "sqljobs/status", "subscriptions/status"]
   verbs: ["get", "patch"]
 - apiGroups: ["flink.apache.org"]
   resources: ["flinkdeployments", "flinksessionjobs"]
1 change: 1 addition & 0 deletions deploy/samples/kafkadb.yaml
@@ -38,6 +38,7 @@ spec:
     value.format = json
     scan.startup.mode = earliest-offset
     key.fields = KEY
+    key.format = raw
     value.fields-include = EXCEPT_KEY
 
 ---
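The added `key.format = raw` completes the Kafka connector options this template feeds to Flink: `key.fields = KEY` alone leaves the table definition invalid, because Flink requires a key format whenever key fields are declared. A sketch of the table these options describe, written against Flink's Table API instead of the template (topic, bootstrap address, and the value column are invented for illustration):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaTableSketch {
  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    env.createTemporaryTable("example", TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
            .column("KEY", DataTypes.STRING())      // raw-encoded record key
            .column("PAYLOAD", DataTypes.STRING())  // json-encoded value field
            .build())
        .option("topic", "example-topic")           // hypothetical topic
        .option("properties.bootstrap.servers", "localhost:9092")
        .option("value.format", "json")
        .option("scan.startup.mode", "earliest-offset")
        .option("key.fields", "KEY")
        .option("key.format", "raw")                // the option this PR adds
        .option("value.fields-include", "EXCEPT_KEY")
        .build());
  }
}
```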
HoptimatorDriver.java
@@ -57,12 +57,18 @@ public Connection connect(String url, Properties props) throws SQLException {
       return null;
     }
     try {
+      // Load properties from the URL and from getConnection()'s properties.
+      // URL properties take precedence.
+      Properties properties = new Properties();
+      properties.putAll(props); // via getConnection()
+      properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));
+
       if (prepareFactory == null) {
         // funky way of extending Driver with a custom Prepare:
-        return withPrepareFactory(() -> new Prepare(props))
-            .connect(url, props);
+        return withPrepareFactory(() -> new Prepare(properties))
+            .connect(url, properties);
       }
-      Connection connection = super.connect(url, props);
+      Connection connection = super.connect(url, properties);
       if (connection == null) {
        throw new IOException("Could not connect to " + url);
      }
@@ -76,12 +82,6 @@ public Connection connect(String url, Properties props) throws SQLException {
       calciteConnection.setSchema("DEFAULT");
 
       WrappedSchemaPlus wrappedRootSchema = new WrappedSchemaPlus(rootSchema);
-
-      // Load properties from the URL and from getConnection()'s properties.
-      // URL properties take precedence.
-      Properties properties = new Properties();
-      properties.putAll(props); // via getConnection()
-      properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));
       String[] catalogs = properties.getProperty("catalogs", "").split(",");
 
       if (catalogs.length == 0 || catalogs[0].length() == 0) {
@@ -92,7 +92,7 @@
       } else {
         // load specific catalogs when loaded as `jdbc:hoptimator://catalogs=foo,bar`
         for (String catalog : catalogs) {
-          CatalogService.catalog(catalog).register(wrappedRootSchema, props);
+          CatalogService.catalog(catalog).register(wrappedRootSchema, properties);
         }
       }
 
@@ -104,7 +104,7 @@
 
   @Override
   public Driver withPrepareFactory(Supplier<CalcitePrepare> prepareFactory) {
-      return new HoptimatorDriver(prepareFactory);
+    return new HoptimatorDriver(prepareFactory);
   }
 
   public static class Prepare extends CalcitePrepareImpl {
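The driver fix hoists the URL-vs-getConnection() property merge to the top of connect(), so the merged properties now reach the custom Prepare, super.connect(), and catalog registration alike; previously the merge happened further down, and the Prepare and super.connect() paths only ever saw the raw props, silently dropping URL parameters. A small self-contained sketch of the precedence rule (key names invented):

```java
import java.util.Properties;

public class PrecedenceDemo {
  public static void main(String[] args) {
    Properties fromGetConnection = new Properties();
    fromGetConnection.setProperty("catalogs", "foo");   // caller-supplied default
    fromGetConnection.setProperty("user", "alice");

    Properties fromUrl = new Properties();              // parsed from the JDBC URL
    fromUrl.setProperty("catalogs", "foo,bar");

    Properties merged = new Properties();
    merged.putAll(fromGetConnection); // applied first...
    merged.putAll(fromUrl);           // ...then overwritten by URL parameters

    System.out.println(merged.getProperty("catalogs")); // foo,bar (URL wins)
    System.out.println(merged.getProperty("user"));     // alice (preserved)
  }
}
```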