
STR-2414 ES indexes separate tables alias test #269

Closed
Changes from all commits
33 commits
ced9b07
added http.client.proxy.type
acristu Dec 19, 2023
efb55e2
ci: added package
acristu Dec 19, 2023
5d54b9d
ci fix package
acristu Dec 19, 2023
04c4c67
ci fix docker
acristu Dec 19, 2023
b0a9bc4
fix workflow
acristu Dec 19, 2023
3070af8
Merge remote-tracking branch 'origin/master' into STR-2361/lm-elastic…
acristu Dec 19, 2023
5d63a74
Merge pull request #1 from streamkap-com/STR-2361/lm-elastic-not-conn…
acristu Jan 9, 2024
8030ce4
Configurable redirection to topic based on _index from ES
EduardHantig Jan 20, 2024
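
A minimal sketch, not code from this PR, of how records could be redirected to a topic derived from the Elasticsearch `_index` field; the class and method names below are illustrative assumptions.

```java
import java.util.Map;

// Illustrative helper: picks the destination topic from the document's _index,
// falling back to a statically configured topic when the field is absent.
public final class IndexTopicRouter {

    public static String resolveTopic(Map<String, Object> esHit, String defaultTopic) {
        Object index = esHit.get("_index");
        return index == null ? defaultTopic : index.toString();
    }
}
```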
b975f85
draft attempt to parallelize
acristu Jan 20, 2024
e6a991b
draft attempt to parallelize
acristu Jan 20, 2024
6e7ba3b
fixes
acristu Jan 20, 2024
70374e3
all offsets for all indexes must be saved in kafka source offsets topic
acristu Jan 20, 2024
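
A hedged sketch of per-index offset bookkeeping in Kafka Connect; the partition and offset keys mirror the ones visible in the tests further down, but the helper itself is illustrative.

```java
import java.util.Map;

// Illustrative: one source partition per endpoint (index), so Kafka Connect
// persists a separate offset entry for every index in its source offsets topic.
public final class EndpointOffsets {

    public static Map<String, Object> partitionFor(String endpoint) {
        return Map.of("endpoint", endpoint);
    }

    // Committed offset for that endpoint: last emitted key and timestamp.
    public static Map<String, Object> offsetFor(String endpoint, String lastKey, String lastTimestamp) {
        return Map.of("endpoint", endpoint, "key", lastKey, "timestamp", lastTimestamp);
    }
}
```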
be605e8
more work on offset management
acristu Jan 21, 2024
de81513
replaced index with endpoint, fix test
acristu Jan 21, 2024
6f5d641
fix tests
acristu Jan 21, 2024
c2a57cf
started adding test with opensearch
acristu Jan 21, 2024
dd7bf19
more work on tests
acristu Jan 21, 2024
c90ece6
StreamkapElasticConnectorTest basic test working
acristu Jan 21, 2024
f1fd591
index.refresh_interval defaults to 1s, so data inserted will be visib…
acristu Jan 22, 2024
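
Elasticsearch refreshes indexes roughly every second by default, so a freshly inserted document only becomes searchable after the next refresh; below is a small polling sketch a test could use instead of asserting immediately. The helper is an assumption, not part of the PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.LongSupplier;

// Illustrative: polls a hit-count supplier until the expected number of
// documents is searchable, tolerating the ~1s default refresh interval.
public final class SearchVisibility {

    public static boolean awaitHits(LongSupplier hitCount, long expected, Duration timeout)
            throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (hitCount.getAsLong() >= expected) {
                return true;
            }
            Thread.sleep(200);
        }
        return false;
    }
}
```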
35ec3e8
Change naming on fields for sourceRecord
EduardHantig Jan 22, 2024
d98dc0f
Fix unit tests after field naming changes
EduardHantig Jan 22, 2024
c15867a
added testTimestamp and testPerf
acristu Jan 22, 2024
7e6ffdb
Merge branch 'STR-2414-ES-indexes-separate-tables' of https://github.…
acristu Jan 22, 2024
c9e8900
make kafka.topic optional
acristu Jan 22, 2024
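
A sketch of how an optional `kafka.topic` might be resolved: use it when configured, otherwise derive the topic from the endpoint. The fallback rule is an assumption for illustration.

```java
import java.util.Map;

// Illustrative: a configured kafka.topic sends everything to one topic,
// otherwise each endpoint (index) gets its own topic.
public final class TopicResolution {

    public static String topicFor(Map<String, String> config, String endpoint) {
        String topic = config.get("kafka.topic");
        return (topic == null || topic.isBlank()) ? endpoint : topic;
    }
}
```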
c3bc086
fix tests
acristu Jan 22, 2024
80ec073
fix: offsets not saving
acristu Jan 23, 2024
fd409f5
fix offset reading
acristu Jan 23, 2024
ca07044
fix offset reading
acristu Jan 23, 2024
342c725
replaced not allowed characters in the topic name
acristu Jan 25, 2024
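
Kafka topic names may only contain ASCII alphanumerics plus `.`, `_` and `-`, so index names used as topics need sanitizing; a minimal sketch follows, with the replacement character being an assumption.

```java
// Illustrative: replaces every character Kafka does not allow in topic names.
public final class TopicNames {

    public static String sanitize(String rawName) {
        return rawName.replaceAll("[^a-zA-Z0-9._-]", "_");
    }
}
```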
6af76c1
for ES, when using aliases, the index name returned does not match th…
acristu Jan 25, 2024
84d5e4b
for ES, when using aliases, the index name returned does not match th…
acristu Jan 25, 2024
251e17c
for ES, when using aliases, the index name returned does not match th…
acristu Jan 25, 2024
9b6cfd1
Change index tests cover aliases flow
EduardHantig Jan 26, 2024
16 changes: 8 additions & 8 deletions .github/workflows/codeql-analysis.yml
@@ -11,14 +11,14 @@
#
name: "CodeQL"

on:
push:
branches: [ master ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '35 18 * * 3'
on: workflow_dispatch
# push:
# branches: [ master ]
# pull_request:
# # The branches below must be a subset of the branches above
# branches: [ master ]
# schedule:
# - cron: '35 18 * * 3'

jobs:
analyze:
16 changes: 8 additions & 8 deletions .github/workflows/codeql.yml
@@ -11,14 +11,14 @@
#
name: "CodeQL"

on:
push:
branches: [ master ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ master ]
schedule:
- cron: '22 7 * * 0'
on: workflow_dispatch
# push:
# branches: [ master ]
# pull_request:
# # The branches below must be a subset of the branches above
# branches: [ master ]
# schedule:
# - cron: '22 7 * * 0'

jobs:
analyze:
10 changes: 5 additions & 5 deletions .github/workflows/maven-build.yml
@@ -3,11 +3,11 @@

name: Build

on:
push:
branches: [ master ]
pull_request_target:
branches: [ master ]
on: workflow_dispatch
# push:
# branches: [ master ]
# pull_request_target:
# branches: [ master ]


jobs:
8 changes: 4 additions & 4 deletions .github/workflows/maven-release-central.yml
@@ -3,10 +3,10 @@

name: Release to Maven Central

on:
push:
tags:
- 'v*'
on: workflow_dispatch
# push:
# tags:
# - 'v*'

jobs:
build:
8 changes: 4 additions & 4 deletions .github/workflows/maven-release-github.yml
@@ -3,10 +3,10 @@

name: Release to GitHub

on:
push:
tags:
- 'v*'
on: workflow_dispatch
# push:
# tags:
# - 'v*'

jobs:
build:
80 changes: 80 additions & 0 deletions .github/workflows/skap.yaml
@@ -0,0 +1,80 @@
name: Java CI

on: [push]

jobs:
test-and-build:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3
with:
path: kafka-connect-http
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
- uses: actions/cache@v1
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: kafka-connect-http test
run: |
cd kafka-connect-http
mvn -B test
- name: Publish Test Report
uses: mikepenz/action-junit-report@v4
if: success() || failure() # always run even if the previous step fails
with:
report_paths: 'kafka-connect-http/target/surefire-reports/TEST-*.xml'

package:
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v3
with:
path: kafka-connect-http
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
- uses: actions/cache@v1
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: kafka-connect-http
run: |
cd kafka-connect-http
mvn -B clean package -DskipTests || true

- name: docker build
run: |
cd kafka-connect-http
version=`grep '<artifactId>kafka-connect-http-parent</artifactId>' -A1 pom.xml | grep version | grep -o '[0-9]*\.[0-9]*\.[0-9][^<]*'`
docker build -t 003675007768.dkr.ecr.us-west-2.amazonaws.com/connect-packages:kafka-connect-http-${version}-`date +%y%m%d`-`git rev-parse --short HEAD` .

- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: docker push
run: |
cd kafka-connect-http
version=`grep '<artifactId>kafka-connect-http-parent</artifactId>' -A1 pom.xml | grep version | grep -o '[0-9]*\.[0-9]*\.[0-9][^<]*'`
docker push 003675007768.dkr.ecr.us-west-2.amazonaws.com/connect-packages:kafka-connect-http-${version}-`date +%y%m%d`-`git rev-parse --short HEAD`
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}
3 changes: 3 additions & 0 deletions Dockerfile
@@ -0,0 +1,3 @@
FROM busybox

ADD kafka-connect-http/target/components/packages/castorm-kafka-connect-http-*/castorm-kafka-connect-http-* /kafka-connect-http
6 changes: 3 additions & 3 deletions README.md
@@ -236,7 +236,7 @@ For an Epoch representation of the same string, FreeMarker built-ins should be u
http.request.params=after=${offset.timestamp?datetime.iso?long}
```
For a complete understanding of the features provided by FreeMarker, please, refer to the
[User Manual](https://freemarker.apache.org/docs/index.html)
[User Manual](https://freemarker.apache.org/docs/endpoint.html)

---
<a name="client"/>
@@ -345,7 +345,7 @@ the list of `SourceRecord`s expected by Kafka Connect.
> ```java
> public interface HttpResponseParser extends Configurable {
>
> List<SourceRecord> parse(HttpResponse response);
> List<SourceRecord> parse(String endpoint, HttpResponse response);
> }
> ```
> * Type: `Class`
@@ -419,7 +419,7 @@ Parses the HTTP response into a key-value SourceRecord. This process is decompos
> ```java
> public interface KvSourceRecordMapper extends Configurable {
>
> SourceRecord map(KvRecord record);
> SourceRecord map(String endpoint, KvRecord record);
> }
> ```
> * Type: `Class`
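
A hedged sketch in the spirit of the endpoint-aware `map` signature above; `KvRecord` is replaced with plain strings to keep the snippet self-contained, and the topic choice and offset keys are illustrative assumptions rather than this connector's actual mapping.

```java
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative only: emits the record to a topic named after the endpoint it
// came from, and keeps a per-endpoint source partition for offset tracking.
public class EndpointTopicRecordMapper {

    public SourceRecord map(String endpoint, String key, String value, long timestamp) {
        Map<String, Object> sourcePartition = Map.of("endpoint", endpoint);
        Map<String, Object> sourceOffset = Map.of("endpoint", endpoint, "key", key, "timestamp", timestamp);
        return new SourceRecord(sourcePartition, sourceOffset, endpoint, null,
                Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, value, timestamp);
    }
}
```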
9 changes: 8 additions & 1 deletion kafka-connect-http-infra/pom.xml
@@ -3,7 +3,7 @@
<parent>
<artifactId>kafka-connect-http-parent</artifactId>
<groupId>com.github.castorm</groupId>
<version>0.8.12-SNAPSHOT</version>
<version>0.8.13-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -56,6 +56,13 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>2.2.0</version>
</dependency>

</dependencies>

<build>
@@ -44,14 +44,18 @@ public class KafkaConnectInfra {
public KafkaConnectInfra() {
network = newNetwork();

kafka = new KafkaContainer(parse("confluentinc/cp-kafka:6.0.1"))
.withNetwork(network)
// Use following container instead for ARM Architecture (i.e. M2 Apple chip like)
// kafka = new KafkaContainer(parse("confluentinc/cp-kafka:7.5.3.arm64"))
kafka = new KafkaContainer(parse("confluentinc/cp-kafka:7.5.3"))
.withNetwork(network)
.withNetworkAliases("kafka");

kafkaConnect = new KafkaConnectContainer(parse("confluentinc/cp-kafka-connect:6.0.1"))
.withNetwork(network)
// Use following container instead for ARM Architecture (i.e. M2 Apple chip like)
// kafkaConnect = new KafkaConnectContainer(parse("confluentinc/cp-kafka-connect:7.5.3.arm64"))
kafkaConnect = new KafkaConnectContainer(parse("confluentinc/cp-kafka-connect:7.5.3"))
.withNetwork(network)
.withBootstrapServers("PLAINTEXT://kafka:9092")
.withFileSystemBind("target/kafka-connect-http", "/etc/kafka-connect/plugins/kafka-connect-http", READ_ONLY)
.withFileSystemBind("kafka-connect-http/target", "/etc/kafka-connect/plugins/kafka-connect-http", READ_ONLY)
.withStartupTimeout(Duration.ofMinutes(3));
}

2 changes: 1 addition & 1 deletion kafka-connect-http-test/pom.xml
@@ -3,7 +3,7 @@
<parent>
<artifactId>kafka-connect-http-parent</artifactId>
<groupId>com.github.castorm</groupId>
<version>0.8.12-SNAPSHOT</version>
<version>0.8.13-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -21,7 +21,7 @@
*/

import com.github.castorm.kafka.connect.http.HttpSourceConnector;
import com.github.castorm.kafka.connect.http.HttpSourceTask;
import com.github.castorm.kafka.connect.http.HttpSourceTaskSingleEndpoint;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
import org.apache.kafka.connect.source.SourceRecord;
@@ -53,7 +53,7 @@ public static List<SourceRecord> readAllRecords(Map<String, String> config) {

@SneakyThrows
private static List<SourceRecord> runTaskUntilExhaust(Map<String, String> config) {
HttpSourceTask task = new HttpSourceTask();
HttpSourceTaskSingleEndpoint task = new HttpSourceTaskSingleEndpoint("dummy");
task.initialize(emptyContext());
task.start(config);
List<SourceRecord> allRecords = new ArrayList<>();
@@ -47,20 +47,45 @@ class HttpSourceConnectorContainersIT {

KafkaClient kafkaClient = new KafkaClient(infra.getKafkaBootstrapServers());

// @Test
// @Timeout(60)
// void whenConnector1_thenRecordsInDifferentTopics() {
//
// String configJson = replaceVariables(readFileFromClasspath("connectors/connector1.json"), ImmutableMap.of("server.url", infra.getWiremockInternalUrl()));
//
// Map<String, String> config = kafkaConnectClient.createConnector(configJson);
//
// assertThat(kafkaClient.observeTopic("topic-name1")
// .take(1)
// .doOnNext(record -> log.info("{} {} {} {}", record.timestamp(), record.key(), record.value(), record.offset()))
// .collect(toList())
// .blockingGet())
// .extracting(ConsumerRecord::key)
// .containsExactly("Struct{key=TICKT-0002}");
//
// assertThat(kafkaClient.observeTopic("topic-name2")
// .take(1)
// .doOnNext(record -> log.info("{} {} {} {}", record.timestamp(), record.key(), record.value(), record.offset()))
// .collect(toList())
// .blockingGet())
// .extracting(ConsumerRecord::key)
// .containsExactly( "Struct{key=TICKT-0003}");
// }

@Test
@Timeout(60)
void whenConnector1_thenRecordsInTopic() {
void whenConnector2_thenRecordsInSingleTopic() {

String configJson = replaceVariables(readFileFromClasspath("connectors/connector1.json"), ImmutableMap.of("server.url", infra.getWiremockInternalUrl()));
String configJson = replaceVariables(readFileFromClasspath("connectors/connector2.json"), ImmutableMap.of("server.url", infra.getWiremockInternalUrl()));

Map<String, String> config = kafkaConnectClient.createConnector(configJson);

assertThat(kafkaClient.observeTopic(config.get("kafka.topic"))
.take(2)
.doOnNext(record -> log.info("{} {} {}", record.timestamp(), record.key(), record.value()))
.collect(toList())
.blockingGet())
.extracting(ConsumerRecord::key)
.containsExactly("Struct{key=TICKT-0002}", "Struct{key=TICKT-0003}");
.take(2)
.doOnNext(record -> log.info("{} {} {}", record.timestamp(), record.key(), record.value()))
.collect(toList())
.blockingGet())
.extracting(ConsumerRecord::key)
.containsExactly("Struct{key=TICKT-0002}", "Struct{key=TICKT-0003}");
}
}
@@ -57,12 +57,26 @@ public void teardown() {
}

@Test
void validateConnector1() {

void validateConnectorWithKafkaTemplateConfig() {
Map<String, String> config = getConfigMap(replaceVariables(readFileFromClasspath("connectors/connector1.json"), properties));

List<SourceRecord> records = readAllRecords(config);

assertThat(records).hasSize(2);
assertThat(records).extracting(SourceRecord::topic).containsExactly("topic-name1", "topic-name2");
assertThat(records).extracting(record -> (String) record.sourceOffset().get("key")).containsExactly("TICKT-0002", "TICKT-0003");
assertThat(records).extracting(record -> (String) record.sourceOffset().get("timestamp")).containsExactly("2020-01-01T00:00:02Z", "2020-01-01T00:00:03Z");
assertThat(records).extracting(record -> (String) record.sourceOffset().get("endpoint")).containsExactly("topic-name1", "topic-name2");
assertThat(records).extracting(record -> record.value().toString()).containsExactly("Struct{_streamkap_value={\"id\":\"2\",\"key\":\"TICKT-0002\",\"fields\":{\"summary\":\"My first ticket\",\"created\":\"2020-01-01T00:00:02.000+0000\",\"updated\":\"2020-01-01T00:00:02.000+0000\",\"_index\":\"topic-name1\"}},_streamkap_key=TICKT-0002,_streamkap_index=topic-name1,_streamkap_timestamp=1577836802000}", "Struct{_streamkap_value={\"id\":\"3\",\"key\":\"TICKT-0003\",\"fields\":{\"summary\":\"My first ticket\",\"created\":\"2020-01-01T00:00:03.000+0000\",\"updated\":\"2020-01-01T00:00:03.000+0000\",\"_index\":\"topic-name2\"}},_streamkap_key=TICKT-0003,_streamkap_index=topic-name2,_streamkap_timestamp=1577836803000}");
}

@Test
void validateConnectorWithoutKafkaTemplateConfig() {

Map<String, String> config = getConfigMap(replaceVariables(readFileFromClasspath("connectors/connector2.json"), properties));

List<SourceRecord> records = readAllRecords(config);

assertThat(records).hasSize(2);
assertThat(records).extracting(SourceRecord::topic).allMatch(config.get("kafka.topic")::equals);
assertThat(records).extracting(record -> (String) record.sourceOffset().get("key")).containsExactly("TICKT-0002", "TICKT-0003");
@@ -34,6 +34,8 @@ public class KafkaConnectInfraExtension implements InvocationInterceptor {

KafkaConnectInfra infra = new KafkaConnectInfra();

// Use following container instead for ARM Architecture (i.e. M2 Apple chip like)
// WiremockContainer wiremock = new WiremockContainer(parse("wiremock/wiremock:nightly"))
WiremockContainer wiremock = new WiremockContainer(parse("rodolpheche/wiremock:2.25.1"))
.withNetwork(infra.getNetwork())
.withNetworkAliases("wiremock")
@@ -9,7 +9,8 @@
"fields": {
"summary": "My first ticket",
"created": "2020-01-01T00:00:02.000+0000",
"updated": "2020-01-01T00:00:02.000+0000"
"updated": "2020-01-01T00:00:02.000+0000",
"_index": "topic-name1"
}
},
{
@@ -18,7 +19,8 @@
"fields": {
"summary": "My first ticket",
"created": "2020-01-01T00:00:03.000+0000",
"updated": "2020-01-01T00:00:03.000+0000"
"updated": "2020-01-01T00:00:03.000+0000",
"_index": "topic-name2"
}
}
]