From 61e37ddd66feadd3d679f21b4e128d6e83872385 Mon Sep 17 00:00:00 2001 From: Andrew Khalymon Date: Wed, 4 Dec 2019 20:13:27 +0200 Subject: [PATCH] PLUGIN-72 SAP HANA Plugin integration tests --- .../db-plugins-env/docker-compose.yml | 1 - saphana-plugin/pom.xml | 6 + .../plugin/saphana/SapHanaPostAction.java | 3 + .../plugin/saphana/SapHanaActionTestRun.java | 98 +++++++++++++++ .../plugin/saphana/SapHanaPluginTestBase.java | 89 ++++++++++++++ .../saphana/SapHanaPluginTestSuite.java | 36 ++++++ .../saphana/SapHanaPostActionTestRun.java | 100 +++++++++++++++ .../plugin/saphana/SapHanaSinkTestRun.java | 116 ++++++++++++++++++ .../plugin/saphana/SapHanaSourceTestRun.java | 88 +++++++++++++ 9 files changed, 536 insertions(+), 1 deletion(-) create mode 100644 saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java create mode 100644 saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java create mode 100644 saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java create mode 100644 saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java create mode 100644 saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java create mode 100644 saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java diff --git a/docker-compose/db-plugins-env/docker-compose.yml b/docker-compose/db-plugins-env/docker-compose.yml index 2b5206e9f..2df4c7ad4 100644 --- a/docker-compose/db-plugins-env/docker-compose.yml +++ b/docker-compose/db-plugins-env/docker-compose.yml @@ -80,7 +80,6 @@ services: sysctls: - kernel.shmmax=1073741824 - net.ipv4.ip_local_port_range=60000 65535 - - kernel.shmmni=524288 - kernel.shmall=8388608 extra_hosts: # Alter this if running on non-Linux machine diff --git a/saphana-plugin/pom.xml b/saphana-plugin/pom.xml index 01bb2c815..de0b1a2f1 100644 --- a/saphana-plugin/pom.xml +++ b/saphana-plugin/pom.xml @@ -51,6 +51,12 @@ test-jar test + + com.sap.cloud.db.jdbc + ngdbc + 2.3.48 + test + io.cdap.cdap hydrator-test diff --git a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java index 4d14c6bfd..a2620d490 100644 --- a/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java +++ b/saphana-plugin/src/main/java/io/cdap/plugin/saphana/SapHanaPostAction.java @@ -31,8 +31,11 @@ @Description("Runs a SAP HANA query after a pipeline run.") public class SapHanaPostAction extends AbstractQueryAction { + private final SapHanaQueryActionConfig sapHanaQueryActionConfig; + public SapHanaPostAction(SapHanaQueryActionConfig sapHanaQueryActionConfig) { super(sapHanaQueryActionConfig, false); + this.sapHanaQueryActionConfig = sapHanaQueryActionConfig; } /** diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java new file mode 100644 index 000000000..9e63bc272 --- /dev/null +++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaActionTestRun.java @@ -0,0 +1,98 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.saphana; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.plugin.db.batch.action.QueryConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +public class SapHanaActionTestRun extends SapHanaPluginTestBase { + + protected static final String ACTION_TEST_TABLE_NAME = "DB_ACTION_TEST"; + + + @Before + public void createTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("CREATE TABLE " + ACTION_TEST_TABLE_NAME + "(ID INT);"); + statement.execute("INSERT INTO " + ACTION_TEST_TABLE_NAME + " VALUES(1)"); + statement.execute("INSERT INTO " + ACTION_TEST_TABLE_NAME + " VALUES(2)"); + } + + + @Test + public void testDBAction() throws Exception { + ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput")); + ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput")); + ETLStage action = new ETLStage("action", new ETLPlugin( + SapHanaConstants.PLUGIN_NAME, + Action.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(QueryConfig.QUERY, "DELETE from " + ACTION_TEST_TABLE_NAME + " where ID=1") + .build(), + null)); + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(source) + .addStage(sink) + .addStage(action) + .addConnection(sink.getName(), action.getName()) + .addConnection(source.getName(), sink.getName()) + .build(); + AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config); + ApplicationId appId = NamespaceId.DEFAULT.app("actionTest"); + ApplicationManager appManager = deployApplication(appId, appRequest); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + Connection connection = createConnection(); + Statement statement = connection.createStatement(); + + ResultSet results = statement.executeQuery("select * from " + ACTION_TEST_TABLE_NAME); + results.next(); + int id = results.getInt("ID"); + Assert.assertEquals(id, 2); + Assert.assertNotEquals(id, 1); + Assert.assertFalse(results.next()); + + } + + @After + public void dropTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("DROP TABLE " + ACTION_TEST_TABLE_NAME + ";"); + } + +} diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java new file mode 100644 index 000000000..42d894976 --- /dev/null +++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestBase.java @@ -0,0 +1,89 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.saphana; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.sap.db.jdbc.Driver; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.plugin.PluginClass; +import io.cdap.cdap.datapipeline.DataPipelineApp; +import io.cdap.cdap.proto.id.ArtifactId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.batch.DatabasePluginTestBase; +import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat; +import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat; +import org.junit.BeforeClass; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Collections; +import java.util.Map; + + +public abstract class SapHanaPluginTestBase extends DatabasePluginTestBase { + + + protected static final String JDBC_DRIVER_NAME = "sap"; + protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0"); + protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0"); + protected static final Map BASE_PROPS = ImmutableMap.builder() + .put(ConnectionConfig.HOST, System.getProperty("saphana.host", "localhost")) + .put(ConnectionConfig.PORT, System.getProperty("saphana.port", "39017")) + .put(ConnectionConfig.DATABASE, System.getProperty("sapahana.database", "SYSTEMDB")) + .put(ConnectionConfig.USER, System.getProperty("sapahana.user", "SYSTEM")) + .put(ConnectionConfig.PASSWORD, System.getProperty("sapahana.password", "SAPhxe123")) + .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME) + .build(); + private static String connectionUrl; + private static Boolean setupCompleted = false; + + + @BeforeClass + public static void setupTest() throws Exception { + if (setupCompleted) { + return; + } + System.out.println("Setting up batch artifacts"); + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + System.out.println("Adding plugin artifact"); + addPluginArtifact(NamespaceId.DEFAULT.artifact(SapHanaConstants.PLUGIN_NAME, "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, SapHanaSource.class, DBRecord.class, ETLDBOutputFormat.class, + DataDrivenETLDBInputFormat.class, SapHanaAction.class, SapHanaPostAction.class); + + PluginClass sapHanaDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME, + "SapHana driver class", Driver.class.getName(), null, Collections.emptyMap()); + addPluginArtifact(NamespaceId.DEFAULT.artifact("saphana-jdbc-connector", "1.0.0"), DATAPIPELINE_ARTIFACT_ID, + Sets.newHashSet(sapHanaDriver), Driver.class); + + connectionUrl = "jdbc:sap://" + BASE_PROPS.get(ConnectionConfig.HOST) + ":" + + BASE_PROPS.get(ConnectionConfig.PORT) + "/" + BASE_PROPS.get(ConnectionConfig.DATABASE); + setupCompleted = true; + } + + + protected static Connection createConnection() throws Exception { + Class.forName(Driver.class.getCanonicalName()); + return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER), + BASE_PROPS.get(ConnectionConfig.PASSWORD)); + } + + +} + diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java new file mode 100644 index 000000000..714212e6f --- /dev/null +++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPluginTestSuite.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.saphana; + +import io.cdap.cdap.common.test.TestSuite; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + + +/** + * This is a test suite that runs all the tests for Database plugins. + */ +@RunWith(TestSuite.class) +@Suite.SuiteClasses({ + SapHanaActionTestRun.class, + SapHanaSourceTestRun.class, + SapHanaSinkTestRun.class, + SapHanaPostActionTestRun.class +}) +public class SapHanaPluginTestSuite extends SapHanaPluginTestBase { + +} diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java new file mode 100644 index 000000000..1e9de305c --- /dev/null +++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaPostActionTestRun.java @@ -0,0 +1,100 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.saphana; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.etl.api.batch.PostAction; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.plugin.common.batch.action.Condition; +import io.cdap.plugin.db.batch.action.QueryActionConfig; +import io.cdap.plugin.db.batch.action.QueryConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +public class SapHanaPostActionTestRun extends SapHanaPluginTestBase { + + protected static final String POST_ACTION_TEST_TABLE_NAME = "POST_ACTION_TEST"; + + @Before + public void createTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("CREATE TABLE " + POST_ACTION_TEST_TABLE_NAME + "(ID INT);"); + statement.execute("INSERT INTO " + POST_ACTION_TEST_TABLE_NAME + " VALUES(1)"); + statement.execute("INSERT INTO " + POST_ACTION_TEST_TABLE_NAME + " VALUES(2)"); + } + + + @Test + public void testDBPostAction() throws Exception { + ETLStage source = new ETLStage("source", MockSource.getPlugin("postActionInput")); + ETLStage sink = new ETLStage("sink", MockSink.getPlugin("postActionOutput")); + ETLStage action = new ETLStage("postAction", new ETLPlugin( + SapHanaConstants.PLUGIN_NAME, + PostAction.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(QueryConfig.QUERY, "DELETE from " + POST_ACTION_TEST_TABLE_NAME + " where ID=1") + .put(QueryActionConfig.RUN_CONDITION, Condition.SUCCESS.name()) + .build(), + null)); + + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(source) + .addStage(sink) + .addPostAction(action) + .addConnection(source.getName(), sink.getName()) + .build(); + + + AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config); + ApplicationId appId = NamespaceId.DEFAULT.app("postActionTest"); + ApplicationManager appManager = deployApplication(appId, appRequest); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + Connection connection = createConnection(); + Statement statement = connection.createStatement(); + + ResultSet results = statement.executeQuery("select * from " + POST_ACTION_TEST_TABLE_NAME); + results.next(); + int id = results.getInt("ID"); + Assert.assertEquals(id, 2); + Assert.assertFalse(results.next()); + connection.close(); + } + + @After + public void dropTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("DROP TABLE " + POST_ACTION_TEST_TABLE_NAME + ";"); + } +} diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java new file mode 100644 index 000000000..45e473106 --- /dev/null +++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSinkTestRun.java @@ -0,0 +1,116 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.saphana; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.etl.api.batch.BatchSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.plugin.common.Constants; +import io.cdap.plugin.db.batch.sink.AbstractDBSink; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + + +public class SapHanaSinkTestRun extends SapHanaPluginTestBase { + + public static final String SINK_TEST_TABLE_NAME = "SINK_TEST"; + + + private static final Schema SCHEMA = Schema.recordOf( + "dbRecord", + Schema.Field.of("ID", Schema.of(Schema.Type.INT)) + ); + + @Before + public void createTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("CREATE TABLE " + SINK_TEST_TABLE_NAME + "(ID INT);"); + } + + + private void createInputData(String inputDatasetName) throws Exception { + // add some data to the input table + DataSetManager inputManager = getDataset(inputDatasetName); + List inputRecords = new ArrayList<>(); + for (int i = 1; i <= 2; i++) { + inputRecords.add(StructuredRecord.builder(SCHEMA) + .set("ID", i) + .build()); + } + MockSource.writeInput(inputManager, inputRecords); + } + + @Test + public void testDBSinkWithExplicitInputSchema() throws Exception { + testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", true); + } + + public void testDBSink(String appName, String inputDatasetName, boolean setInputSchema) throws Exception { + ETLPlugin sourceConfig = (setInputSchema) + ? MockSource.getPlugin(inputDatasetName, SCHEMA) + : MockSource.getPlugin(inputDatasetName); + + ETLPlugin sinkConfig = getSinkConfig(); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName); + createInputData(inputDatasetName); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(0))); + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery("SELECT * FROM " + SINK_TEST_TABLE_NAME)) { + Assert.assertTrue(resultSet.next()); + + } + + } + + private ETLPlugin getSinkConfig() { + return new ETLPlugin( + SapHanaConstants.PLUGIN_NAME, + BatchSink.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, SINK_TEST_TABLE_NAME) + .put(Constants.Reference.REFERENCE_NAME, "DBTest") + .build(), + null); + } + + + @After + public void dropTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("DROP TABLE " + SINK_TEST_TABLE_NAME + ";"); + } + +} diff --git a/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java new file mode 100644 index 000000000..4084293df --- /dev/null +++ b/saphana-plugin/src/test/java/io/cdap/plugin/saphana/SapHanaSourceTestRun.java @@ -0,0 +1,88 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.saphana; + + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.api.dataset.table.Table; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.cdap.test.DataSetManager; +import io.cdap.plugin.common.Constants; +import io.cdap.plugin.db.batch.source.AbstractDBSource; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.Statement; + +public class SapHanaSourceTestRun extends SapHanaPluginTestBase { + + public static final String SOURCE_TEST_TABLE_NAME = "SOURCE_TEST"; + + @Before + public void createTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("CREATE TABLE " + SOURCE_TEST_TABLE_NAME + "(ID INT);"); + statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(1)"); + statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(2)"); + statement.execute("INSERT INTO " + SOURCE_TEST_TABLE_NAME + " VALUES(3)"); + } + + + @Test + public void testDBSource() throws Exception { + String importQuery = "SELECT ID from " + SOURCE_TEST_TABLE_NAME + " where $CONDITIONS"; + String boundingQuery = "SELECT MIN(ID),MAX(ID) from " + SOURCE_TEST_TABLE_NAME + " where $CONDITIONS"; + String splitBy = "ID"; + + ImmutableMap sourceProps = ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery) + .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery) + .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy) + .put(Constants.Reference.REFERENCE_NAME, "DBTestSource").build(); + + ETLPlugin sourceConfig = new ETLPlugin( + SapHanaConstants.PLUGIN_NAME, + BatchSource.PLUGIN_TYPE, + sourceProps + ); + + ETLPlugin sinkConfig = MockSink.getPlugin("macroOutputTable"); + + ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, + DATAPIPELINE_ARTIFACT, "testDBMacro"); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + DataSetManager
outputManager = getDataset("macroOutputTable"); + Assert.assertEquals(3, MockSink.readOutput(outputManager).size()); + } + + @After + public void dropTables() throws Exception { + Connection conn = createConnection(); + Statement statement = conn.createStatement(); + statement.execute("DROP TABLE " + SOURCE_TEST_TABLE_NAME + ";"); + } + +}