Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connection pass through preventing functions from working #112

Merged
merged 2 commits into from
Feb 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deploy/samples/demodb.yaml
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ metadata:
name: ads-database
spec:
schema: ADS
url: jdbc:demodb://ads
url: jdbc:demodb://names=ads
dialect: Calcite

---
@@ -15,7 +15,7 @@ metadata:
name: profile-database
spec:
schema: PROFILE
url: jdbc:demodb://profile
url: jdbc:demodb://names=profile
dialect: Calcite

---
2 changes: 1 addition & 1 deletion hoptimator
Original file line number Diff line number Diff line change
@@ -7,5 +7,5 @@ $BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \
-Dorg.slf4j.simpleLogger.showLogName=false \
sqlline.SqlLine \
-ac sqlline.HoptimatorAppConfig \
-u jdbc:hoptimator:// -n "" -p "" -nn "Hoptimator" $@
-u jdbc:hoptimator://fun=mysql -n "" -p "" -nn "Hoptimator" $@

Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.avatica.DriverVersion;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.Driver;
@@ -37,15 +38,18 @@ public Connection connect(String url, Properties props) throws SQLException {
if (!url.startsWith(getConnectStringPrefix())) {
return null;
}
String params = url.substring(getConnectStringPrefix().length());
Set<String> schemas = Arrays.asList(params.split(","))
Properties properties = new Properties();
properties.putAll(props);
properties.putAll(ConnectStringParser.parse(url.substring(getConnectStringPrefix().length())));

Set<String> schemas = Arrays.asList(properties.getProperty("names").split(","))
.stream()
.map(x -> x.trim())
.filter(x -> !x.isEmpty())
.map(x -> x.toUpperCase(Locale.ROOT))
.collect(Collectors.toSet());
try {
Connection connection = super.connect(url, props);
Connection connection = super.connect(url, properties);
if (connection == null) {
throw new IOException("Could not connect to " + url);
}
Original file line number Diff line number Diff line change
@@ -38,7 +38,6 @@
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
import org.apache.calcite.server.DdlExecutor;
import org.apache.calcite.server.ServerDdlExecutor;
import org.apache.calcite.sql.SqlCall;
@@ -65,6 +64,7 @@
import com.linkedin.hoptimator.Database;
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.Pipeline;
import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.planner.PipelineRel;

@@ -117,8 +117,9 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
List<String> viewPath = new ArrayList<>();
viewPath.addAll(schemaPath);
viewPath.add(viewName);
ViewTableMacro viewTableMacro = ViewTable.viewMacro(schemaPlus, sql, schemaPath, viewPath, false);
ViewTable viewTable = (ViewTable) viewTableMacro.apply(Collections.emptyList());
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(schemaPlus),
sql, schemaPath, viewPath, false);
ViewTable viewTable = (ViewTable) viewTableMacro.apply(connectionProperties);
try {
ValidationService.validateOrThrow(viewTable, TranslatableTable.class);
if (create.getReplace()) {
@@ -174,8 +175,9 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con

// Table does not exist. Create it.
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
ViewTableMacro viewTableMacro = ViewTable.viewMacro(schemaPlus, sql, schemaPath, viewPath, false);
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro);
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(schemaPlus),
sql, schemaPath, viewPath, false);
MaterializedViewTable materializedViewTable = new MaterializedViewTable(viewTableMacro, connectionProperties);
RelDataType viewRowType = materializedViewTable.getRowType(typeFactory);

// Support "partial views", i.e. CREATE VIEW FOO$BAR, where the view name
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.hoptimator.jdbc;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
@@ -11,7 +11,8 @@
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;

import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;


public class MaterializedViewTable extends AbstractTable implements TranslatableTable {
@@ -22,8 +23,8 @@ public MaterializedViewTable(ViewTable viewTable) {
this.viewTable = viewTable;
}

public MaterializedViewTable(ViewTableMacro viewTableMacro) {
this((ViewTable) viewTableMacro.apply(Collections.emptyList()));
public MaterializedViewTable(HoptimatorViewTableMacro viewTableMacro, Properties connectionProperties) {
this((ViewTable) viewTableMacro.apply(connectionProperties));
}

public ViewTable viewTable() {
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.hoptimator.jdbc.schema;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.ViewTableMacro;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* This file is copy-pasted from {@link ViewTableMacro} with the only modification being
* how the connection is instantiated.
*/
public class HoptimatorViewTableMacro extends ViewTableMacro {

private final Boolean modifiable;
public HoptimatorViewTableMacro(CalciteSchema schema, String viewSql,
@Nullable List<String> schemaPath, @Nullable List<String> viewPath,
@Nullable Boolean modifiable) {
super(schema, viewSql, schemaPath, viewPath, modifiable);
this.modifiable = modifiable;
}

public TranslatableTable apply(Properties properties) {
CalciteConnection connection;
try {
connection = DriverManager.getConnection("jdbc:calcite:", properties)
.unwrap(CalciteConnection.class);
} catch (SQLException e) {
throw new RuntimeException(e);
}
CalcitePrepare.AnalyzeViewResult parsed =
Schemas.analyzeView(connection, schema, schemaPath, viewSql, viewPath,
modifiable != null && modifiable);
final List<String> schemaPath1 =
schemaPath != null ? schemaPath : schema.path(null);
if ((modifiable == null || modifiable)
&& parsed.modifiable
&& parsed.table != null) {
return modifiableViewTable(parsed, viewSql, schemaPath1, viewPath, schema);
} else {
return viewTable(parsed, viewSql, schemaPath1, viewPath);
}
}
}
Original file line number Diff line number Diff line change
@@ -32,17 +32,21 @@
public abstract class QuidemTestBase {

protected void run(String resourceName) throws IOException, URISyntaxException {
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI());
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI(), "");
}

protected void run(URI resource) throws IOException {
protected void run(String resourceName, String jdbcProperties) throws IOException, URISyntaxException {
run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI(), jdbcProperties);
}

protected void run(URI resource, String jdbcProperties) throws IOException {
File in = new File(resource);
File out = File.createTempFile(in.getName(), ".out");
try (Reader r = new FileReader(in); Writer w = new PrintWriter(out)) {
Quidem.Config config = Quidem.configBuilder()
.withReader(r)
.withWriter(w)
.withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://catalogs=" + x))
.withConnectionFactory((x, y) -> DriverManager.getConnection("jdbc:hoptimator://catalogs=" + x + jdbcProperties))
.withCommandHandler(new CustomCommandHandler())
.build();
new Quidem(config).execute();
Original file line number Diff line number Diff line change
@@ -34,6 +34,6 @@ public void register(Wrapper parentSchema, Properties connectionProperties) thro
K8sMetadata metadata = new K8sMetadata(context);
schemaPlus.add("k8s", metadata);
metadata.databaseTable().addDatabases(schemaPlus, connectionProperties);
metadata.viewTable().addViews(schemaPlus);
metadata.viewTable().addViews(schemaPlus, connectionProperties);
}
}
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.StringJoiner;
import javax.sql.DataSource;

import org.apache.calcite.adapter.jdbc.JdbcSchema;
@@ -51,7 +52,7 @@ public K8sDatabaseTable(K8sContext context, K8sEngineTable engines) {
public void addDatabases(SchemaPlus parentSchema, Properties connectionProperties) {
for (Row row : rows()) {
parentSchema.add(schemaName(row),
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row), engines.forDatabase(row.NAME), connectionProperties));
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row, connectionProperties), parentSchema, dialect(row), engines.forDatabase(row.NAME), connectionProperties));
}
}

@@ -82,9 +83,28 @@ private static String schemaName(Row row) {
}
}

private static DataSource dataSource(Row row) {
// TODO fetch username/password from Secret
return JdbcSchema.dataSource(row.URL, row.DRIVER, "nouser", "nopass");
private static DataSource dataSource(Row row, Properties connectionProperties) {
String user = "nouser";
String pass = "nopass";
StringJoiner joiner = new StringJoiner(";");
for (String key : connectionProperties.stringPropertyNames()) {
if ("user".equals(key)) {
user = connectionProperties.getProperty(key);
} else if ("password".equals(key)) {
pass = connectionProperties.getProperty(key);
} else {
String value = connectionProperties.getProperty(key);
joiner.add(key + "=" + value);
}
}
String joinedUrl = row.URL;
// Handles case where there are no properties already in the URL
if (row.URL.endsWith("//")) {
joinedUrl = joinedUrl + joiner;
} else {
joinedUrl = joinedUrl + ";" + joiner;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The k1=v1;k2=v2 format seems to be a common convention, but not all drivers support it. We may run into drivers that choke on it. Not sure what to do in that case...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the drivers we have for now do support it, so maybe that's good enough for now? Considering Properties objects are always key/value pairs I'd imagine we'd run into more problems if we just had single values

}
return JdbcSchema.dataSource(joinedUrl, row.DRIVER, user, pass);
}

private static SqlDialect dialect(Row row) {
Original file line number Diff line number Diff line change
@@ -3,19 +3,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.ViewTableMacro;

import io.kubernetes.client.openapi.models.V1ObjectMeta;

import com.linkedin.hoptimator.Validated;
import com.linkedin.hoptimator.Validator;
import com.linkedin.hoptimator.jdbc.MaterializedViewTable;
import com.linkedin.hoptimator.jdbc.schema.HoptimatorViewTableMacro;
import com.linkedin.hoptimator.k8s.models.V1alpha1View;
import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList;
import com.linkedin.hoptimator.k8s.models.V1alpha1ViewSpec;
@@ -75,7 +76,7 @@ public K8sViewTable(K8sContext context) {
super(context, K8sApiEndpoints.VIEWS, Row.class);
}

public void addViews(SchemaPlus parentSchema) {
public void addViews(SchemaPlus parentSchema, Properties connectionProperties) {
for (Row row : rows()) {

// build schema path, filling in any missing schemas
@@ -88,7 +89,7 @@ public void addViews(SchemaPlus parentSchema) {
}
schema = next;
}
schema.add(row.viewName(), makeView(schema, row));
schema.add(row.viewName(), makeView(schema, row, connectionProperties));
}
}

@@ -107,12 +108,13 @@ public void remove(String name) {
rows().remove(find(name));
}

private Table makeView(SchemaPlus parentSchema, Row row) {
ViewTableMacro viewTableMacro = ViewTable.viewMacro(parentSchema, row.SQL, row.schemaPath(), row.viewPath(), false);
private Table makeView(SchemaPlus parentSchema, Row row, Properties connectionProperties) {
HoptimatorViewTableMacro viewTableMacro = new HoptimatorViewTableMacro(CalciteSchema.from(parentSchema), row.SQL,
row.schemaPath(), row.viewPath(), false);
if (row.MATERIALIZED) {
return new MaterializedViewTable(viewTableMacro);
return new MaterializedViewTable(viewTableMacro, connectionProperties);
} else {
return viewTableMacro.apply(Collections.emptyList());
return viewTableMacro.apply(connectionProperties);
}
}

Original file line number Diff line number Diff line change
@@ -13,4 +13,10 @@ public class TestSqlScripts extends QuidemTestBase {
public void k8sDdlScript() throws Exception {
run("k8s-ddl.id");
}

@Test
@Tag("integration")
public void k8sDdlScriptFunction() throws Exception {
run("k8s-ddl-function.id", ";fun=mysql");
}
}
133 changes: 133 additions & 0 deletions hoptimator-k8s/src/test/resources/k8s-ddl-function.id
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
!set outputformat mysql
!use k8s

create or replace view ADS."case" AS SELECT CASE WHEN "FIRST_NAME" = 'Bob' THEN ARRAY['a'] ELSE ARRAY['b'] END || CASE WHEN "FIRST_NAME" = 'Alice' THEN ARRAY['c'] ELSE ARRAY['d'] END AS arr from profile.members;
(0 rows modified)

!update

select * from ADS."case";
+--------+
| ARR |
+--------+
| [b, c] |
| [a, d] |
| [b, d] |
+--------+
(3 rows)

!ok

create or replace view ADS."json" AS SELECT JSON_VALUE('{"a": 1}', '$.a') AS json from profile.members;
(0 rows modified)

!update

select * from ADS."json";
+------+
| JSON |
+------+
| 1 |
| 1 |
| 1 |
+------+
(3 rows)

!ok

create or replace view ADS."regex" AS SELECT REGEXP_REPLACE("FIRST_NAME", '(B)ob', '$1ill') AS name from profile.members;
(0 rows modified)

!update

select * from ads."regex";
+---------+
| NAME |
+---------+
| Alice |
| Bill |
| Charlie |
+---------+
(3 rows)

!ok

create or replace view ADS."concat" AS SELECT CONCAT('_', "FIRST_NAME", '_') AS name from profile.members;
(0 rows modified)

!update

select * from ads."concat";
+-----------+
| NAME |
+-----------+
| _Alice_ |
| _Bob_ |
| _Charlie_ |
+-----------+
(3 rows)

!ok


create or replace view ADS."listagg" AS SELECT LISTAGG("FIRST_NAME") AS agg FROM profile.members;
(0 rows modified)

!update

select * from ads."listagg";
+-------------------+
| AGG |
+-------------------+
| Alice,Bob,Charlie |
+-------------------+
(1 row)

!ok

create or replace view ADS."unnested" AS SELECT * FROM UNNEST(ARRAY(SELECT "FIRST_NAME" FROM profile.members)) AS name;
(0 rows modified)

!update

select * from ADS."unnested";
+---------+
| NAME |
+---------+
| Alice |
| Bob |
| Charlie |
+---------+
(3 rows)

!ok

drop view ads."case";
(0 rows modified)

!update

drop view ads."json";
(0 rows modified)

!update

drop view ads."regex";
(0 rows modified)

!update

drop view ads."concat";
(0 rows modified)

!update

drop view ads."listagg";
(0 rows modified)

!update

drop view ads."unnested";
(0 rows modified)

!update
Original file line number Diff line number Diff line change
@@ -21,10 +21,10 @@
import java.util.Properties;

import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.adapter.jdbc.JdbcConvention;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;

import org.checkerframework.checker.nullness.qual.Nullable;

/**