-
Notifications
You must be signed in to change notification settings - Fork 3
fix(deps): update module github.com/segmentio/kafka-go to v0.4.48 #105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
[puLL-Merge] - segmentio/kafka-go@v0.4.47..v0.4.48 Diffdiff --git .circleci/config.yml .circleci/config.yml
index 25fad54e3..a32a0d322 100644
--- .circleci/config.yml
+++ .circleci/config.yml
@@ -9,34 +9,40 @@ jobs:
# The kafka 0.10 tests are maintained as a separate configuration because
# kafka only supported plain text SASL in this version.
+ # NOTE: Bitnami does not have suport for kafka version 0.10.1.1. Hence we use 0.10.2.1
kafka-010:
working_directory: &working_directory /go/src/github.com/segmentio/kafka-go
- environment:
- KAFKA_VERSION: "0.10.1"
docker:
- image: circleci/golang
- - image: wurstmeister/zookeeper
+ - image: bitnami/zookeeper:latest
ports:
- 2181:2181
- - image: wurstmeister/kafka:0.10.1.1
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ - image: bitnami/kafka:0.10.2.1
ports:
- 9092:9092
- 9093:9093
environment:
- KAFKA_BROKER_ID: '1'
- KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
+ KAFKA_BROKER_ID: 1
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_ADVERTISED_HOST_NAME: 'localhost'
KAFKA_ADVERTISED_PORT: '9092'
- KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
+ KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: '200000000'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
- KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
- KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
- CUSTOM_INIT_SCRIPT: |-
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
+ KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+ KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf"
+ ALLOW_PLAINTEXT_LISTENER: yes
+ entrypoint:
+ - "/bin/bash"
+ - "-c"
+ - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_server_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram; exec /app-entrypoint.sh /start-kafka.sh
+
steps: &steps
- checkout
- restore_cache:
@@ -48,6 +54,9 @@ jobs:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
paths:
- /go/pkg/mod
+ - run:
+ name: Wait for kafka
+ command: ./scripts/wait-for-kafka.sh
- run:
name: Test kafka-go
command: go test -race -cover ./...
@@ -59,156 +68,11 @@ jobs:
working_directory: ./sasl/aws_msk_iam
command: go test -race -cover ./...
- # Starting at version 0.11, the kafka features and configuration remained
- # mostly stable, so we can use this CI job configuration as template for other
- # versions as well.
- kafka-011:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "0.11.0"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.11-0.11.0.3
- ports:
- - 9092:9092
- - 9093:9093
- environment: &environment
- KAFKA_BROKER_ID: '1'
- KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
- KAFKA_DELETE_TOPIC_ENABLE: 'true'
- KAFKA_ADVERTISED_HOST_NAME: 'localhost'
- KAFKA_ADVERTISED_PORT: '9092'
- KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
- KAFKA_MESSAGE_MAX_BYTES: '200000000'
- KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
- KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
- KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
- KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
- CUSTOM_INIT_SCRIPT: |-
- apk add libgcc;
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
- /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
- steps: *steps
-
- kafka-101:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "1.0.1"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.11-1.0.1
- ports:
- - 9092:9092
- - 9093:9093
- environment: *environment
- steps: *steps
-
- kafka-111:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "1.1.1"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.11-1.1.1
- ports:
- - 9092:9092
- - 9093:9093
- environment: *environment
- steps: *steps
-
- kafka-201:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "2.0.1"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.12-2.0.1
- ports:
- - 9092:9092
- - 9093:9093
- environment:
- <<: *environment
- KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
- steps: *steps
-
- kafka-211:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "2.1.1"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.12-2.1.1
- ports:
- - 9092:9092
- - 9093:9093
- # recently, the base image for some newer versions of kafka switched from
- # alpine to debian, which causes the "apk add ..." line to fail. The env
- # map should be used for any versions that fail due to being part of this
- # migration.
- environment: &environmentDebian
- <<: *environment
- KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
- CUSTOM_INIT_SCRIPT: |-
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
- /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
- steps: *steps
-
- kafka-222:
+ # NOTE: Bitnami does not have suport for kafka version 2.7.1. Hence we use 2.7.0
+ kafka-270:
working_directory: *working_directory
environment:
- KAFKA_VERSION: "2.2.2"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.12-2.2.2
- ports:
- - 9092:9092
- - 9093:9093
- environment:
- <<: *environmentDebian
- steps: *steps
-
- kafka-231:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "2.3.1"
- docker:
- - image: circleci/golang
- - image: wurstmeister/zookeeper
- ports:
- - 2181:2181
- - image: wurstmeister/kafka:2.12-2.3.1
- ports:
- - 9092:9092
- - 9093:9093
- environment:
- <<: *environmentDebian
- steps: *steps
-
- kafka-241:
- working_directory: *working_directory
- environment:
- KAFKA_VERSION: "2.4.1"
+ KAFKA_VERSION: "2.7.0"
# Need to skip nettest to avoid these kinds of errors:
# --- FAIL: TestConn/nettest (17.56s)
@@ -222,21 +86,40 @@ jobs:
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- - image: wurstmeister/zookeeper
+ - image: bitnami/zookeeper:latest
ports:
- 2181:2181
- - image: wurstmeister/kafka:2.12-2.4.1
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ - image: bitnami/kafka:2.7.0
ports:
- 9092:9092
- 9093:9093
- environment:
- <<: *environmentDebian
+ environment: &environment
+ KAFKA_CFG_BROKER_ID: 1
+ KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+ KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+ KAFKA_CFG_ADVERTISED_PORT: '9092'
+ KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+ KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+ KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+ KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+ KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+ ALLOW_PLAINTEXT_LISTENER: yes
+ entrypoint: &entrypoint
+ - "/bin/bash"
+ - "-c"
+ - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
steps: *steps
- kafka-260:
+ kafka-281:
working_directory: *working_directory
environment:
- KAFKA_VERSION: "2.6.0"
+ KAFKA_VERSION: "2.8.1"
# Need to skip nettest to avoid these kinds of errors:
# --- FAIL: TestConn/nettest (17.56s)
@@ -250,23 +133,23 @@ jobs:
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- - image: wurstmeister/zookeeper
+ - image: bitnami/zookeeper:latest
ports:
- 2181:2181
- - image: wurstmeister/kafka:2.13-2.6.0
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ - image: bitnami/kafka:2.8.1
ports:
- 9092:9092
- 9093:9093
- environment:
- <<: *environment
- KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ environment: *environment
+ entrypoint: *entrypoint
steps: *steps
- kafka-271:
+ kafka-370:
working_directory: *working_directory
environment:
- KAFKA_VERSION: "2.7.1"
+ KAFKA_VERSION: "3.7.0"
# Need to skip nettest to avoid these kinds of errors:
# --- FAIL: TestConn/nettest (17.56s)
@@ -280,32 +163,80 @@ jobs:
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- - image: wurstmeister/zookeeper
+ - image: bitnami/zookeeper:latest
ports:
- 2181:2181
- - image: wurstmeister/kafka:2.13-2.7.1
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ - image: bitnami/kafka:3.7.0
ports:
- - 9092:9092
- - 9093:9093
+ - 9092:9092
+ - 9093:9093
environment:
<<: *environment
- KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
+ entrypoint: *entrypoint
steps: *steps
+ # NOTE: this fails quite often due to Java heap errors from Kafka.
+ # Once we figure out how to fix that, we can re-enable this.
+ # https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900
+ # kafka-400:
+ # working_directory: *working_directory
+ # environment:
+ # KAFKA_VERSION: "4.0.0"
+
+ # # Need to skip nettest to avoid these kinds of errors:
+ # # --- FAIL: TestConn/nettest (17.56s)
+ # # --- FAIL: TestConn/nettest/PingPong (7.40s)
+ # # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
+ # # conntest.go:118: mismatching value: got 77, want 78
+ # # conntest.go:118: mismatching value: got 78, want 79
+ # # ...
+ # #
+ # # TODO: Figure out why these are happening and fix them (they don't appear to be new).
+ # KAFKA_SKIP_NETTEST: "1"
+ # docker:
+ # - image: circleci/golang
+ # - image: bitnami/kafka:4.0.0
+ # ports:
+ # - 9092:9092
+ # - 9093:9093
+ # environment:
+ # KAFKA_CFG_NODE_ID: 1
+ # KAFKA_CFG_BROKER_ID: 1
+ # KAFKA_CFG_PROCESS_ROLES: broker,controller
+ # KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+ # KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+ # KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT
+ # KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093
+ # KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093
+ # KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN
+ # KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ # KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
+ # ALLOW_PLAINTEXT_LISTENER: yes
+ # KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ # KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+ # KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ # KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+ # KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+ # KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
+ # KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
+ # KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
+ # KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
+ # KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
+ # KAFKA_INTER_BROKER_USER: adminscram512
+ # KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
+ # KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+ # steps: *steps
+
workflows:
version: 2
run:
jobs:
- lint
- kafka-010
- - kafka-011
- - kafka-101
- - kafka-111
- - kafka-201
- - kafka-211
- - kafka-222
- - kafka-231
- - kafka-241
- - kafka-260
- - kafka-271
+ - kafka-270
+ - kafka-281
+ - kafka-370
+ #- kafka-400
diff --git Makefile Makefile
index e2374f2e3..47f45c950 100644
--- Makefile
+++ Makefile
@@ -4,4 +4,4 @@ test:
go test -race -cover ./...
docker:
- docker-compose up -d
+ docker compose up -d
diff --git README.md README.md
index e17878825..4e6cd1229 100644
--- README.md
+++ README.md
@@ -108,7 +108,7 @@ if err := conn.Close(); err != nil {
```
### To Create Topics
-By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_AUTO_CREATE_TOPICS_ENABLE='true'` in the wurstmeister/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so:
+By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true'` in the bitnami/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so:
```go
// to create topics when auto.create.topics.enable='true'
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
@@ -797,3 +797,8 @@ KAFKA_VERSION=2.3.1 \
KAFKA_SKIP_NETTEST=1 \
go test -race ./...
```
+
+(or) to clean up the cached test results and run tests:
+```
+go clean -cache && make test
+```
diff --git addoffsetstotxn_test.go addoffsetstotxn_test.go
index 56d6ee46a..86fd3d711 100644
--- addoffsetstotxn_test.go
+++ addoffsetstotxn_test.go
@@ -3,9 +3,7 @@ package kafka
import (
"context"
"log"
- "net"
"os"
- "strconv"
"testing"
"time"
@@ -16,17 +14,26 @@ func TestClientAddOffsetsToTxn(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.11.0") {
t.Skip("Skipping test because kafka version is not high enough.")
}
+
+ // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+ // work is revisited.
+ if ktesting.KafkaIsAtLeast("3.0.0") {
+ t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+ }
+
topic := makeTopic()
transactionalID := makeTransactionalID()
client, shutdown := newLocalClient()
defer shutdown()
err := clientCreateTopic(client, topic, 3)
+ defer deleteTopic(t, topic)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
+ waitForTopic(ctx, t, topic)
defer cancel()
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
Addr: client.Addr,
@@ -75,9 +82,9 @@ func TestClientAddOffsetsToTxn(t *testing.T) {
t.Fatal(err)
}
- transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
- client, shutdown = newClient(transactionCoordinator)
- defer shutdown()
+ if respc.Error != nil {
+ t.Fatal(err)
+ }
ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{
TransactionalID: transactionalID,
diff --git addpartitionstotxn_test.go addpartitionstotxn_test.go
index 7e7efe8e5..34f3d6da6 100644
--- addpartitionstotxn_test.go
+++ addpartitionstotxn_test.go
@@ -14,6 +14,13 @@ func TestClientAddPartitionsToTxn(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.11.0") {
t.Skip("Skipping test because kafka version is not high enough.")
}
+
+ // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+ // work is revisited.
+ if ktesting.KafkaIsAtLeast("3.0.0") {
+ t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+ }
+
topic1 := makeTopic()
topic2 := makeTopic()
diff --git alterclientquotas_test.go alterclientquotas_test.go
index d61c745e3..23568ad83 100644
--- alterclientquotas_test.go
+++ alterclientquotas_test.go
@@ -3,6 +3,7 @@ package kafka
import (
"context"
"testing"
+ "time"
ktesting "github.com/segmentio/kafka-go/testing"
"github.com/stretchr/testify/assert"
@@ -65,6 +66,11 @@ func TestClientAlterClientQuotas(t *testing.T) {
assert.Equal(t, expectedAlterResp, *alterResp)
+ // kraft mode is slow
+ if ktesting.KafkaIsAtLeast("3.7.0") {
+ time.Sleep(3 * time.Second)
+ }
+
describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
Components: []DescribeClientQuotasRequestComponent{
{
diff --git alterpartitionreassignments_test.go alterpartitionreassignments_test.go
index 7bbce8fff..48974c7c5 100644
--- alterpartitionreassignments_test.go
+++ alterpartitionreassignments_test.go
@@ -3,6 +3,7 @@ package kafka
import (
"context"
"testing"
+ "time"
ktesting "github.com/segmentio/kafka-go/testing"
)
@@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) {
BrokerIDs: []int{1},
},
},
+ Timeout: 5 * time.Second,
},
)
@@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) {
BrokerIDs: []int{1},
},
},
+ Timeout: 5 * time.Second,
},
)
diff --git batch.go batch.go
index 19dcef8cd..eb742712d 100644
--- batch.go
+++ batch.go
@@ -46,7 +46,7 @@ func (batch *Batch) Throttle() time.Duration {
return batch.throttle
}
-// Watermark returns the current highest watermark in a partition.
+// HighWaterMark returns the current highest watermark in a partition.
func (batch *Batch) HighWaterMark() int64 {
return batch.highWaterMark
}
diff --git conn.go conn.go
index 2b51afbd5..9f9f25903 100644
--- conn.go
+++ conn.go
@@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) {
// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
- _, err := c.deleteTopics(deleteTopicsRequestV0{
+ _, err := c.deleteTopics(deleteTopicsRequest{
Topics: topics,
})
return err
@@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
-func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
- var response joinGroupResponseV1
+func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) {
+ version, err := c.negotiateVersion(joinGroup, v1, v2)
+ if err != nil {
+ return joinGroupResponse{}, err
+ }
- err := c.writeOperation(
+ response := joinGroupResponse{v: version}
+
+ err = c.writeOperation(
func(deadline time.Time, id int32) error {
- return c.writeRequest(joinGroup, v1, id, request)
+ return c.writeRequest(joinGroup, version, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
},
)
if err != nil {
- return joinGroupResponseV1{}, err
+ return joinGroupResponse{}, err
}
if response.ErrorCode != 0 {
- return joinGroupResponseV1{}, Error(response.ErrorCode)
+ return joinGroupResponse{}, Error(response.ErrorCode)
}
return response, nil
diff --git conn_test.go conn_test.go
index bdce327e0..ef5ce3071 100644
--- conn_test.go
+++ conn_test.go
@@ -13,8 +13,9 @@ import (
"testing"
"time"
- ktesting "github.com/segmentio/kafka-go/testing"
"golang.org/x/net/nettest"
+
+ ktesting "github.com/segmentio/kafka-go/testing"
)
type timeout struct{}
@@ -679,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) {
waitForCoordinator(t, conn, groupID)
- join := func() (joinGroup joinGroupResponseV1) {
+ join := func() (joinGroup joinGroupResponse) {
var err error
for attempt := 0; attempt < 10; attempt++ {
- joinGroup, err = conn.joinGroup(joinGroupRequestV1{
+ joinGroup, err = conn.joinGroup(joinGroupRequest{
GroupID: groupID,
SessionTimeout: int32(time.Minute / time.Millisecond),
RebalanceTimeout: int32(time.Second / time.Millisecond),
@@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
}
func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) {
- _, err := conn.joinGroup(joinGroupRequestV1{})
+ _, err := conn.joinGroup(joinGroupRequest{})
if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) {
t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err)
}
@@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)
- _, err := conn.joinGroup(joinGroupRequestV1{
+ _, err := conn.joinGroup(joinGroupRequest{
GroupID: groupID,
})
if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) {
@@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) {
groupID := makeGroupID()
waitForCoordinator(t, conn, groupID)
- _, err := conn.joinGroup(joinGroupRequestV1{
+ _, err := conn.joinGroup(joinGroupRequest{
GroupID: groupID,
SessionTimeout: int32(3 * time.Second / time.Millisecond),
})
diff --git consumergroup.go consumergroup.go
index b9d0a7e2e..b32f90162 100644
--- consumergroup.go
+++ consumergroup.go
@@ -527,7 +527,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
g.log(func(l Logger) {
- l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
+ l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
})
return
}
@@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
type coordinator interface {
io.Closer
findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
- joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
+ joinGroup(joinGroupRequest) (joinGroupResponse, error)
syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find
return t.conn.findCoordinator(req)
}
-func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
// in the case of join group, the consumer group coordinator may wait up
// to rebalance timeout in order to wait for all members to join.
if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
- return joinGroupResponseV1{}, err
+ return joinGroupResponse{}, err
}
return t.conn.joinGroup(req)
}
@@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
- request, err := cg.makeJoinGroupRequestV1(memberID)
+ request, err := cg.makeJoinGroupRequest(memberID)
if err != nil {
return "", 0, nil, err
}
@@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
// request.
-func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
- request := joinGroupRequestV1{
+func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) {
+ request := joinGroupRequest{
GroupID: cg.config.ID,
MemberID: memberID,
SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond),
@@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
for _, balancer := range cg.config.GroupBalancers {
userData, err := balancer.UserData()
if err != nil {
- return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
+ return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
}
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
ProtocolName: balancer.ProtocolName(),
@@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
// assignTopicPartitions uses the selected GroupBalancer to assign members to
// their various partitions.
-func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
+func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
cg.withLogger(func(l Logger) {
l.Printf("selected as leader for group, %s\n", cg.config.ID)
})
@@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
}
// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
-func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
+func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {
members := make([]GroupMember, 0, len(in))
for _, item := range in {
metadata := groupMetadata{}
diff --git consumergroup_test.go consumergroup_test.go
index 0d3e290a9..dbbe4ec47 100644
--- consumergroup_test.go
+++ consumergroup_test.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"reflect"
+ "strconv"
"strings"
"sync"
"testing"
@@ -15,7 +16,7 @@ var _ coordinator = mockCoordinator{}
type mockCoordinator struct {
closeFunc func() error
findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
- joinGroupFunc func(joinGroupRequestV1) (joinGroupResponseV1, error)
+ joinGroupFunc func(joinGroupRequest) (joinGroupResponse, error)
syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error)
leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error)
heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -38,9 +39,9 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor
return c.findCoordinatorFunc(req)
}
-func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+func (c mockCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
if c.joinGroupFunc == nil {
- return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified")
+ return joinGroupResponse{}, errors.New("no joinGroup behavior specified")
}
return c.joinGroupFunc(req)
}
@@ -140,33 +141,36 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
},
}
- newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 {
- resp := joinGroupResponseV1{
- GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
- }
+ newJoinGroupResponse := func(topicsByMemberID map[string][]string) func(v apiVersion) joinGroupResponse {
+ return func(v apiVersion) joinGroupResponse {
+ resp := joinGroupResponse{
+ v: v,
+ GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
+ }
- for memberID, topics := range topicsByMemberID {
- resp.Members = append(resp.Members, joinGroupResponseMemberV1{
- MemberID: memberID,
- MemberMetadata: groupMetadata{
- Topics: topics,
- }.bytes(),
- })
- }
+ for memberID, topics := range topicsByMemberID {
+ resp.Members = append(resp.Members, joinGroupResponseMember{
+ MemberID: memberID,
+ MemberMetadata: groupMetadata{
+ Topics: topics,
+ }.bytes(),
+ })
+ }
- return resp
+ return resp
+ }
}
testCases := map[string]struct {
- Members joinGroupResponseV1
+ MembersFunc func(v apiVersion) joinGroupResponse
Assignments GroupMemberAssignments
}{
"nil": {
- Members: newJoinGroupResponseV1(nil),
+ MembersFunc: newJoinGroupResponse(nil),
Assignments: GroupMemberAssignments{},
},
"one member, one topic": {
- Members: newJoinGroupResponseV1(map[string][]string{
+ MembersFunc: newJoinGroupResponse(map[string][]string{
"member-1": {"topic-1"},
}),
Assignments: GroupMemberAssignments{
@@ -176,7 +180,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
},
},
"one member, two topics": {
- Members: newJoinGroupResponseV1(map[string][]string{
+ MembersFunc: newJoinGroupResponse(map[string][]string{
"member-1": {"topic-1", "topic-2"},
}),
Assignments: GroupMemberAssignments{
@@ -187,7 +191,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
},
},
"two members, one topic": {
- Members: newJoinGroupResponseV1(map[string][]string{
+ MembersFunc: newJoinGroupResponse(map[string][]string{
"member-1": {"topic-1"},
"member-2": {"topic-1"},
}),
@@ -201,7 +205,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
},
},
"two members, two unshared topics": {
- Members: newJoinGroupResponseV1(map[string][]string{
+ MembersFunc: newJoinGroupResponse(map[string][]string{
"member-1": {"topic-1"},
"member-2": {"topic-2"},
}),
@@ -216,21 +220,24 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
},
}
+ supportedVersions := []apiVersion{v1, v2} // joinGroup versions
for label, tc := range testCases {
- t.Run(label, func(t *testing.T) {
- cg := ConsumerGroup{}
- cg.config.GroupBalancers = []GroupBalancer{
- RangeGroupBalancer{},
- RoundRobinGroupBalancer{},
- }
- assignments, err := cg.assignTopicPartitions(conn, tc.Members)
- if err != nil {
- t.Fatalf("bad err: %v", err)
- }
- if !reflect.DeepEqual(tc.Assignments, assignments) {
- t.Errorf("expected %v; got %v", tc.Assignments, assignments)
- }
- })
+ for _, v := range supportedVersions {
+ t.Run(label+"_v"+strconv.Itoa(int(v)), func(t *testing.T) {
+ cg := ConsumerGroup{}
+ cg.config.GroupBalancers = []GroupBalancer{
+ RangeGroupBalancer{},
+ RoundRobinGroupBalancer{},
+ }
+ assignments, err := cg.assignTopicPartitions(conn, tc.MembersFunc(v))
+ if err != nil {
+ t.Fatalf("bad err: %v", err)
+ }
+ if !reflect.DeepEqual(tc.Assignments, assignments) {
+ t.Errorf("expected %v; got %v", tc.Assignments, assignments)
+ }
+ })
+ }
}
}
@@ -419,8 +426,8 @@ func TestConsumerGroupErrors(t *testing.T) {
},
}, nil
}
- mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
- return joinGroupResponseV1{}, errors.New("join group failed")
+ mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+ return joinGroupResponse{}, errors.New("join group failed")
}
// NOTE : no stub for leaving the group b/c the member never joined.
},
@@ -449,8 +456,8 @@ func TestConsumerGroupErrors(t *testing.T) {
},
}, nil
}
- mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
- return joinGroupResponseV1{
+ mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+ return joinGroupResponse{
ErrorCode: int16(InvalidTopic),
}, nil
}
@@ -472,8 +479,8 @@ func TestConsumerGroupErrors(t *testing.T) {
{
scenario: "fails to join group (leader, unsupported protocol)",
prepare: func(mc *mockCoordinator) {
- mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
- return joinGroupResponseV1{
+ mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+ return joinGroupResponse{
GenerationID: 12345,
GroupProtocol: "foo",
LeaderID: "abc",
@@ -498,8 +505,8 @@ func TestConsumerGroupErrors(t *testing.T) {
{
scenario: "fails to sync group (general error)",
prepare: func(mc *mockCoordinator) {
- mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
- return joinGroupResponseV1{
+ mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+ return joinGroupResponse{
GenerationID: 12345,
GroupProtocol: "range",
LeaderID: "abc",
diff --git createtopics.go createtopics.go
index 8ad9ebf44..9c75d7aaa 100644
--- createtopics.go
+++ createtopics.go
@@ -10,7 +10,7 @@ import (
"github.com/segmentio/kafka-go/protocol/createtopics"
)
-// CreateTopicRequests represents a request sent to a kafka broker to create
+// CreateTopicsRequest represents a request sent to a kafka broker to create
// new topics.
type CreateTopicsRequest struct {
// Address of the kafka broker to send the request to.
@@ -27,7 +27,7 @@ type CreateTopicsRequest struct {
ValidateOnly bool
}
-// CreateTopicResponse represents a response from a kafka broker to a topic
+// CreateTopicsResponse represents a response from a kafka broker to a topic
// creation request.
type CreateTopicsResponse struct {
// The amount of time that the broker throttled the request.
@@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
}
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
-type createTopicsRequestV0 struct {
+type createTopicsRequest struct {
+ v apiVersion // v0, v1, v2
+
// Topics contains n array of single topic creation requests. Can not
// have multiple entries for the same topic.
Topics []createTopicsRequestV0Topic
@@ -270,86 +272,136 @@ type createTopicsRequestV0 struct {
// Timeout ms to wait for a topic to be completely created on the
// controller node. Values <= 0 will trigger topic creation and return immediately
Timeout int32
+
+ // If true, check that the topics can be created as specified, but don't create anything.
+ // Internal use only for Kafka 4.0 support.
+ ValidateOnly bool
}
-func (t createTopicsRequestV0) size() int32 {
- return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
+func (t createTopicsRequest) size() int32 {
+ sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
sizeofInt32(t.Timeout)
+ if t.v >= v1 {
+ sz += 1
+ }
+ return sz
}
-func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
+func (t createTopicsRequest) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
wb.writeInt32(t.Timeout)
+ if t.v >= v1 {
+ wb.writeBool(t.ValidateOnly)
+ }
}
-type createTopicsResponseV0TopicError struct {
+type createTopicsResponseTopicError struct {
+ v apiVersion
+
// Topic name
Topic string
// ErrorCode holds response error code
ErrorCode int16
+
+ // ErrorMessage holds response error message string
+ ErrorMessage string
}
-func (t createTopicsResponseV0TopicError) size() int32 {
- return sizeofString(t.Topic) +
+func (t createTopicsResponseTopicError) size() int32 {
+ sz := sizeofString(t.Topic) +
sizeofInt16(t.ErrorCode)
+ if t.v >= v1 {
+ sz += sizeofString(t.ErrorMessage)
+ }
+ return sz
}
-func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
+func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) {
wb.writeString(t.Topic)
wb.writeInt16(t.ErrorCode)
+ if t.v >= v1 {
+ wb.writeString(t.ErrorMessage)
+ }
}
-func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.Topic); err != nil {
return
}
if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
return
}
+ if t.v >= v1 {
+ if remain, err = readString(r, remain, &t.ErrorMessage); err != nil {
+ return
+ }
+ }
return
}
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
-type createTopicsResponseV0 struct {
- TopicErrors []createTopicsResponseV0TopicError
+type createTopicsResponse struct {
+ v apiVersion
+
+ ThrottleTime int32 // v2+
+ TopicErrors []createTopicsResponseTopicError
}
-func (t createTopicsResponseV0) size() int32 {
- return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
+func (t createTopicsResponse) size() int32 {
+ sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
+ if t.v >= v2 {
+ sz += sizeofInt32(t.ThrottleTime)
+ }
+ return sz
}
-func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
+func (t createTopicsResponse) writeTo(wb *writeBuffer) {
+ if t.v >= v2 {
+ wb.writeInt32(t.ThrottleTime)
+ }
wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) })
}
-func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- var topic createTopicsResponseV0TopicError
- if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil {
+ topic := createTopicsResponseTopicError{v: t.v}
+ if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil {
return
}
t.TopicErrors = append(t.TopicErrors, topic)
return
}
- if remain, err = readArrayWith(r, size, fn); err != nil {
+ remain = size
+ if t.v >= v2 {
+ if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
+ return
+ }
+ }
+ if remain, err = readArrayWith(r, remain, fn); err != nil {
return
}
return
}
-func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) {
- var response createTopicsResponseV0
+func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) {
+ version, err := c.negotiateVersion(createTopics, v0, v1, v2)
+ if err != nil {
+ return createTopicsResponse{}, err
+ }
+
+ request.v = version
+ response := createTopicsResponse{v: version}
- err := c.writeOperation(
+ err = c.writeOperation(
func(deadline time.Time, id int32) error {
if request.Timeout == 0 {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
- return c.writeRequest(createTopics, v0, id, request)
+ return c.writeRequest(createTopics, version, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -383,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
t.toCreateTopicsRequestV0Topic())
}
- _, err := c.createTopics(createTopicsRequestV0{
+ _, err := c.createTopics(createTopicsRequest{
Topics: requestV0Topics,
})
return err
diff --git createtopics_test.go createtopics_test.go
index 38819c382..119d17094 100644
--- createtopics_test.go
+++ createtopics_test.go
@@ -160,32 +160,37 @@ func TestClientCreateTopics(t *testing.T) {
}
}
-func TestCreateTopicsResponseV0(t *testing.T) {
- item := createTopicsResponseV0{
- TopicErrors: []createTopicsResponseV0TopicError{
- {
- Topic: "topic",
- ErrorCode: 2,
+func TestCreateTopicsResponse(t *testing.T) {
+ supportedVersions := []apiVersion{v0, v1, v2}
+ for _, v := range supportedVersions {
+ item := createTopicsResponse{
+ v: v,
+ TopicErrors: []createTopicsResponseTopicError{
+ {
+ v: v,
+ Topic: "topic",
+ ErrorCode: 2,
+ },
},
- },
- }
+ }
- b := bytes.NewBuffer(nil)
- w := &writeBuffer{w: b}
- item.writeTo(w)
+ b := bytes.NewBuffer(nil)
+ w := &writeBuffer{w: b}
+ item.writeTo(w)
- var found createTopicsResponseV0
- remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- if remain != 0 {
- t.Errorf("expected 0 remain, got %v", remain)
- t.FailNow()
- }
- if !reflect.DeepEqual(item, found) {
- t.Error("expected item and found to be the same")
- t.FailNow()
+ found := createTopicsResponse{v: v}
+ remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ if remain != 0 {
+ t.Errorf("expected 0 remain, got %v", remain)
+ t.FailNow()
+ }
+ if !reflect.DeepEqual(item, found) {
+ t.Error("expected item and found to be the same")
+ t.FailNow()
+ }
}
}
diff --git deletetopics.go deletetopics.go
index d758d9fd6..ff73d553b 100644
--- deletetopics.go
+++ deletetopics.go
@@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D
}
// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
-type deleteTopicsRequestV0 struct {
+type deleteTopicsRequest struct {
// Topics holds the topic names
Topics []string
@@ -77,41 +77,57 @@ type deleteTopicsRequestV0 struct {
Timeout int32
}
-func (t deleteTopicsRequestV0) size() int32 {
+func (t deleteTopicsRequest) size() int32 {
return sizeofStringArray(t.Topics) +
sizeofInt32(t.Timeout)
}
-func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
+func (t deleteTopicsRequest) writeTo(wb *writeBuffer) {
wb.writeStringArray(t.Topics)
wb.writeInt32(t.Timeout)
}
-type deleteTopicsResponseV0 struct {
+type deleteTopicsResponse struct {
+ v apiVersion // v0, v1
+
+ ThrottleTime int32
// TopicErrorCodes holds per topic error codes
TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
}
-func (t deleteTopicsResponseV0) size() int32 {
- return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
+func (t deleteTopicsResponse) size() int32 {
+ sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
+ if t.v >= v1 {
+ sz += sizeofInt32(t.ThrottleTime)
+ }
+ return sz
}
-func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
var item deleteTopicsResponseV0TopicErrorCode
- if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
+ if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
return
}
t.TopicErrorCodes = append(t.TopicErrorCodes, item)
return
}
- if remain, err = readArrayWith(r, size, fn); err != nil {
+ remain = size
+ if t.v >= v1 {
+ if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
+ return
+ }
+ }
+ if remain, err = readArrayWith(r, remain, fn); err != nil {
return
}
return
}
-func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
+func (t deleteTopicsResponse) writeTo(wb *writeBuffer) {
+ if t.v >= v1 {
+ wb.writeInt32(t.ThrottleTime)
+ }
wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
}
@@ -146,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
// deleteTopics deletes the specified topics.
//
// See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
-func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
- var response deleteTopicsResponseV0
- err := c.writeOperation(
+func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) {
+ version, err := c.negotiateVersion(deleteTopics, v0, v1)
+ if err != nil {
+ return deleteTopicsResponse{}, err
+ }
+
+ response := deleteTopicsResponse{
+ v: version,
+ }
+
+ err = c.writeOperation(
func(deadline time.Time, id int32) error {
if request.Timeout == 0 {
now := time.Now()
deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
}
- return c.writeRequest(deleteTopics, v0, id, request)
+ return c.writeRequest(deleteTopics, version, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
@@ -164,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse
},
)
if err != nil {
- return deleteTopicsResponseV0{}, err
+ return deleteTopicsResponse{}, err
}
for _, c := range response.TopicErrorCodes {
if c.ErrorCode != 0 {
diff --git deletetopics_test.go deletetopics_test.go
index 3caffe840..4dc681831 100644
--- deletetopics_test.go
+++ deletetopics_test.go
@@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) {
}
func TestDeleteTopicsResponseV1(t *testing.T) {
- item := deleteTopicsResponseV0{
+ item := deleteTopicsResponse{
TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{
{
Topic: "a",
@@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) {
w := &writeBuffer{w: b}
item.writeTo(w)
- var found deleteTopicsResponseV0
+ var found deleteTopicsResponse
remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
if err != nil {
t.Fatal(err)
diff --git describeclientquotas.go describeclientquotas.go
index 6291dcd98..bfe712f28 100644
--- describeclientquotas.go
+++ describeclientquotas.go
@@ -35,7 +35,7 @@ type DescribeClientQuotasRequestComponent struct {
Match string
}
-// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request.
+// DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request.
type DescribeClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
diff --git docker-compose-241.yml docker-compose-241.yml
deleted file mode 100644
index 6feb1844b..000000000
--- docker-compose-241.yml
+++ /dev/null
@@ -1,32 +0,0 @@
-version: "3"
-services:
- kafka:
- image: wurstmeister/kafka:2.12-2.4.1
- restart: on-failure:3
- links:
- - zookeeper
- ports:
- - 9092:9092
- - 9093:9093
- environment:
- KAFKA_VERSION: '2.4.1'
- KAFKA_BROKER_ID: '1'
- KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
- KAFKA_DELETE_TOPIC_ENABLE: 'true'
- KAFKA_ADVERTISED_HOST_NAME: 'localhost'
- KAFKA_ADVERTISED_PORT: '9092'
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
- KAFKA_MESSAGE_MAX_BYTES: '200000000'
- KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
- KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
- KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
- KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
- CUSTOM_INIT_SCRIPT: |-
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
- /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-
- zookeeper:
- image: wurstmeister/zookeeper
- ports:
- - 2181:2181
diff --git docker-compose.010.yml docker-compose.010.yml
deleted file mode 100644
index 56123f85c..000000000
--- docker-compose.010.yml
+++ /dev/null
@@ -1,29 +0,0 @@
-version: "3"
-services:
- kafka:
- image: wurstmeister/kafka:0.10.1.1
- links:
- - zookeeper
- ports:
- - 9092:9092
- - 9093:9093
- environment:
- KAFKA_BROKER_ID: '1'
- KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
- KAFKA_DELETE_TOPIC_ENABLE: 'true'
- KAFKA_ADVERTISED_HOST_NAME: 'localhost'
- KAFKA_ADVERTISED_PORT: '9092'
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
- KAFKA_MESSAGE_MAX_BYTES: '200000000'
- KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
- KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
- KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
- KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
- CUSTOM_INIT_SCRIPT: |-
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
-
- zookeeper:
- image: wurstmeister/zookeeper
- ports:
- - 2181:2181
diff --git docker-compose.yml docker-compose.yml
index dc0c2e85e..dffb0e448 100644
--- docker-compose.yml
+++ docker-compose.yml
@@ -1,34 +1,39 @@
-version: "3"
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
services:
+ zookeeper:
+ container_name: zookeeper
+ hostname: zookeeper
+ image: bitnami/zookeeper:latest
+ ports:
+ - 2181:2181
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
kafka:
- image: wurstmeister/kafka:2.12-2.3.1
+ container_name: kafka
+ image: bitnami/kafka:3.7.0
restart: on-failure:3
links:
- - zookeeper
+ - zookeeper
ports:
- - 9092:9092
- - 9093:9093
+ - 9092:9092
+ - 9093:9093
environment:
- KAFKA_VERSION: '2.3.1'
- KAFKA_BROKER_ID: '1'
- KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
- KAFKA_DELETE_TOPIC_ENABLE: 'true'
- KAFKA_ADVERTISED_HOST_NAME: 'localhost'
- KAFKA_ADVERTISED_PORT: '9092'
- KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
- KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
- KAFKA_MESSAGE_MAX_BYTES: '200000000'
- KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
- KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
- KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
- KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
- KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
- KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
- CUSTOM_INIT_SCRIPT: |-
- echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
- /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-
- zookeeper:
- image: wurstmeister/zookeeper
- ports:
- - 2181:2181
+ KAFKA_CFG_BROKER_ID: 1
+ KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+ KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+ KAFKA_CFG_ADVERTISED_PORT: '9092'
+ KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+ KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+ KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+ KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
+ KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+ ALLOW_PLAINTEXT_LISTENER: yes
+ entrypoint:
+ - "/bin/bash"
+ - "-c"
+ - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
diff --git a/docker_compose_versions/README.md b/docker_compose_versions/README.md
new file mode 100644
index 000000000..9d3b1639a
--- /dev/null
+++ docker_compose_versions/README.md
@@ -0,0 +1,152 @@
+# Bitnami Kafka
+
+This document outlines how to create a docker-compose file for a specific Bitnami Kafka version.
+
+
+## Steps to create docker-compose
+
+- Refer to [docker-hub Bitnami Kafka tags](https://hub.docker.com/r/bitnami/kafka/tags) and sort by NEWEST to locate the image preferred, for example: `2.7.0`
+- There is documentation in the (main branch)[https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md] for environment config setup information. Refer to the `Notable Changes` section.
+- Sometimes there is a need to understand how the set up is being done. To locate the appropriate Kafka release in the repo [bitnami/containers](https://github.com/bitnami/containers), go through the [kafka commit history](https://github.com/bitnami/containers/commits/main/bitnami/kafka).
+- Once a commit is located, Refer to README.md, Dockerfile, entrypoint and various init scripts to understand the environment variables to config server.properties mapping conventions. Alternatively, you can spin up the required Kafka image and refer the mapping inside the container.
+- Ensure you follow the environment variable conventions in your docker-compose. Without proper environment variables, the Kafka cluster cannot start or can start with undesired configs. For example, Since Kafka version 2.3, all server.properties docker-compose environment configs start with `KAFKA_CFG_<config_with_underscore>`
+- Older versions of Bitnami Kafka have different conventions and limited docker-compose environment variables exposed for configs needed in server.properties
+
+
+In kafka-go, for all the test cases to succeed, Kafka cluster should have following server.properties along with a relevant kafka_jaas.conf mentioned in the KAFKA_OPTS. Goal is to ensure that the docker-compose file generates below server.properties.
+
+
+server.properties
+```
+advertised.host.name=localhost
+advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
+advertised.port=9092
+auto.create.topics.enable=true
+broker.id=1
+delete.topic.enable=true
+group.initial.rebalance.delay.ms=0
+listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
+log.dirs=/kafka/kafka-logs-1d5951569d78
+log.retention.check.interval.ms=300000
+log.retention.hours=168
+log.segment.bytes=1073741824
+message.max.bytes=200000000
+num.io.threads=8
+num.network.threads=3
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+offsets.topic.replication.factor=1
+port=9092
+sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+socket.send.buffer.bytes=102400
+transaction.state.log.min.isr=1
+transaction.state.log.replication.factor=1
+zookeeper.connect=zookeeper:2181
+zookeeper.connection.timeout.ms=6000
+```
+
+
+## run docker-compose and test cases
+
+run docker-compose
+```
+# docker-compose -f ./docker_compose_versions/docker-compose-<kafka_version>.yml up -d
+```
+
+
+run test cases
+```
+# go clean -cache; KAFKA_SKIP_NETTEST=1 KAFKA_VERSION=<a.b.c> go test -race -cover ./...;
+```
+
+
+## Various Bitnami Kafka version issues observed in circleci
+
+
+### Kafka v101, v111, v201, v211 and v221
+
+
+In kafka-go repo, all the tests require sasl.enabled.mechanisms as PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 for the Kafka cluster.
+
+
+It has been observed for Kafka v101, v111, v201, v211 and v221 which are used in the circleci for build have issues with SCRAM.
+
+
+There is no way to override the config sasl.enabled.mechanisms causing Kafka cluster to start up as PLAIN.
+
+
+There has been some attempts made to override sasl.enabled.mechanisms
+- Modified entrypoint in docker-compose to append the server.properties with relevant configs sasl.enabled.mechanisms before running entrypoint.sh. This resulted in failures for Kafka v101, v111, v201, v211 and v221. Once Kafka server starts, server.properties gets appended with default value of sasl.enabled.mechanisms there by cluster to start with out PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
+- Mounted a docker-compose volume for server.propeties. However, This also resulted in failures for Kafka v101, v111, v201, v211 and v221. Once Kafka server starts, server.properties gets appended with default value of sasl.enabled.mechanisms there by cluster to start with out PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
+
+
+NOTE:
+- Kafka v101, v111, v201, v211 and v221 have no docker-compose files since we need SCRAM for kafka-go test cases to succeed.
+- There is no Bitnami Kafka image for v222 hence testing has been performed on v221
+
+
+### Kafka v231
+
+In Bitnami Kafka v2.3, all server.properties docker-compose environment configs start with `KAFKA_CFG_<config_with_underscore>`. However, it is not picking the custom populated kafka_jaas.conf.
+
+
+After a lot of debugging, it has been noticed that there aren't enough privileges to create the kafka_jaas.conf. Hence the environment variables below need to be added in docker-compose to generate the kafka_jaas.conf. This issue is not noticed after kafka v2.3
+
+
+```
+KAFKA_INTER_BROKER_USER: adminplain
+KAFKA_INTER_BROKER_PASSWORD: admin-secret
+KAFKA_BROKER_USER: adminplain
+KAFKA_BROKER_PASSWORD: admin-secret
+```
+
+There is a docker-compose file `docker-compose-231.yml` in the folder `kafka-go/docker_compose_versions` for reference.
+
+
+## References
+
+
+For user reference, please find the some of the older kafka versions commits from the [kafka commit history](https://github.com/bitnami/containers/commits/main/bitnami/kafka). For Kafka versions with no commit history, data is populated with the latest version available for the tag.
+
+
+### Kafka v010: docker-compose reference: `kafka-go/docker_compose_versions/docker-compose-010.yml`
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=0.10.2.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/c4240f0525916a418245c7ef46d9534a7a212c92/bitnami/kafka)
+
+
+### Kafka v011: docker-compose reference: `kafka-go/docker_compose_versions/docker-compose-011.yml`
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=0.11.0)
+- [kafka commit](https://github.com/bitnami/containers/tree/7724adf655e4ca9aac69d606d41ad329ef31eeca/bitnami/kafka)
+
+
+### Kafka v101: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=1.0.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/44cc8f4c43ead6edebd3758c8df878f4f9da82c2/bitnami/kafka)
+
+
+### Kafka v111: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=1.1.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/cb593dc98c2eb7a39f2792641e741d395dbe50e7/bitnami/kafka)
+
+
+### Kafka v201: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.0.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/9ff8763df265c87c8b59f8d7ff0cf69299d636c9/bitnami/kafka)
+
+
+### Kafka v211: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.1.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/d3a9d40afc2b7e7de53486538a63084c1a565d43/bitnami/kafka)
+
+
+### Kafka v221: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.2.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/f132ef830d1ba9b78392ec4619174b4640c276c9/bitnami/kafka)
+
+
+### Kafka v231: docker-compose reference: `kafka-go/docker_compose_versions/docker-compose-231.yml`
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.3.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/ae572036b5281456b0086345fec0bdb74f7cf3a3/bitnami/kafka)
+
diff --git a/docker_compose_versions/docker-compose-010.yml b/docker_compose_versions/docker-compose-010.yml
new file mode 100644
index 000000000..35df0b5c6
--- /dev/null
+++ docker_compose_versions/docker-compose-010.yml
@@ -0,0 +1,39 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+ zookeeper:
+ container_name: zookeeper
+ hostname: zookeeper
+ image: bitnami/zookeeper:latest
+ ports:
+ - 2181:2181
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ kafka:
+ container_name: kafka
+ image: bitnami/kafka:0.10.2.1
+ restart: on-,failure:3
+ links:
+ - zookeeper
+ ports:
+ - 9092:9092
+ - 9093:9093
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_DELETE_TOPIC_ENABLE: 'true'
+ KAFKA_ADVERTISED_HOST_NAME: 'localhost'
+ KAFKA_ADVERTISED_PORT: '9092'
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_MESSAGE_MAX_BYTES: '200000000'
+ KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+ KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+ KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+ KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf"
+ ALLOW_PLAINTEXT_LISTENER: yes
+ entrypoint:
+ - "/bin/bash"
+ - "-c"
+ - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_server_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram; exec /app-entrypoint.sh /start-kafka.sh
diff --git a/docker_compose_versions/docker-compose-270.yml b/docker_compose_versions/docker-compose-270.yml
new file mode 100644
index 000000000..de48cb290
--- /dev/null
+++ docker_compose_versions/docker-compose-270.yml
@@ -0,0 +1,39 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+ zookeeper:
+ container_name: zookeeper
+ hostname: zookeeper
+ image: bitnami/zookeeper:latest
+ ports:
+ - 2181:2181
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ kafka:
+ container_name: kafka
+ image: bitnami/kafka:2.7.0
+ restart: on-failure:3
+ links:
+ - zookeeper
+ ports:
+ - 9092:9092
+ - 9093:9093
+ environment:
+ KAFKA_CFG_BROKER_ID: 1
+ KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+ KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+ KAFKA_CFG_ADVERTISED_PORT: '9092'
+ KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+ KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+ KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+ KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+ KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+ ALLOW_PLAINTEXT_LISTENER: yes
+ entrypoint:
+ - "/bin/bash"
+ - "-c"
+ - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
diff --git a/docker_compose_versions/docker-compose-370.yml b/docker_compose_versions/docker-compose-370.yml
new file mode 100644
index 000000000..dffb0e448
--- /dev/null
+++ docker_compose_versions/docker-compose-370.yml
@@ -0,0 +1,39 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+ zookeeper:
+ container_name: zookeeper
+ hostname: zookeeper
+ image: bitnami/zookeeper:latest
+ ports:
+ - 2181:2181
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
+ kafka:
+ container_name: kafka
+ image: bitnami/kafka:3.7.0
+ restart: on-failure:3
+ links:
+ - zookeeper
+ ports:
+ - 9092:9092
+ - 9093:9093
+ environment:
+ KAFKA_CFG_BROKER_ID: 1
+ KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+ KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+ KAFKA_CFG_ADVERTISED_PORT: '9092'
+ KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+ KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+ KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+ KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
+ KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+ ALLOW_PLAINTEXT_LISTENER: yes
+ entrypoint:
+ - "/bin/bash"
+ - "-c"
+ - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
diff --git a/docker_compose_versions/docker-compose-400.yml b/docker_compose_versions/docker-compose-400.yml
new file mode 100644
index 000000000..b563d5e39
--- /dev/null
+++ docker_compose_versions/docker-compose-400.yml
@@ -0,0 +1,40 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+ kafka:
+ container_name: kafka
+ image: bitnami/kafka:4.0.0
+ restart: on-failure:3
+ ports:
+ - 9092:9092
+ - 9093:9093
+ environment:
+ KAFKA_CFG_NODE_ID: 1
+ KAFKA_CFG_BROKER_ID: 1
+ KAFKA_CFG_PROCESS_ROLES: broker,controller
+ KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+ KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+ KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT
+ KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093
+ KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093
+ KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN
+ KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+ KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
+ ALLOW_PLAINTEXT_LISTENER: yes
+ KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+ KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+ KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+ KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
+ KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
+ KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
+ KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
+ KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
+ KAFKA_INTER_BROKER_USER: adminscram512
+ KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
+ KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+ # Note you will need to increase this to at least 4GB of memory for the tests to pass
+ # https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900
+ KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m'
+ KAFKA_JVM_OPTS: '-XX:+UseG1GC'
\ No newline at end of file
diff --git error.go error.go
index 4a7a8a278..300a1412f 100644
--- error.go
+++ error.go
@@ -329,6 +329,8 @@ func (e Error) Title() string {
return "Unsupported Compression Type"
case MemberIDRequired:
return "Member ID Required"
+ case FencedInstanceID:
+ return "Fenced Instance ID"
case EligibleLeadersNotAvailable:
return "Eligible Leader Not Available"
case ElectionNotNeeded:
@@ -538,6 +540,8 @@ func (e Error) Description() string {
return "the requesting client does not support the compression type of given partition"
case MemberIDRequired:
return "the group member needs to have a valid member id before actually entering a consumer group"
+ case FencedInstanceID:
+ return "the broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id"
case EligibleLeadersNotAvailable:
return "eligible topic partition leaders are not available"
case ElectionNotNeeded:
@@ -636,6 +640,7 @@ func coalesceErrors(errs ...error) error {
return nil
}
+// MessageTooLargeError is returned when a message is too large to fit within the allowed size.
type MessageTooLargeError struct {
Message Message
Remaining []Message
@@ -655,6 +660,10 @@ func (e MessageTooLargeError) Error() string {
return MessageSizeTooLarge.Error()
}
+func (e MessageTooLargeError) Unwrap() error {
+ return MessageSizeTooLarge
+}
+
func makeError(code int16, message string) error {
if code == 0 {
return nil
diff --git error_test.go error_test.go
index 3d461154c..8ae158661 100644
--- error_test.go
+++ error_test.go
@@ -3,6 +3,8 @@ package kafka
import (
"fmt"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
func TestError(t *testing.T) {
@@ -110,4 +112,16 @@ func TestError(t *testing.T) {
t.Error("non-empty description:", s)
}
})
+
+ t.Run("MessageTooLargeError error.Is satisfaction", func(t *testing.T) {
+ err := MessageSizeTooLarge
+ msg := []Message{
+ {Key: []byte("key"), Value: []byte("value")},
+ {Key: []byte("key"), Value: make([]byte, 8)},
+ }
+ msgTooLarge := messageTooLarge(msg, 1)
+ assert.NotErrorIs(t, err, msgTooLarge)
+ assert.Contains(t, msgTooLarge.Error(), MessageSizeTooLarge.Error())
+ assert.ErrorIs(t, msgTooLarge, MessageSizeTooLarge)
+ })
}
diff --git examples/docker-compose.yaml examples/docker-compose.yaml
index 01b60fdbf..d06e64688 100644
--- examples/docker-compose.yaml
+++ examples/docker-compose.yaml
@@ -3,18 +3,27 @@ services:
zookeeper:
hostname: zookeeper
- image: wurstmeister/zookeeper:3.4.6
+ image: bitnami/zookeeper:latest
+ restart: always
expose:
- "2181"
ports:
- "2181:2181"
+ environment:
+ ALLOW_ANONYMOUS_LOGIN: yes
kafka:
- image: wurstmeister/kafka
+ hostname: kafka
+ image: bitnami/kafka:2.7.0
+ restart: always
env_file:
- kafka/kafka-variables.env
depends_on:
- zookeeper
+ expose:
+ - "9092"
+ - "8082"
+ - "8083"
ports:
- '9092:9092'
- '8082:8082'
@@ -22,6 +31,7 @@ services:
mongo-db:
image: mongo:4.0
+ restart: always
expose:
- "27017"
ports:
@@ -39,10 +49,11 @@ services:
collectionName: example_coll
kafkaURL: kafka:9092
topic: topic1
- GroupID: mongo-group
+ groupID: mongo-group
depends_on:
- kafka
- mongo-db
+ restart: always
consumer-logger:
build:
@@ -50,9 +61,10 @@ services:
environment:
kafkaURL: kafka:9092
topic: topic1
- GroupID: logger-group
+ groupID: logger-group
depends_on:
- kafka
+ restart: always
producer-random:
build:
@@ -62,6 +74,7 @@ services:
topic: topic1
depends_on:
- kafka
+ restart: always
producer-api:
build:
@@ -74,4 +87,5 @@ services:
ports:
- "8080:8080"
depends_on:
- - kafka
\ No newline at end of file
+ - kafka
+ restart: always
diff --git examples/kafka/kafka-variables.env examples/kafka/kafka-variables.env
index dc19833ac..9d6ce8668 100644
--- examples/kafka/kafka-variables.env
+++ examples/kafka/kafka-variables.env
@@ -1,22 +1,23 @@
-KAFKA_ADVERTISED_HOST_NAME=kafka
-KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
-KAFKA_CONNECT_BOOTSTRAP_SERVERS=localhost:9092
+KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
+KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+KAFKA_CFG_CONNECT_BOOTSTRAP_SERVERS=localhost:9092
-KAFKA_CONNECT_REST_PORT=8082
-KAFKA_CONNECT_REST_ADVERTISED_HOST_NAME="localhost"
+KAFKA_CFG_CONNECT_REST_PORT=8082
+KAFKA_CFG_CONNECT_REST_ADVERTISED_HOST_NAME="localhost"
-KAFKA_CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=0
-KAFKA_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=0
-KAFKA_CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE=0
-KAFKA_CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=0
-KAFKA_CONNECT_OFFSET_STORAGE_FILE_FILENAME="/tmp/connect.offsets"
+KAFKA_CFG_CONNECT_OFFSET_STORAGE_FILE_FILENAME="/tmp/connect.offsets"
# Flush much faster than normal, which is useful for testing/debugging
-KAFKA_CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
+KAFKA_CFG_CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
+ALLOW_PLAINTEXT_LISTENER: yes
diff --git initproducerid_test.go initproducerid_test.go
index 061819e58..b5110a00f 100644
--- initproducerid_test.go
+++ initproducerid_test.go
@@ -15,6 +15,13 @@ func TestClientInitProducerId(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.11.0") {
return
}
+
+ // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+ // work is revisited.
+ if ktesting.KafkaIsAtLeast("3.0.0") {
+ t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+ }
+
client, shutdown := newLocalClient()
defer shutdown()
diff --git joingroup.go joingroup.go
index 30823a69a..f3d90a937 100644
--- joingroup.go
+++ joingroup.go
@@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
wb.writeBytes(t.ProtocolMetadata)
}
-type joinGroupRequestV1 struct {
+type joinGroupRequest struct {
// GroupID holds the unique group identifier
GroupID string
@@ -264,7 +264,7 @@ type joinGroupRequestV1 struct {
GroupProtocols []joinGroupRequestGroupProtocolV1
}
-func (t joinGroupRequestV1) size() int32 {
+func (t joinGroupRequest) size() int32 {
return sizeofString(t.GroupID) +
sizeofInt32(t.SessionTimeout) +
sizeofInt32(t.RebalanceTimeout) +
@@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 {
sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
}
-func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
+func (t joinGroupRequest) writeTo(wb *writeBuffer) {
wb.writeString(t.GroupID)
wb.writeInt32(t.SessionTimeout)
wb.writeInt32(t.RebalanceTimeout)
@@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
}
-type joinGroupResponseMemberV1 struct {
+type joinGroupResponseMember struct {
// MemberID assigned by the group coordinator
MemberID string
MemberMetadata []byte
}
-func (t joinGroupResponseMemberV1) size() int32 {
+func (t joinGroupResponseMember) size() int32 {
return sizeofString(t.MemberID) +
sizeofBytes(t.MemberMetadata)
}
-func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
+func (t joinGroupResponseMember) writeTo(wb *writeBuffer) {
wb.writeString(t.MemberID)
wb.writeBytes(t.MemberMetadata)
}
-func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readString(r, size, &t.MemberID); err != nil {
return
}
@@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain
return
}
-type joinGroupResponseV1 struct {
+type joinGroupResponse struct {
+ v apiVersion // v1, v2
+
+ ThrottleTime int32
+
// ErrorCode holds response error code
ErrorCode int16
@@ -323,19 +327,26 @@ type joinGroupResponseV1 struct {
// MemberID assigned by the group coordinator
MemberID string
- Members []joinGroupResponseMemberV1
+ Members []joinGroupResponseMember
}
-func (t joinGroupResponseV1) size() int32 {
- return sizeofInt16(t.ErrorCode) +
+func (t joinGroupResponse) size() int32 {
+ sz := sizeofInt16(t.ErrorCode) +
sizeofInt32(t.GenerationID) +
sizeofString(t.GroupProtocol) +
sizeofString(t.LeaderID) +
sizeofString(t.MemberID) +
sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
+ if t.v >= v2 {
+ sz += sizeofInt32(t.ThrottleTime)
+ }
+ return sz
}
-func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
+func (t joinGroupResponse) writeTo(wb *writeBuffer) {
+ if t.v >= v2 {
+ wb.writeInt32(t.ThrottleTime)
+ }
wb.writeInt16(t.ErrorCode)
wb.writeInt32(t.GenerationID)
wb.writeString(t.GroupProtocol)
@@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
}
-func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
+func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+ remain = size
+ if t.v >= v2 {
+ if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil {
+ return
+ }
+ }
+ if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
return
}
if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
@@ -362,7 +379,7 @@ func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, e
}
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- var item joinGroupResponseMemberV1
+ var item joinGroupResponseMember
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
return
}
diff --git joingroup_test.go joingroup_test.go
index 926f5b4a6..73922d6a0 100644
--- joingroup_test.go
+++ joingroup_test.go
@@ -217,37 +217,41 @@ func TestMemberMetadata(t *testing.T) {
}
}
-func TestJoinGroupResponseV1(t *testing.T) {
- item := joinGroupResponseV1{
- ErrorCode: 2,
- GenerationID: 3,
- GroupProtocol: "a",
- LeaderID: "b",
- MemberID: "c",
- Members: []joinGroupResponseMemberV1{
- {
- MemberID: "d",
- MemberMetadata: []byte("blah"),
+func TestJoinGroupResponse(t *testing.T) {
+ supportedVersions := []apiVersion{v1, v2}
+ for _, v := range supportedVersions {
+ item := joinGroupResponse{
+ v: v,
+ ErrorCode: 2,
+ GenerationID: 3,
+ GroupProtocol: "a",
+ LeaderID: "b",
+ MemberID: "c",
+ Members: []joinGroupResponseMember{
+ {
+ MemberID: "d",
+ MemberMetadata: []byte("blah"),
+ },
},
- },
- }
+ }
- b := bytes.NewBuffer(nil)
- w := &writeBuffer{w: b}
- item.writeTo(w)
+ b := bytes.NewBuffer(nil)
+ w := &writeBuffer{w: b}
+ item.writeTo(w)
- var found joinGroupResponseV1
- remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- if remain != 0 {
- t.Errorf("expected 0 remain, got %v", remain)
- t.FailNow()
- }
- if !reflect.DeepEqual(item, found) {
- t.Error("expected item and found to be the same")
- t.FailNow()
+ found := joinGroupResponse{v: v}
+ remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ if remain != 0 {
+ t.Errorf("expected 0 remain, got %v", remain)
+ t.FailNow()
+ }
+ if !reflect.DeepEqual(item, found) {
+ t.Error("expected item and found to be the same")
+ t.FailNow()
+ }
}
}
diff --git listgroups.go listgroups.go
index 229de9352..5034b5440 100644
--- listgroups.go
+++ listgroups.go
@@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int,
fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
var item listGroupsResponseGroupV1
- if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
+ if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
return
}
t.Groups = append(t.Groups, item)
diff --git listoffset.go listoffset.go
index 11c5d04b4..97779cecf 100644
--- listoffset.go
+++ listoffset.go
@@ -17,7 +17,7 @@ type OffsetRequest struct {
}
// FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
-// the parition given as argument.
+// the partition given as argument.
func FirstOffsetOf(partition int) OffsetRequest {
return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
}
diff --git metadata.go metadata.go
index 429a6a260..d151071b3 100644
--- metadata.go
+++ metadata.go
@@ -19,7 +19,7 @@ type MetadataRequest struct {
Topics []string
}
-// MetadatResponse represents a response from a kafka broker to a metadata
+// MetadataResponse represents a response from a kafka broker to a metadata
// request.
type MetadataResponse struct {
// The amount of time that the broker throttled the request.
diff --git offsetfetch.go offsetfetch.go
index b85bc5c83..ce80213f8 100644
--- offsetfetch.go
+++ offsetfetch.go
@@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
item := offsetFetchResponseV1PartitionResponse{}
- if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
+ if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
return
}
t.PartitionResponses = append(t.PartitionResponses, item)
diff --git protocol/record.go protocol/record.go
index e11af4dcc..c7987c390 100644
--- protocol/record.go
+++ protocol/record.go
@@ -191,7 +191,7 @@ func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) {
// Reconstruct the prefix that we had to read to determine the version
// of the record set from the magic byte.
//
- // Technically this may recurisvely stack readers when consuming all
+ // Technically this may recursively stack readers when consuming all
// items of the batch, which could hurt performance. In practice this
// path should not be taken tho, since the decoder would read from a
// *bufio.Reader which implements the bufferedReader interface.
@@ -304,7 +304,7 @@ type RawRecordSet struct {
// then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet.
//
// Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a
-// performance standpoint as it require an extra copy of the record bytes. Holding off
+// performance standpoint as it requires an extra copy of the record bytes. Holding off
// on optimizing, as this code path is only invoked in tests.
func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) {
rs := &RecordSet{}
diff --git reader.go reader.go
index cfc7cb8f5..04d90f355 100644
--- reader.go
+++ reader.go
@@ -469,9 +469,11 @@ type ReaderConfig struct {
JoinGroupBackoff time.Duration
// RetentionTime optionally sets the length of time the consumer group will be saved
- // by the broker
+ // by the broker. -1 will disable the setting and leave the
+ // retention up to the broker's offsets.retention.minutes property. By
+ // default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0.
//
- // Default: 24h
+ // Default: -1
//
// Only used when GroupID is set
RetentionTime time.Duration
diff --git reader_test.go reader_test.go
index f413d7429..63d81816e 100644
--- reader_test.go
+++ reader_test.go
@@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
conn.SetDeadline(time.Now().Add(10 * time.Second))
- _, err = conn.createTopics(createTopicsRequestV0{
+ _, err = conn.createTopics(createTopicsRequest{
Topics: []createTopicsRequestV0Topic{
{
Topic: topic,
@@ -309,7 +309,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
ReplicationFactor: 1,
},
},
- Timeout: milliseconds(time.Second),
+ Timeout: milliseconds(5 * time.Second),
})
if err != nil {
if !errors.Is(err, TopicAlreadyExists) {
@@ -364,8 +364,8 @@ func waitForTopic(ctx context.Context, t *testing.T, topic string) {
}
}
- t.Logf("retrying after 1s")
- time.Sleep(time.Second)
+ t.Logf("retrying after 100ms")
+ time.Sleep(100 * time.Millisecond)
continue
}
}
@@ -1374,7 +1374,9 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
// than partitions in a group.
// https://github.com/segmentio/kafka-go/issues/200
func TestRebalanceTooManyConsumers(t *testing.T) {
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
conf := ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: makeGroupID(),
@@ -1384,8 +1386,15 @@ func TestRebalanceTooManyConsumers(t *testing.T) {
// Create the first reader and wait for it to become the leader.
r1 := NewReader(conf)
+
+ // Give the reader some time to setup before reading a message
+ time.Sleep(1 * time.Second)
prepareReader(t, ctx, r1, makeTestSequence(1)...)
- r1.ReadMessage(ctx)
+
+ _, err := r1.ReadMessage(ctx)
+ if err != nil {
+ t.Fatalf("failed to read message: %v", err)
+ }
// Clear the stats from the first rebalance.
r1.Stats()
@@ -1559,17 +1568,22 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
}
}
-func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
client, shutdown := newLocalClient()
defer shutdown()
-
+ t1 := makeTopic()
+ createTopic(t, t1, 1)
+ defer deleteTopic(t, t1)
+ t2 := makeTopic()
+ createTopic(t, t2, 1)
+ defer deleteTopic(t, t2)
conf := ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: makeGroupID(),
- GroupTopics: []string{makeTopic(), makeTopic()},
+ GroupTopics: []string{t1, t2},
MaxWait: time.Second,
PartitionWatchInterval: 100 * time.Millisecond,
WatchPartitionChanges: true,
diff --git record.go record.go
index 1750889ac..8f8f7bd92 100644
--- record.go
+++ record.go
@@ -35,7 +35,7 @@ type Record = protocol.Record
// RecordReader values are not safe to use concurrently from multiple goroutines.
type RecordReader = protocol.RecordReader
-// NewRecordReade reconstructs a RecordSet which exposes the sequence of records
+// NewRecordReader reconstructs a RecordSet which exposes the sequence of records
// passed as arguments.
func NewRecordReader(records ...Record) RecordReader {
return protocol.NewRecordReader(records...)
diff --git sasl/sasl_test.go sasl/sasl_test.go
index a4101391a..57ff8b7cf 100644
--- sasl/sasl_test.go
+++ sasl/sasl_test.go
@@ -18,6 +18,11 @@ const (
)
func TestSASL(t *testing.T) {
+ scramUsers := map[scram.Algorithm]string{scram.SHA256: "adminscram", scram.SHA512: "adminscram"}
+ // kafka 4.0.0 test environment supports only different users for different scram algorithms.
+ if ktesting.KafkaIsAtLeast("4.0.0") {
+ scramUsers = map[scram.Algorithm]string{scram.SHA256: "adminscram256", scram.SHA512: "adminscram512"}
+ }
tests := []struct {
valid func() sasl.Mechanism
invalid func() sasl.Mechanism
@@ -39,22 +44,22 @@ func TestSASL(t *testing.T) {
},
{
valid: func() sasl.Mechanism {
- mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256")
+ mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "admin-secret-256")
return mech
},
invalid: func() sasl.Mechanism {
- mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword")
+ mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "badpassword")
return mech
},
minKafka: "0.10.2.0",
},
{
valid: func() sasl.Mechanism {
- mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512")
+ mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "admin-secret-512")
return mech
},
invalid: func() sasl.Mechanism {
- mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword")
+ mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "badpassword")
return mech
},
minKafka: "0.10.2.0",
diff --git a/scripts/wait-for-kafka.sh b/scripts/wait-for-kafka.sh
new file mode 100755
index 000000000..5cd336556
--- /dev/null
+++ scripts/wait-for-kafka.sh
@@ -0,0 +1,20 @@
+#/bin/bash
+
+COUNTER=0;
+echo foo | nc localhost 9092
+STATUS=$?
+ATTEMPTS=60
+until [ ${STATUS} -eq 0 ] || [ "$COUNTER" -ge "${ATTEMPTS}" ];
+do
+ let COUNTER=$COUNTER+1;
+ sleep 1;
+ echo "[$COUNTER] waiting for 9092 port to be open";
+ echo foo | nc localhost 9092
+ STATUS=$?
+done
+
+if [ "${COUNTER}" -gt "${ATTEMPTS}" ];
+then
+ echo "Kafka is not running, failing"
+ exit 1
+fi
\ No newline at end of file
diff --git txnoffsetcommit_test.go txnoffsetcommit_test.go
index eb3d33cbd..409584dbc 100644
--- txnoffsetcommit_test.go
+++ txnoffsetcommit_test.go
@@ -16,11 +16,19 @@ func TestClientTxnOffsetCommit(t *testing.T) {
t.Skip("Skipping test because kafka version is not high enough.")
}
+ // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+ // work is revisited.
+ if ktesting.KafkaIsAtLeast("3.0.0") {
+ t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+ }
+
transactionalID := makeTransactionalID()
topic := makeTopic()
client, shutdown := newLocalClientWithTopic(topic, 1)
defer shutdown()
+ waitForTopic(context.TODO(), t, topic)
+ defer deleteTopic(t, topic)
now := time.Now()
diff --git writer.go writer.go
index 3c7af907a..3817bf538 100644
--- writer.go
+++ writer.go
@@ -635,7 +635,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
}
}
- // We use int32 here to half the memory footprint (compared to using int
+ // We use int32 here to halve the memory footprint (compared to using int
// on 64 bits architectures). We map lists of the message indexes instead
// of the message values for the same reason, int32 is 4 bytes, vs a full
// Message value which is 100+ bytes and contains pointers and contributes
diff --git writer_test.go writer_test.go
index 6f894ecd3..bd64b668b 100644
--- writer_test.go
+++ writer_test.go
@@ -856,7 +856,7 @@ func testWriterAutoCreateTopic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = w.WriteMessages(ctx, msg)
- if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+ if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, UnknownTopicOrPartition) {
time.Sleep(time.Millisecond * 250)
continue
}
@@ -924,7 +924,7 @@ func testWriterSasl(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = w.WriteMessages(ctx, msg)
- if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+ if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, UnknownTopicOrPartition) {
time.Sleep(time.Millisecond * 250)
continue
}
DescriptionThis PR updates the CI/CD setup to use bitnami/kafka Docker images instead of wurstmeister/kafka. The PR reduces the number of Kafka versions tested from 10 to 4, focusing on key versions (0.10, 2.7, 2.8, and 3.7). It also updates various test cases to accommodate differences in the new Kafka image configurations and adds compatibility for newer Kafka versions. Possible Issues
ChangesChanges.circleci/config.yml
Docker configuration
Code changes
Test improvements
sequenceDiagram
participant CI as CircleCI
participant Zookeeper as bitnami/zookeeper
participant Kafka as bitnami/kafka
participant Tests as kafka-go tests
CI->>Zookeeper: Start container
Note over Zookeeper: Configure with ALLOW_ANONYMOUS_LOGIN
CI->>Kafka: Start container with version (0.10/2.7/2.8/3.7)
Note over Kafka: Configure SASL, listeners, etc.
Kafka->>Zookeeper: Connect and register
CI->>CI: Run wait-for-kafka.sh
Note over CI: Ensures Kafka is ready
CI->>Tests: Execute tests
Tests->>Kafka: Create topics
Tests->>Kafka: Produce/consume messages
Tests->>Kafka: Test SASL auth
Tests->>Kafka: Test consumer groups
alt Transaction tests
alt Kafka < 3.0.0
Tests->>Kafka: Run transaction tests
else Kafka >= 3.0.0
Tests--xKafka: Skip transaction tests
end
end
Tests->>CI: Report results
|
This PR contains the following updates:
v0.4.47
->v0.4.48
Release Notes
segmentio/kafka-go (github.com/segmentio/kafka-go)
v0.4.48
Compare Source
What's Changed
New Contributors
Full Changelog: segmentio/kafka-go@v0.4.47...v0.4.48
Configuration
📅 Schedule: Branch creation - Tuesday through Thursday ( * * * * 2-4 ) (UTC), Automerge - At any time (no schedule defined).
🚦 Automerge: Disabled by config. Please merge this manually once you are satisfied.
♻ Rebasing: Whenever PR is behind base branch, or you tick the rebase/retry checkbox.
🔕 Ignore: Close this PR and you won't be reminded about this update again.
This PR was generated by Mend Renovate. View the repository job log.