
Commit 1ea0d68

Redshift source and connector plugin added.
1 parent e5b4e3c commit 1ea0d68

20 files changed, +2056 −0 lines changed
@@ -0,0 +1,102 @@
# Amazon Redshift Batch Source

Description
-----------
Reads from an Amazon Redshift database using a configurable SQL query.
Outputs one record for each row returned by the query.


Use Case
--------
The source is used whenever you need to read from an Amazon Redshift database. For example, you may want
to create daily snapshots of a database table by using this source and writing to
a TimePartitionedFileSet.

Properties
----------
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.

**JDBC Driver name:** Name of the JDBC driver to use.

**Host:** Host URL of the current master instance of the Redshift cluster.

**Port:** Port on which the Redshift master instance is listening.

**Database:** Redshift database name.

**Import Query:** The SELECT query to use to import data from the specified table.
You can specify an arbitrary number of columns to import, or import all columns using \*. The query should
contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'.
The '$CONDITIONS' string will be replaced by limits on the 'splitBy' field derived from the bounding query.
The '$CONDITIONS' string is not required if numSplits is set to one.

**Bounding Query:** Query that returns the min and max values of the 'splitBy' field.
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.

**Split-By Field Name:** Name of the field that will be used to generate splits. Not required if numSplits is set to one.

**Number of Splits to Generate:** Number of splits to generate. When more than one split is used, the
import query must contain '$CONDITIONS', and the bounding query and split-by field must be set, as
illustrated below.
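
For instance, suppose a hypothetical 'users' table has a numeric 'id' column ranging from 1 to 100
and you want four splits. The plugin would then issue four copies of the import query, each with
'$CONDITIONS' replaced by a disjoint range on 'id' (roughly 'id >= 1 AND id < 26',
'id >= 26 AND id < 51', and so on):

```
Import Query: "SELECT id, name, email FROM users WHERE $CONDITIONS"
Bounding Query: "SELECT MIN(id), MAX(id) FROM users"
Split-By Field Name: "id"
Number of Splits to Generate: 4
```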

**Username:** User identity for connecting to the specified database.

**Password:** Password to use to connect to the specified database.

**Connection Arguments:** A list of arbitrary string key/value pairs that will be passed to the JDBC
driver as connection arguments, for drivers that may need additional configuration.

**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes
back from the query. However, it must match the schema that comes back from the query,
except that it can mark fields as nullable and can contain a subset of the fields.

**Fetch Size:** The number of rows to fetch at a time per split. A larger fetch size can result in a faster
import, with the tradeoff of higher memory usage.


Example
-------
Suppose you want to read data from an Amazon Redshift database named "prod" that is running on
"redshift.xyz.eu-central-1.redshift.amazonaws.com", port 5439, as user "sa" with password "Test11".
Ensure that the Redshift driver is installed (you can also provide the name of a specific driver,
otherwise "redshift" will be used), then configure the plugin with:

```
Reference Name: "src1"
Driver Name: "redshift"
Host: "redshift.xyz.eu-central-1.redshift.amazonaws.com"
Port: 5439
Database: "prod"
Import Query: "select id, name, email, phone from users;"
Number of Splits to Generate: 1
Username: "sa"
Password: "Test11"
```

Data Types Mapping
------------------

Mapping of Redshift types to CDAP schema:

| Redshift Data Type                                   | CDAP Schema Data Type | Comment                          |
|------------------------------------------------------|-----------------------|----------------------------------|
| bigint                                               | long                  |                                  |
| boolean                                              | boolean               |                                  |
| character                                            | string                |                                  |
| character varying                                    | string                |                                  |
| double precision                                     | double                |                                  |
| integer                                              | int                   |                                  |
| numeric(precision, scale)/decimal(precision, scale)  | decimal               |                                  |
| numeric(with 0 precision)                            | string                |                                  |
| real                                                 | float                 |                                  |
| smallint                                             | int                   |                                  |
| smallserial                                          | int                   |                                  |
| text                                                 | string                |                                  |
| date                                                 | date                  |                                  |
| time [ (p) ] [ without time zone ]                   | time                  |                                  |
| time [ (p) ] with time zone                          | string                |                                  |
| timestamp [ (p) ] [ without time zone ]              | timestamp             |                                  |
| timestamp [ (p) ] with time zone                     | timestamp             | stored in UTC format in database |
| xml                                                  | string                |                                  |
| json                                                 | string                |                                  |
| super                                                | string                |                                  |
| geometry                                             | bytes                 |                                  |
| hllsketch                                            | string                |                                  |
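
For instance, under this mapping a Redshift table defined with columns `id bigint`,
`name character varying(32)`, and `created timestamp` would surface through the source with a CDAP
schema along the following lines. This is an illustrative sketch in the Avro-style JSON that CDAP
schemas use; the record name and the nullability of each field are assumptions, not part of this commit:

```
{
  "type": "record",
  "name": "output",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "name", "type": [ "string", "null" ] },
    { "name": "created", "type": [ { "type": "long", "logicalType": "timestamp-micros" }, "null" ] }
  ]
}
```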
@@ -0,0 +1,26 @@
# Amazon Redshift Connection

Description
-----------
Use this connection to access data in an Amazon Redshift database using JDBC.

Properties
----------
**Name:** Name of the connection. Connection names must be unique in a namespace.

**Description:** Description of the connection.

**JDBC Driver name:** Name of the JDBC driver to use.

**Host:** Host of the current master instance of the Redshift cluster.

**Port:** Port on which the Redshift master instance is listening.

**Database:** Redshift database name.

**Username:** User identity for connecting to the specified database.

**Password:** Password to use to connect to the specified database.

**Connection Arguments:** A list of arbitrary string key/value pairs that will be passed to the JDBC
driver as connection arguments, for drivers that may need additional configuration.
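
To make these properties concrete, below is a minimal sketch of the equivalent plain-JDBC connection.
This is illustrative only: the `jdbc:redshift://` URL scheme and the `user`, `password`, and `ssl`
property names follow the standard Amazon Redshift JDBC driver conventions rather than anything defined
in this commit, and the host and credentials are the placeholder values from the batch source example.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class RedshiftConnectionSketch {
  public static void main(String[] args) throws Exception {
    // Host, Port, and Database correspond to the connection properties above.
    String url = "jdbc:redshift://redshift.xyz.eu-central-1.redshift.amazonaws.com:5439/prod";

    Properties props = new Properties();
    props.setProperty("user", "sa");          // Username
    props.setProperty("password", "Test11");  // Password
    props.setProperty("ssl", "true");         // an example Connection Argument

    // The Redshift JDBC driver must be on the classpath for this URL scheme to resolve.
    try (Connection conn = DriverManager.getConnection(url, props)) {
      System.out.println("Connected to " + conn.getMetaData().getDatabaseProductName());
    }
  }
}
```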

amazon-redshift-plugin/pom.xml

+139
@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Copyright © 2023 CDAP

  Licensed under the Apache License, Version 2.0 (the "License"); you may not
  use this file except in compliance with the License. You may obtain a copy of
  the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  License for the specific language governing permissions and limitations under
  the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>database-plugins-parent</artifactId>
    <groupId>io.cdap.plugin</groupId>
    <version>1.12.0-SNAPSHOT</version>
  </parent>

  <name>Amazon Redshift plugin</name>
  <artifactId>amazon-redshift-plugin</artifactId>
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <redshift-jdbc.version>2.1.0.18</redshift-jdbc.version>
  </properties>

  <repositories>
    <repository>
      <id>redshift</id>
      <url>http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-etl-api</artifactId>
    </dependency>
    <dependency>
      <groupId>io.cdap.plugin</groupId>
      <artifactId>database-commons</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>io.cdap.plugin</groupId>
      <artifactId>hydrator-common</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>

    <!-- test dependencies -->
    <dependency>
      <groupId>com.amazon.redshift</groupId>
      <artifactId>redshift-jdbc42</artifactId>
      <version>${redshift-jdbc.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cdap.plugin</groupId>
      <artifactId>database-commons</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>hydrator-test</artifactId>
    </dependency>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-data-pipeline3_2.12</artifactId>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.cdap.cdap</groupId>
      <artifactId>cdap-api</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.jetbrains</groupId>
      <artifactId>annotations</artifactId>
      <version>RELEASE</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>io.cdap</groupId>
        <artifactId>cdap-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <version>5.1.2</version>
        <extensions>true</extensions>
        <configuration>
          <instructions>
            <_exportcontents>
              io.cdap.plugin.amazon.redshift.*;
              io.cdap.plugin.db.source.*;
              org.apache.commons.lang;
              org.apache.commons.logging.*;
              org.codehaus.jackson.*
            </_exportcontents>
            <Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
            <Embed-Transitive>true</Embed-Transitive>
            <Embed-Directory>lib</Embed-Directory>
          </instructions>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>bundle</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
@@ -0,0 +1,117 @@
/*
 * Copyright © 2023 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package io.cdap.plugin.amazon.redshift;

import io.cdap.cdap.api.annotation.Category;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.connector.Connector;
import io.cdap.cdap.etl.api.connector.ConnectorSpec;
import io.cdap.cdap.etl.api.connector.ConnectorSpecRequest;
import io.cdap.cdap.etl.api.connector.PluginSpec;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.common.db.DBConnectorPath;
import io.cdap.plugin.common.db.DBPath;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Amazon Redshift Database Connector that connects to an Amazon Redshift database via JDBC.
 */
@Plugin(type = Connector.PLUGIN_TYPE)
@Name(RedshiftConnector.NAME)
@Description("Connection to access data in Amazon Redshift using JDBC.")
@Category("Database")
public class RedshiftConnector extends AbstractDBSpecificConnector<io.cdap.plugin.amazon.redshift.RedshiftDBRecord> {
  public static final String NAME = RedshiftConstants.PLUGIN_NAME;
  private final RedshiftConnectorConfig config;

  public RedshiftConnector(RedshiftConnectorConfig config) {
    super(config);
    this.config = config;
  }

  @Override
  protected DBConnectorPath getDBConnectorPath(String path) throws IOException {
    return new DBPath(path, true);
  }

  @Override
  public boolean supportSchema() {
    return true;
  }

  @Override
  protected Class<? extends DBWritable> getDBRecordType() {
    return RedshiftDBRecord.class;
  }

  @Override
  public StructuredRecord transform(LongWritable longWritable, RedshiftDBRecord redshiftDBRecord) {
    return redshiftDBRecord.getRecord();
  }

  @Override
  protected SchemaReader getSchemaReader(String sessionID) {
    return new RedshiftSchemaReader(sessionID);
  }

  @Override
  protected String getTableName(String database, String schema, String table) {
    return String.format("\"%s\".\"%s\"", schema, table);
  }

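  // Note on the sampling query below: TABLESAMPLE BERNOULLI(p) keeps each row with
  // probability p percent, so scaling the percentage by 100 * limit / COUNT(*) returns
  // approximately 'limit' rows when the connector samples a table for preview.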
  @Override
  protected String getRandomQuery(String tableName, int limit) {
    return String.format("SELECT * FROM %s\n" +
                           "TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))",
                         tableName, limit, tableName);
  }

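  // setConnectorSpec turns a browsed connector path into a ready-to-use batch source stage:
  // connection settings are copied into the source properties and, when a concrete table is
  // selected, the import query and reference name are pre-populated.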
  @Override
  protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path,
                                  ConnectorSpec.Builder builder) {
    Map<String, String> sourceProperties = new HashMap<>();
    setConnectionProperties(sourceProperties, request);
    builder
      .addRelatedPlugin(new PluginSpec(RedshiftConstants.PLUGIN_NAME,
                                       BatchSource.PLUGIN_TYPE, sourceProperties));

    String schema = path.getSchema();
    sourceProperties.put(RedshiftSource.RedshiftSourceConfig.NUM_SPLITS, "1");
    sourceProperties.put(RedshiftSource.RedshiftSourceConfig.FETCH_SIZE,
                         RedshiftSource.RedshiftSourceConfig.DEFAULT_FETCH_SIZE);
    String table = path.getTable();
    if (table == null) {
      return;
    }
    sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY,
                         getTableQuery(path.getDatabase(), schema, table));
    sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
  }

}
