[FLINK-37244] mention scylladb in docs #32

Open: wants to merge 4 commits into base: main
4 changes: 3 additions & 1 deletion README.md
@@ -1,6 +1,8 @@
# Apache Flink Cassandra Connector

This repository contains the official Apache Flink Cassandra connector.

As ScyllaDB is a drop-in replacement for Apache Cassandra, it can also be used with this connector: simply replace the connection string that points at your Cassandra cluster with one that points at your ScyllaDB cluster.

## Apache Flink

36 changes: 36 additions & 0 deletions docs/content.zh/docs/connectors/datastream/scylladb.md
@@ -0,0 +1,36 @@
---
title: ScyllaDB
weight: 4
type: docs
aliases:
- /zh/dev/connectors/scylladb.html
- /zh/apis/streaming/connectors/scylladb.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# ScyllaDB Connector

The Apache Flink Cassandra connector also supports ScyllaDB: simply replace the connection string that points at a running Cassandra cluster with one that points at a running ScyllaDB cluster.

## Installing ScyllaDB
There are multiple ways to bring up a ScyllaDB instance on a local machine:

1. Follow the instructions on the [ScyllaDB Getting Started page](https://docs.scylladb.com/getting-started/).
2. Launch a container running ScyllaDB from the [official Docker repository](https://hub.docker.com/r/scylladb/scylla/).
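
Once an instance is running, no ScyllaDB-specific connector setup is needed. The snippet below is a minimal sketch of writing to ScyllaDB through the connector's `CassandraSink`; it assumes an upstream `DataStream<Tuple2<String, Long>>` named `result`, and the contact point, port, and `example.wordcount` keyspace/table are placeholders:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;

// "result" is assumed to be produced by an upstream stage of the job.
CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) VALUES (?, ?);")
        .setClusterBuilder(
                new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        // The only ScyllaDB-specific step: point the driver at a
                        // ScyllaDB node. ScyllaDB serves CQL on the same default port.
                        return builder.addContactPoint("127.0.0.1").withPort(9042).build();
                    }
                })
        .build();
```

The source side is configured through the same `ClusterBuilder` abstraction, so it likewise only needs its contact point changed.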
36 changes: 36 additions & 0 deletions docs/content/docs/connectors/datastream/scylladb.md
@@ -0,0 +1,36 @@
---
title: ScyllaDB
weight: 4
type: docs
aliases:
- /dev/connectors/scylladb.html
- /apis/streaming/connectors/scylladb.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# ScyllaDB Connector

The Apache Flink Cassandra connector also supports ScyllaDB: simply replace the connection string that points at a running Cassandra cluster with one that points at a running ScyllaDB cluster.

## Installing ScyllaDB
There are multiple ways to bring up a ScyllaDB instance on a local machine:

1. Follow the instructions on the [ScyllaDB Getting Started page](https://docs.scylladb.com/getting-started/).
2. Launch a container running ScyllaDB from the [official Docker repository](https://hub.docker.com/r/scylladb/scylla/).
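
Once an instance is running, no ScyllaDB-specific connector setup is needed. The snippet below is a minimal sketch of writing to ScyllaDB through the connector's `CassandraSink`; it assumes an upstream `DataStream<Tuple2<String, Long>>` named `result`, and the contact point, port, and `example.wordcount` keyspace/table are placeholders:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;

// "result" is assumed to be produced by an upstream stage of the job.
CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) VALUES (?, ?);")
        .setClusterBuilder(
                new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        // The only ScyllaDB-specific step: point the driver at a
                        // ScyllaDB node. ScyllaDB serves CQL on the same default port.
                        return builder.addContactPoint("127.0.0.1").withPort(9042).build();
                    }
                })
        .build();
```

The source side is configured through the same `ClusterBuilder` abstraction, so it likewise only needs its contact point changed.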
@@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -48,7 +49,7 @@
@Testcontainers
public class CassandraTestEnvironment implements TestResource {
private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
-private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+public static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
private static final int CQL_PORT = 9042;

private static final int READ_TIMEOUT_MILLIS = 36000;
Expand All @@ -70,7 +71,7 @@ public class CassandraTestEnvironment implements TestResource {
"INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)";
private static final int NB_SPLITS_RECORDS = 1000;

-@Container private final CassandraContainer cassandraContainer;
+@Container private GenericContainer<?> container;

boolean insertTestDataForSplitSizeTests;
private Cluster cluster;
@@ -80,10 +81,21 @@ public class CassandraTestEnvironment implements TestResource {

public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
-cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+container = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
// more generous timeouts
addJavaOpts(
-cassandraContainer,
+container,
"-Dcassandra.request_timeout_in_ms=30000",
"-Dcassandra.read_request_timeout_in_ms=15000",
"-Dcassandra.write_request_timeout_in_ms=6000");
}

+public CassandraTestEnvironment(
+GenericContainer<?> container, boolean insertTestDataForSplitSizeTests) {
+this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
+// more generous timeouts
+addJavaOpts(
+container,
+"-Dcassandra.request_timeout_in_ms=30000",
+"-Dcassandra.read_request_timeout_in_ms=15000",
+"-Dcassandra.write_request_timeout_in_ms=6000");
@@ -106,29 +118,30 @@ private static void addJavaOpts(GenericContainer<?> container, String... opts) {

private void startEnv() throws Exception {
// configure container start to wait until cassandra is ready to receive queries
-cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+container.waitingFor(new CassandraQueryWaitStrategy());
// start with retrials
-cassandraContainer.start();
-cassandraContainer.followOutput(
+container.start();
+container.followOutput(
new Slf4jLogConsumer(LOG),
OutputFrame.OutputType.END,
OutputFrame.OutputType.STDERR,
OutputFrame.OutputType.STDOUT);

-cluster = cassandraContainer.getCluster();
+cluster = getCluster(container);
// ConsistencyLevel.ONE is the minimum level for reading
builderForReading =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ONE,
-cassandraContainer.getHost(),
-cassandraContainer.getMappedPort(CQL_PORT));
+container.getHost(),
+container.getMappedPort(CQL_PORT));

// Lower consistency level ANY is only available for writing.
builderForWriting =
createBuilderWithConsistencyLevel(
ConsistencyLevel.ANY,
-cassandraContainer.getHost(),
-cassandraContainer.getMappedPort(CQL_PORT));
+container.getHost(),
+container.getMappedPort(CQL_PORT));

session = cluster.connect();
executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
// create a dedicated table for split size tests (to avoid having to flush with each test)
@@ -153,7 +166,15 @@ private void stopEnv() {
if (cluster != null) {
cluster.close();
}
-cassandraContainer.stop();
+container.stop();
}

+private Cluster getCluster(ContainerState containerState) {
+Cluster.Builder builder =
+Cluster.builder()
+.addContactPoint(containerState.getHost())
+.withPort(containerState.getMappedPort(CQL_PORT));
+return builder.build();
+}

private ClusterBuilder createBuilderWithConsistencyLevel(
@@ -184,7 +205,7 @@ protected Cluster buildCluster(Cluster.Builder builder) {
* size_estimates system table.
*/
void flushMemTables(String table) throws Exception {
-cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, table);
+container.execInContainer("nodetool", "flush", KEYSPACE, table);
Thread.sleep(FLUSH_MEMTABLES_DELAY);
}

@@ -0,0 +1,86 @@
package org.apache.flink.connector.cassandra;

import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.GenericContainer;

import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

import static org.apache.flink.connector.cassandra.CassandraTestEnvironment.DOCKER_CASSANDRA_IMAGE;

/**
* Provides test environments for running tests with different Cassandra-based containers. This
* class implements the {@link TestTemplateInvocationContextProvider} interface to supply multiple
* invocation contexts for test templates. The invocation contexts generated by this provider
* represent various Cassandra environments, such as ScyllaDB or Cassandra containers, ensuring
* tests can be run in diverse setups.
*/
public class CassandraTestEnvironmentProvider implements TestTemplateInvocationContextProvider {

@Override
public boolean supportsTestTemplate(ExtensionContext context) {
return true;
}

@Override
public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
ExtensionContext context) {
return cassandraTestEnvironments(true).map(CassandraTestInvocationContext::new);
}

private Stream<CassandraTestEnvironment> cassandraTestEnvironments(
boolean insertTestDataForSplitSizeTests) {
return Stream.of(
new CassandraTestEnvironment(
new GenericContainer<>("scylladb/scylla:6.2"),
insertTestDataForSplitSizeTests),
new CassandraTestEnvironment(
new CassandraContainer<>(DOCKER_CASSANDRA_IMAGE),
insertTestDataForSplitSizeTests));
}

private static class CassandraTestInvocationContext implements TestTemplateInvocationContext {
private final CassandraTestEnvironment environment;

public CassandraTestInvocationContext(CassandraTestEnvironment environment) {
this.environment = environment;
}

@Override
public String getDisplayName(int invocationIndex) {
return String.format(
"Cassandra Test Environment: %s",
environment.getSession().getCluster().getClusterName());
}

@Override
public List<Extension> getAdditionalExtensions() {
return Collections.singletonList(
new ParameterResolver() {
@Override
public boolean supportsParameter(
ParameterContext parameterContext,
ExtensionContext extensionContext) {
return parameterContext
.getParameter()
.getType()
.equals(CassandraTestEnvironment.class);
}

@Override
public Object resolveParameter(
ParameterContext parameterContext,
ExtensionContext extensionContext) {
return environment;
}
});
}
}
}
@@ -19,16 +19,15 @@
package org.apache.flink.connector.cassandra.source;

import org.apache.flink.connector.cassandra.CassandraTestEnvironment;
+import org.apache.flink.connector.cassandra.CassandraTestEnvironmentProvider;
import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
@@ -39,38 +38,33 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;

import java.util.List;

import static java.util.concurrent.CompletableFuture.runAsync;
import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for the Cassandra source. */
+@ExtendWith(CassandraTestEnvironmentProvider.class)
class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {

@TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();

@TestExternalSystem
CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(true);

@TestSemantics
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

@TestContext
CassandraTestContextFactory contextFactory =
new CassandraTestContextFactory(cassandraTestEnvironment);

@TestTemplate
@DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)")
public void testGenerateSplitsMurMur3Partitioner(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
-CheckpointingMode semantic) {
+CheckpointingMode semantic,
+CassandraTestEnvironment cassandraTestEnvironment) {
final int parallelism = 2;
SplitsGenerator generator =
new SplitsGenerator(
@@ -99,7 +93,8 @@ public void testGenerateSplitsMurMur3Partitioner(
public void testGenerateSplitsRandomPartitioner(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
-CheckpointingMode semantic) {
+CheckpointingMode semantic,
+CassandraTestEnvironment cassandraTestEnvironment) {
final int parallelism = 2;
final SplitsGenerator generator =
new SplitsGenerator(
@@ -130,7 +125,8 @@ public void testGenerateSplitsRandomPartitioner(
public void testGenerateSplitsWithCorrectSize(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
-CheckpointingMode semantic)
+CheckpointingMode semantic,
+CassandraTestEnvironment cassandraTestEnvironment)
throws Exception {
final int parallelism = 2;
final long maxSplitMemorySize = 10000L;
@@ -156,7 +152,8 @@ public void testGenerateSplitsWithCorrectSize(
public void testGenerateSplitsWithTooHighMaximumSplitSize(
TestEnvironment testEnv,
DataStreamSourceExternalContext<Pojo> externalContext,
-CheckpointingMode semantic)
+CheckpointingMode semantic,
+CassandraTestEnvironment cassandraTestEnvironment)
throws Exception {
final int parallelism = 20;
final SplitsGenerator generator =