diff --git a/docker-compose/db-plugins-env/docker-compose.yml b/docker-compose/db-plugins-env/docker-compose.yml
index 2b5206e9f..2df4c7ad4 100644
--- a/docker-compose/db-plugins-env/docker-compose.yml
+++ b/docker-compose/db-plugins-env/docker-compose.yml
@@ -80,7 +80,6 @@ services:
sysctls:
- kernel.shmmax=1073741824
- net.ipv4.ip_local_port_range=60000 65535
- - kernel.shmmni=524288
- kernel.shmall=8388608
extra_hosts:
# Alter this if running on non-Linux machine
diff --git a/saphana-plugin/pom.xml b/saphana-plugin/pom.xml
index 01bb2c815..de0b1a2f1 100644
--- a/saphana-plugin/pom.xml
+++ b/saphana-plugin/pom.xml
@@ -51,6 +51,12 @@
test-jar
test
+    <dependency>
+      <groupId>com.sap.cloud.db.jdbc</groupId>
+      <artifactId>ngdbc</artifactId>
+      <version>2.3.48</version>
+      <scope>test</scope>
+    </dependency>
io.cdap.cdap
hydrator-test
diff --git a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java
index 4d14c6bfd..a2620d490 100644
--- a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java
+++ b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java
@@ -31,8 +31,11 @@
@Description("Runs a SAP HANA query after a pipeline run.")
public class SapHanaPostAction extends AbstractQueryAction {
+ private final SapHanaQueryActionConfig sapHanaQueryActionConfig;
+
public SapHanaPostAction(SapHanaQueryActionConfig sapHanaQueryActionConfig) {
super(sapHanaQueryActionConfig, false);
+ this.sapHanaQueryActionConfig = sapHanaQueryActionConfig;
}
/**
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java
new file mode 100644
index 000000000..9e63bc272
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.plugin.db.batch.action.QueryConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class SapHanaActionTestRun extends SapHanaPluginTestBase {
+
+ protected static final String ACTION_TEST_TABLE_NAME = "DB_ACTION_TEST";
+
+
+ @Before
+ public void createTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("CREATE TABLE " + ACTION_TEST_TABLE_NAME + "(ID INT);");
+ statement.execute("INSERT INTO " + ACTION_TEST_TABLE_NAME + " VALUES(1)");
+ statement.execute("INSERT INTO " + ACTION_TEST_TABLE_NAME + " VALUES(2)");
+ }
+
+
+ @Test
+ public void testDBAction() throws Exception {
+ ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput"));
+ ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput"));
+ ETLStage action = new ETLStage("action", new ETLPlugin(
+ SapHanaConstants.PLUGIN_NAME,
+ Action.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+ .putAll(BASE_PROPS)
+ .put(QueryConfig.QUERY, "DELETE from " + ACTION_TEST_TABLE_NAME + " where ID=1")
+ .build(),
+ null));
+ ETLBatchConfig config = ETLBatchConfig.builder()
+ .addStage(source)
+ .addStage(sink)
+ .addStage(action)
+ .addConnection(sink.getName(), action.getName())
+ .addConnection(source.getName(), sink.getName())
+ .build();
+    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config);
+ ApplicationId appId = NamespaceId.DEFAULT.app("actionTest");
+ ApplicationManager appManager = deployApplication(appId, appRequest);
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+ Connection connection = createConnection();
+ Statement statement = connection.createStatement();
+
+ ResultSet results = statement.executeQuery("select * from " + ACTION_TEST_TABLE_NAME);
+ results.next();
+ int id = results.getInt("ID");
+    Assert.assertEquals(2, id);
+    Assert.assertNotEquals(1, id);
+ Assert.assertFalse(results.next());
+
+ }
+
+ @After
+ public void dropTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("DROP TABLE " + ACTION_TEST_TABLE_NAME + ";");
+ }
+
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java
new file mode 100644
index 000000000..42d894976
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.sap.db.jdbc.Driver;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.plugin.PluginClass;
+import io.cdap.cdap.datapipeline.DataPipelineApp;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.batch.DatabasePluginTestBase;
+import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat;
+import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Collections;
+import java.util.Map;
+
+
+public abstract class SapHanaPluginTestBase extends DatabasePluginTestBase {
+
+
+ protected static final String JDBC_DRIVER_NAME = "sap";
+ protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0");
+ protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");
+  protected static final Map<String, String> BASE_PROPS = ImmutableMap.<String, String>builder()
+ .put(ConnectionConfig.HOST, System.getProperty("saphana.host", "localhost"))
+ .put(ConnectionConfig.PORT, System.getProperty("saphana.port", "39017"))
+    .put(ConnectionConfig.DATABASE, System.getProperty("saphana.database", "SYSTEMDB"))
+    .put(ConnectionConfig.USER, System.getProperty("saphana.user", "SYSTEM"))
+    .put(ConnectionConfig.PASSWORD, System.getProperty("saphana.password", "SAPhxe123"))
+ .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+ .build();
+ private static String connectionUrl;
+ private static Boolean setupCompleted = false;
+
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ if (setupCompleted) {
+ return;
+ }
+ System.out.println("Setting up batch artifacts");
+ setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
+ System.out.println("Adding plugin artifact");
+ addPluginArtifact(NamespaceId.DEFAULT.artifact(SapHanaConstants.PLUGIN_NAME, "1.0.0"),
+ DATAPIPELINE_ARTIFACT_ID, SapHanaSource.class, DBRecord.class, ETLDBOutputFormat.class,
+ DataDrivenETLDBInputFormat.class, SapHanaAction.class, SapHanaPostAction.class);
+
+ PluginClass sapHanaDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME,
+ "SapHana driver class", Driver.class.getName(), null, Collections.emptyMap());
+ addPluginArtifact(NamespaceId.DEFAULT.artifact("saphana-jdbc-connector", "1.0.0"), DATAPIPELINE_ARTIFACT_ID,
+ Sets.newHashSet(sapHanaDriver), Driver.class);
+
+ connectionUrl = "jdbc:sap://" + BASE_PROPS.get(ConnectionConfig.HOST) + ":" +
+ BASE_PROPS.get(ConnectionConfig.PORT) + "/" + BASE_PROPS.get(ConnectionConfig.DATABASE);
+ setupCompleted = true;
+ }
+
+
+ protected static Connection createConnection() throws Exception {
+ Class.forName(Driver.class.getCanonicalName());
+ return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER),
+ BASE_PROPS.get(ConnectionConfig.PASSWORD));
+ }
+
+
+}
+
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java
new file mode 100644
index 000000000..714212e6f
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import io.cdap.cdap.common.test.TestSuite;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+
+/**
+ * This is a test suite that runs all the tests for Database plugins.
+ */
+@RunWith(TestSuite.class)
+@Suite.SuiteClasses({
+ SapHanaActionTestRun.class,
+ SapHanaSourceTestRun.class,
+ SapHanaSinkTestRun.class,
+ SapHanaPostActionTestRun.class
+})
+public class SapHanaPluginTestSuite extends SapHanaPluginTestBase {
+
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java
new file mode 100644
index 000000000..1e9de305c
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.api.batch.PostAction;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.plugin.common.batch.action.Condition;
+import io.cdap.plugin.db.batch.action.QueryActionConfig;
+import io.cdap.plugin.db.batch.action.QueryConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class SapHanaPostActionTestRun extends SapHanaPluginTestBase {
+
+ protected static final String POST_ACTION_TEST_TABLE_NAME = "POST_ACTION_TEST";
+
+ @Before
+ public void createTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("CREATE TABLE " + POST_ACTION_TEST_TABLE_NAME + "(ID INT);");
+ statement.execute("INSERT INTO " + POST_ACTION_TEST_TABLE_NAME + " VALUES(1)");
+ statement.execute("INSERT INTO " + POST_ACTION_TEST_TABLE_NAME + " VALUES(2)");
+ }
+
+
+ @Test
+ public void testDBPostAction() throws Exception {
+ ETLStage source = new ETLStage("source", MockSource.getPlugin("postActionInput"));
+ ETLStage sink = new ETLStage("sink", MockSink.getPlugin("postActionOutput"));
+ ETLStage action = new ETLStage("postAction", new ETLPlugin(
+ SapHanaConstants.PLUGIN_NAME,
+ PostAction.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+ .putAll(BASE_PROPS)
+ .put(QueryConfig.QUERY, "DELETE from " + POST_ACTION_TEST_TABLE_NAME + " where ID=1")
+ .put(QueryActionConfig.RUN_CONDITION, Condition.SUCCESS.name())
+ .build(),
+ null));
+
+ ETLBatchConfig config = ETLBatchConfig.builder()
+ .addStage(source)
+ .addStage(sink)
+ .addPostAction(action)
+ .addConnection(source.getName(), sink.getName())
+ .build();
+
+
+    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config);
+ ApplicationId appId = NamespaceId.DEFAULT.app("postActionTest");
+ ApplicationManager appManager = deployApplication(appId, appRequest);
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+ Connection connection = createConnection();
+ Statement statement = connection.createStatement();
+
+ ResultSet results = statement.executeQuery("select * from " + POST_ACTION_TEST_TABLE_NAME);
+ results.next();
+ int id = results.getInt("ID");
+    Assert.assertEquals(2, id);
+ Assert.assertFalse(results.next());
+ connection.close();
+ }
+
+ @After
+ public void dropTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("DROP TABLE " + POST_ACTION_TEST_TABLE_NAME + ";");
+ }
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java
new file mode 100644
index 000000000..45e473106
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.batch.sink.AbstractDBSink;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class SapHanaSinkTestRun extends SapHanaPluginTestBase {
+
+ public static final String SINK_TEST_TABLE_NAME = "SINK_TEST";
+
+
+ private static final Schema SCHEMA = Schema.recordOf(
+ "dbRecord",
+ Schema.Field.of("ID", Schema.of(Schema.Type.INT))
+ );
+
+ @Before
+ public void createTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("CREATE TABLE " + SINK_TEST_TABLE_NAME + "(ID INT);");
+ }
+
+
+ private void createInputData(String inputDatasetName) throws Exception {
+ // add some data to the input table
+    DataSetManager<Table> inputManager = getDataset(inputDatasetName);
+    List<StructuredRecord> inputRecords = new ArrayList<>();
+ for (int i = 1; i <= 2; i++) {
+ inputRecords.add(StructuredRecord.builder(SCHEMA)
+ .set("ID", i)
+ .build());
+ }
+ MockSource.writeInput(inputManager, inputRecords);
+ }
+
+ @Test
+ public void testDBSinkWithExplicitInputSchema() throws Exception {
+ testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", true);
+ }
+
+ public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception {
+ ETLPlugin sourceConfig = (setInputSchema)
+ ? MockSource.getPlugin(inputDatasetName, SCHEMA)
+ : MockSource.getPlugin(inputDatasetName);
+
+ ETLPlugin sinkConfig = getSinkConfig();
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName);
+ createInputData(inputDatasetName);
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(0)));
+ try (Connection conn = createConnection();
+ Statement stmt = conn.createStatement();
+ ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + SINK_TEST_TABLE_NAME)) {
+ Assert.assertTrue(resultSet.next());
+
+ }
+
+ }
+
+ private ETLPlugin getSinkConfig() {
+ return new ETLPlugin(
+ SapHanaConstants.PLUGIN_NAME,
+ BatchSink.PLUGIN_TYPE,
+      ImmutableMap.<String, String>builder()
+ .putAll(BASE_PROPS)
+ .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, SINK_TEST_TABLE_NAME)
+ .put(Constants.Reference.REFERENCE_NAME, "DBTest")
+ .build(),
+ null);
+ }
+
+
+ @After
+ public void dropTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("DROP TABLE " + SINK_TEST_TABLE_NAME + ";");
+ }
+
+}
diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java
new file mode 100644
index 000000000..4084293df
--- /dev/null
+++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.saphana;
+
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.batch.source.AbstractDBSource;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+public class SapHanaSourceTestRun extends SapHanaPluginTestBase {
+
+ public static final String SOURCE_TEST_TABLE_NAME = "SOURCE_TEST";
+
+ @Before
+ public void createTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("CREATE TABLE " + SOURCE_TEST_TABLE_NAME + "(ID INT);");
+ statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(1)");
+ statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(2)");
+ statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(3)");
+ }
+
+
+ @Test
+ public void testDBSource() throws Exception {
+ String importQuery = "SELECT ID from " + SOURCE_TEST_TABLE_NAME + " where $CONDITIONS";
+ String boundingQuery = "SELECT MIN(ID),MAX(ID) from " + SOURCE_TEST_TABLE_NAME + " where $CONDITIONS";
+ String splitBy = "ID";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+ .putAll(BASE_PROPS)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBTestSource").build();
+
+ ETLPlugin sourceConfig = new ETLPlugin(
+ SapHanaConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ sourceProps
+ );
+
+ ETLPlugin sinkConfig = MockSink.getPlugin("macroOutputTable");
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBMacro");
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+    DataSetManager<Table> outputManager = getDataset("macroOutputTable");
+ Assert.assertEquals(3, MockSink.readOutput(outputManager).size());
+ }
+
+ @After
+ public void dropTables() throws Exception {
+ Connection conn = createConnection();
+ Statement statement = conn.createStatement();
+ statement.execute("DROP TABLE " + SOURCE_TEST_TABLE_NAME + ";");
+ }
+
+}