diff --git a/docs/content.zh/docs/connectors/datastream/cassandra.md b/docs/content.zh/docs/connectors/datastream/cassandra.md index a3aca81f..d79fb505 100644 --- a/docs/content.zh/docs/connectors/datastream/cassandra.md +++ b/docs/content.zh/docs/connectors/datastream/cassandra.md @@ -65,7 +65,7 @@ ClusterBuilder clusterBuilder = new ClusterBuilder() { .build(); } }; -long maxSplitMemorySize = ... //optional max split size in bytes minimum is 10MB. If not set, maxSplitMemorySize = 64 MB +long maxSplitMemorySize = ... //optional max split size in bytes minimum is 1MB. If not set, maxSplitMemorySize = 64 MB Source cassandraSource = new CassandraSource(clusterBuilder, maxSplitMemorySize, Pojo.class, diff --git a/docs/content/docs/connectors/datastream/cassandra.md b/docs/content/docs/connectors/datastream/cassandra.md index a4a1a924..4345cc20 100644 --- a/docs/content/docs/connectors/datastream/cassandra.md +++ b/docs/content/docs/connectors/datastream/cassandra.md @@ -65,7 +65,7 @@ ClusterBuilder clusterBuilder = new ClusterBuilder() { .build(); } }; -long maxSplitMemorySize = ... //optional max split size in bytes minimum is 10MB. If not set, maxSplitMemorySize = 64 MB +long maxSplitMemorySize = ... //optional max split size in bytes minimum is 1MB. If not set, maxSplitMemorySize = 64 MB Source cassandraSource = new CassandraSource(clusterBuilder, maxSplitMemorySize, Pojo.class, diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java index b9b6d0ff..5ba2eaec 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java @@ -70,7 +70,7 @@ * .build(); * } * }; - * long maxSplitMemorySize = ... //optional max split size in bytes minimum is 10MB. If not set, maxSplitMemorySize = 64 MB + * long maxSplitMemorySize = ... //optional max split size in bytes minimum is 1MB. If not set, maxSplitMemorySize = 64 MB * Source cassandraSource = new CassandraSource(clusterBuilder, * maxSplitMemorySize, * Pojo.class, @@ -104,7 +104,7 @@ public class CassandraSource private final MapperOptions mapperOptions; private final long maxSplitMemorySize; - private static final long MIN_SPLIT_MEMORY_SIZE = MemorySize.ofMebiBytes(10).getBytes(); + private static final long MIN_SPLIT_MEMORY_SIZE = MemorySize.ofMebiBytes(1).getBytes(); static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = MemorySize.ofMebiBytes(64).getBytes(); public CassandraSource( diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java index e44be343..98585c6e 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java @@ -27,7 +27,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; @@ -147,7 +149,9 @@ private float getRingFraction(List tokenRanges) { addressedTokens.add(distance(tokenRange.rangeStart, tokenRange.rangeEnd)); } // it is < 1 because it is a percentage - return addressedTokens.divide(partitioner.ringSize).floatValue(); + return new BigDecimal(addressedTokens) + .divide(new BigDecimal(partitioner.ringSize), 6, RoundingMode.HALF_UP) + .floatValue(); } /** Gets the list of token ranges that the table occupies on a given Cassandra node. */