Skip to content

Commit

Permalink
Use RemoteTableName in JdbcOutputTableHandle
Browse files Browse the repository at this point in the history
Use RemoteTableName, which wraps `catalogName` and `schemaName` in `Optional` to handle their potential absence more explicitly
  • Loading branch information
chenjian2664 authored and ebyhr committed Aug 28, 2024
1 parent 784a131 commit 3b253d1
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,7 @@ protected JdbcOutputTableHandle createTable(
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTable),
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Expand Down Expand Up @@ -1015,9 +1013,7 @@ protected JdbcOutputTableHandle beginInsertTable(

if (isNonTransactionalInsert(session)) {
return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTable),
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Expand All @@ -1039,9 +1035,7 @@ protected JdbcOutputTableHandle beginInsertTable(
}

return new JdbcOutputTableHandle(
catalog,
remoteSchema,
remoteTable,
new RemoteTableName(Optional.ofNullable(catalog), Optional.ofNullable(remoteSchema), remoteTable),
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
Expand Down Expand Up @@ -1075,10 +1069,10 @@ public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle ha
else {
renameTable(
session,
handle.getCatalogName(),
handle.getSchemaName(),
handle.getRemoteTableName().getCatalogName().orElse(null),
handle.getRemoteTableName().getSchemaName().orElse(null),
handle.getTemporaryTableName().orElseThrow(() -> new IllegalStateException("Temporary table name missing")),
new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
handle.getRemoteTableName().getSchemaTableName());
}
}

Expand Down Expand Up @@ -1123,8 +1117,8 @@ private RemoteTableName constructPageSinkIdsTable(ConnectorSession session, Conn
verify(handle.getPageSinkIdColumnName().isPresent(), "Output table handle's pageSinkIdColumn is empty");

RemoteTableName pageSinkTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getRemoteTableName().getCatalogName(),
handle.getRemoteTableName().getSchemaName(),
generateTemporaryTableName(session));

int maxBatchSize = getWriteBatchSize(session);
Expand Down Expand Up @@ -1173,13 +1167,9 @@ public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle ha
}

RemoteTableName temporaryTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getRemoteTableName().getCatalogName(),
handle.getRemoteTableName().getSchemaName(),
handle.getTemporaryTableName().orElseThrow());
RemoteTableName targetTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getTableName());

// We conditionally create more than the one table, so keep a list of the tables that need to be dropped.
Closer closer = Closer.create();
Expand All @@ -1192,7 +1182,7 @@ public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle ha
.collect(joining(", "));

String insertSql = format("INSERT INTO %s (%s) SELECT %s FROM %s temp_table",
postProcessInsertTableNameClause(session, quoted(targetTable)),
postProcessInsertTableNameClause(session, quoted(handle.getRemoteTableName())),
columns,
columns,
quoted(temporaryTable));
Expand Down Expand Up @@ -1358,8 +1348,8 @@ public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle
if (handle.getTemporaryTableName().isPresent()) {
dropTable(session,
new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getRemoteTableName().getCatalogName(),
handle.getRemoteTableName().getSchemaName(),
handle.getTemporaryTableName().get()),
true);
}
Expand All @@ -1383,7 +1373,10 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
checkArgument(handle.getColumnNames().size() == columnWriters.size(), "handle and columnWriters mismatch: %s, %s", handle, columnWriters);
return format(
"INSERT INTO %s (%s%s) VALUES (%s%s)",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName().orElseGet(handle::getTableName)),
quoted(
handle.getRemoteTableName().getCatalogName().orElse(null),
handle.getRemoteTableName().getSchemaName().orElse(null),
handle.getTemporaryTableName().orElseGet(() -> handle.getRemoteTableName().getTableName())),
handle.getColumnNames().stream()
.map(this::quoted)
.collect(joining(", ")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public JdbcProcedureHandle getProcedureHandle(ConnectorSession session, Procedur
public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
{
delegate.commitCreateTable(session, handle, pageSinkIds);
invalidateTableCaches(new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
invalidateTableCaches(handle.getRemoteTableName().getSchemaTableName());
}

@Override
Expand All @@ -401,7 +401,7 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set<Long> pageSinkIds)
{
delegate.finishInsertTable(session, handle, pageSinkIds);
onDataChanged(new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
onDataChanged(handle.getRemoteTableName().getSchemaTableName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class JdbcOutputTableHandle
implements ConnectorOutputTableHandle, ConnectorInsertTableHandle
{
private final String catalogName;
private final String schemaName;
private final String tableName;
private final RemoteTableName remoteTableName;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final Optional<List<JdbcTypeHandle>> jdbcColumnTypes;
Expand All @@ -43,18 +39,14 @@ public class JdbcOutputTableHandle

@JsonCreator
public JdbcOutputTableHandle(
@JsonProperty("catalogName") @Nullable String catalogName,
@JsonProperty("schemaName") @Nullable String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("temporaryTableName") Optional<String> temporaryTableName,
@JsonProperty("pageSinkIdColumnName") Optional<String> pageSinkIdColumnName)
{
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = requireNonNull(tableName, "tableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.temporaryTableName = requireNonNull(temporaryTableName, "temporaryTableName is null");

requireNonNull(columnNames, "columnNames is null");
Expand All @@ -68,23 +60,9 @@ public JdbcOutputTableHandle(
}

@JsonProperty
@Nullable
public String getCatalogName()
public RemoteTableName getRemoteTableName()
{
return catalogName;
}

@JsonProperty
@Nullable
public String getSchemaName()
{
return schemaName;
}

@JsonProperty
public String getTableName()
{
return tableName;
return remoteTableName;
}

@JsonProperty
Expand Down Expand Up @@ -120,16 +98,14 @@ public Optional<String> getPageSinkIdColumnName()
@Override
public String toString()
{
return format("jdbc:%s.%s.%s", catalogName, schemaName, tableName);
return "jdbc:%s".formatted(remoteTableName);
}

@Override
public int hashCode()
{
return Objects.hash(
catalogName,
schemaName,
tableName,
remoteTableName,
columnNames,
columnTypes,
jdbcColumnTypes,
Expand All @@ -147,9 +123,7 @@ public boolean equals(Object obj)
return false;
}
JdbcOutputTableHandle other = (JdbcOutputTableHandle) obj;
return Objects.equals(this.catalogName, other.catalogName) &&
Objects.equals(this.schemaName, other.schemaName) &&
Objects.equals(this.tableName, other.tableName) &&
return Objects.equals(this.remoteTableName, other.remoteTableName) &&
Objects.equals(this.columnNames, other.columnNames) &&
Objects.equals(this.columnTypes, other.columnTypes) &&
Objects.equals(this.jdbcColumnTypes, other.jdbcColumnTypes) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import io.trino.spi.connector.SchemaTableName;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -58,6 +59,11 @@ public String getTableName()
return tableName;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName.orElseThrow(), tableName);
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ public class TestJdbcOutputTableHandle
public void testJsonRoundTrip()
{
JdbcOutputTableHandle handleForCreate = new JdbcOutputTableHandle(
"catalog",
"schema",
"table",
new RemoteTableName(Optional.of("catalog"), Optional.of("schema"), "table"),
ImmutableList.of("abc", "xyz"),
ImmutableList.of(VARCHAR, VARCHAR),
Optional.empty(),
Expand All @@ -41,9 +39,7 @@ public void testJsonRoundTrip()
assertJsonRoundTrip(OUTPUT_TABLE_CODEC, handleForCreate);

JdbcOutputTableHandle handleForInsert = new JdbcOutputTableHandle(
"catalog",
"schema",
"table",
new RemoteTableName(Optional.of("catalog"), Optional.of("schema"), "table"),
ImmutableList.of("abc", "xyz"),
ImmutableList.of(VARCHAR, VARCHAR),
Optional.of(ImmutableList.of(JDBC_VARCHAR, JDBC_VARCHAR)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.plugin.jdbc.LongWriteFunction;
import io.trino.plugin.jdbc.PreparedQuery;
import io.trino.plugin.jdbc.QueryBuilder;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.plugin.jdbc.WriteFunction;
import io.trino.plugin.jdbc.WriteMapping;
import io.trino.plugin.jdbc.aggregation.ImplementAvgFloatingPoint;
Expand Down Expand Up @@ -415,8 +416,7 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
execute(session, connection, sql);

return new IgniteOutputTableHandle(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
new RemoteTableName(Optional.empty(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.getTableName()),
columnNames,
columnTypes.build(),
Optional.empty(),
Expand Down Expand Up @@ -568,7 +568,7 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
}
return format(
"INSERT INTO %s (%s) VALUES (%s)",
quoted(null, handle.getSchemaName(), handle.getTableName()),
quoted(handle.getRemoteTableName()),
columns,
params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto

RemoteTableName remoteTableName = handle.asPlainTable().getRemoteTableName();
return new IgniteOutputTableHandle(
remoteTableName.getSchemaName().orElse(null),
remoteTableName.getTableName(),
remoteTableName,
columnNames.build(),
columnTypes.build(),
Optional.of(columnJdbcTypeHandles.build()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.RemoteTableName;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;

import java.util.List;
import java.util.Optional;
Expand All @@ -32,14 +32,13 @@ public class IgniteOutputTableHandle

@JsonCreator
public IgniteOutputTableHandle(
@Nullable @JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("dummyIdColumn") Optional<String> dummyIdColumn)
{
super("", schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, Optional.empty(), Optional.empty());
super(remoteTableName, columnNames, columnTypes, jdbcColumnTypes, Optional.empty(), Optional.empty());
this.dummyIdColumn = requireNonNull(dummyIdColumn, "dummyIdColumn is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,13 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
if (outputHandle.rowkeyColumn().isPresent()) {
String nextId = format(
"NEXT VALUE FOR %s, ",
quoted(null, handle.getSchemaName(), handle.getTableName() + "_sequence"));
quoted(null, handle.getRemoteTableName().getSchemaName().orElse(null), handle.getRemoteTableName().getTableName() + "_sequence"));
params = nextId + params;
columns = outputHandle.rowkeyColumn().get() + ", " + columns;
}
return format(
"UPSERT INTO %s (%s) VALUES (%s)",
quoted(null, handle.getSchemaName(), handle.getTableName()),
quoted(handle.getRemoteTableName()),
columns,
params);
}
Expand Down Expand Up @@ -696,8 +696,7 @@ public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
execute(session, sql);

return new PhoenixOutputTableHandle(
schema,
table,
new RemoteTableName(Optional.empty(), Optional.ofNullable(schema), table),
columnNames.build(),
columnTypes.build(),
Optional.empty(),
Expand Down
Loading

0 comments on commit 3b253d1

Please sign in to comment.