Skip to content

Commit

Permalink
add snowflake test
Browse files Browse the repository at this point in the history
  • Loading branch information
invisal committed Nov 14, 2024
1 parent 0355c9e commit 0f58148
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 19 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,24 @@ jobs:
MOTHERDUCK_PATH: ${{ secrets.MOTHERDUCK_PATH }}
MOTHERDUCK_TOKEN: ${{ secrets.MOTHERDUCK_TOKEN }}
run: npm run test:connection

test_snowflake:
name: 'Snowflake Connection'
runs-on: ubuntu-latest
needs: build

steps:
- uses: actions/checkout@v4

- name: Install modules
run: npm install

- name: Run tests
env:
CONNECTION_TYPE: snowflake
SNOWFLAKE_ACCOUNT_ID: ${{ secrets.SNOWFLAKE_ACCOUNT_ID }}
SNOWFLAKE_USERNAME: ${{ secrets.SNOWFLAKE_USERNAME }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWFLAKE_WAREHOUSE }}
SNOWFLAKE_DATABASE: ${{ secrets.SNOWFLAKE_DATABASE }}
run: npm run test:connection
2 changes: 1 addition & 1 deletion src/connections/mysql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ interface MySQLConstraintResult {
CONSTRAINT_TYPE: string;
}

interface MySQLConstraintColumnResult {
export interface MySQLConstraintColumnResult {
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
Expand Down
171 changes: 153 additions & 18 deletions src/connections/snowflake/snowflake.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import snowflake from "snowflake-sdk";
import { Query } from "src/query";
import { QueryResult } from "..";
import { createErrorResult, transformObjectBasedResult } from "src/utils/transformer";
import { Database, TableColumn } from "src/models/database";
import snowflake from 'snowflake-sdk';
import { Query } from 'src/query';
import { QueryResult } from '..';
import {
createErrorResult,
transformArrayBasedResult,
} from 'src/utils/transformer';
import { Database, TableColumn } from 'src/models/database';
import { PostgreBaseConnection } from './../postgre/base';
import {
buildMySQLDatabaseSchmea,
MySQLConstraintColumnResult,
} from '../mysql';

export class SnowflakeConnection extends PostgreBaseConnection {
protected db: snowflake.Connection;
Expand All @@ -17,17 +24,125 @@ export class SnowflakeConnection extends PostgreBaseConnection {
await new Promise((resolve, reject) => {
this.db.connectAsync((err, conn) => {
if (err) reject(err.message);
else resolve(conn)
})
})
else resolve(conn);
});
});
}

async disconnect(): Promise<any> {
await new Promise((resolve) => this.db.destroy(resolve))
await new Promise((resolve) => this.db.destroy(resolve));
}

async testConnection(): Promise<{ error?: string }> {
try {
await this.connect();
const { data } = await this.query({
query: 'SELECT CURRENT_DATABASE() AS DBNAME;',
});

await this.disconnect();
if (!data[0].DBNAME) return { error: 'Database does not exist' };

return {};
} catch (e) {
if (e instanceof Error) return { error: e.message };
return { error: 'Unknown error' };
}
}

async fetchDatabaseSchema(): Promise<Database> {
return {};
// Get the list of schema first
const { data: schemaList } = await this.query<{ SCHEMA_NAME: string }>({
query: `SELECT SCHEMA_NAME FROM information_schema.schemata WHERE schema_name NOT IN ('INFORMATION_SCHEMA');`,
});

// Get the list of all tables
const { data: tableList } = await this.query<{
TABLE_NAME: string;
TABLE_SCHEMA: string;
}>({
query: `SELECT TABLE_NAME, TABLE_SCHEMA FROM information_schema.tables WHERE table_schema NOT IN ('INFORMATION_SCHEMA');`,
});

// Get the list of all columns
const { data: columnList } = await this.query<{
TABLE_SCHEMA: string;
TABLE_NAME: string;
COLUMN_NAME: string;
DATA_TYPE: string;
IS_NULLABLE: string;
COLUMN_DEFAULT: string;
ORDINAL_POSITION: number;
}>({
query: `SELECT * FROM information_schema.columns WHERE table_schema NOT IN ('INFORMATION_SCHEMA');`,
});

// Get the list of all constraints
const { data: constraintsList } = await this.query<{
CONSTRAINT_SCHEMA: string;
CONSTRAINT_NAME: string;
TABLE_NAME: string;
TABLE_SCHEMA: string;
CONSTRAINT_TYPE: string;
}>({
query: `SELECT * FROM information_schema.table_constraints WHERE CONSTRAINT_SCHEMA NOT IN ('INFORMATION_SCHEMA') AND CONSTRAINT_TYPE IN ('FOREIGN KEY', 'PRIMARY KEY', 'UNIQUE');`,
});

// Mamic the key usages table using SHOW PRIMARY KEY and SHOW FOREIGN KEYS
const { data: primaryKeyConstraint } = await this.query<{
schema_name: string;
table_name: string;
column_name: string;
constraint_name: string;
}>({ query: `SHOW PRIMARY KEYS;` });

const { data: foreignKeyConstraint } = await this.query<{
pk_schema_name: string;
pk_table_name: string;
pk_column_name: string;
fk_schema_name: string;
fk_table_name: string;
fk_column_name: string;
fk_name: string;
}>({ query: `SHOW IMPORTED KEYS;` });

// Postgres structure is similar to MySQL, so we can reuse the MySQL schema builder
// by just mapping the column names
return buildMySQLDatabaseSchmea({
schemaList,
tableList,
columnList: columnList.map((column) => ({
COLUMN_TYPE: column.DATA_TYPE,
...column,
COLUMN_KEY: '',
EXTRA: '',
})),
constraintsList,
constraintColumnsList: [
...primaryKeyConstraint.map(
(constraint): MySQLConstraintColumnResult => ({
TABLE_SCHEMA: constraint.schema_name,
TABLE_NAME: constraint.table_name,
COLUMN_NAME: constraint.column_name,
CONSTRAINT_NAME: constraint.constraint_name,
REFERENCED_TABLE_SCHEMA: '',
REFERENCED_TABLE_NAME: '',
REFERENCED_COLUMN_NAME: '',
})
),
...foreignKeyConstraint.map(
(constraint): MySQLConstraintColumnResult => ({
TABLE_SCHEMA: constraint.fk_schema_name,
TABLE_NAME: constraint.fk_table_name,
COLUMN_NAME: constraint.fk_column_name,
CONSTRAINT_NAME: constraint.fk_name,
REFERENCED_TABLE_SCHEMA: constraint.pk_schema_name,
REFERENCED_TABLE_NAME: constraint.pk_table_name,
REFERENCED_COLUMN_NAME: constraint.pk_column_name,
})
),
],
});
}

createTable(
Expand All @@ -37,7 +152,11 @@ export class SnowflakeConnection extends PostgreBaseConnection {
): Promise<QueryResult> {
const tempColumns = structuredClone(columns);
for (const column of tempColumns) {
delete column.definition.references;
if (column.definition.references) {
column.definition.references.table = schemaName
? `${schemaName}.${column.definition.references.table}`
: column.definition.references.table;
}
}

return super.createTable(schemaName, tableName, tempColumns);
Expand All @@ -56,23 +175,39 @@ export class SnowflakeConnection extends PostgreBaseConnection {
);
}

async query<T = Record<string, unknown>>(query: Query): Promise<QueryResult<T>> {
async query<T = Record<string, unknown>>(
query: Query
): Promise<QueryResult<T>> {
try {
const [err, rows] = await new Promise<[snowflake.SnowflakeError | undefined, Record<string, unknown>[]]>((resolve) => {
const [err, headers, rows] = await new Promise<
[snowflake.SnowflakeError | undefined, string[], unknown[][]]
>((resolve) => {
this.db.execute({
sqlText: query.query,
binds: query.parameters as snowflake.Binds,
rowMode: 'array',
complete: (err, stmt, rows) => {
if (err) console.log(err.message, stmt.getSqlText());
resolve([err, rows as Record<string, unknown>[]]);
}
resolve([
err,
err
? []
: stmt.getColumns().map((col) => col.getName()),
rows as unknown[][],
]);
},
});
});

if (err) return createErrorResult(err.message) as QueryResult<T>;
return transformObjectBasedResult(rows) as QueryResult<T>
return transformArrayBasedResult(
headers,
(header) => ({
name: header,
}),
rows
) as QueryResult<T>;
} catch (e) {
return createErrorResult('Unknown error') as QueryResult<T>;
}
}
}
}

0 comments on commit 0f58148

Please sign in to comment.