
fix(deps): update module github.com/segmentio/kafka-go to v0.4.48 #105

Status: Open. Wants to merge 1 commit into master.

Conversation

renovate[bot] (Contributor) commented May 21, 2025

This PR contains the following updates:

Package: github.com/segmentio/kafka-go
Change: v0.4.47 -> v0.4.48

Release Notes

segmentio/kafka-go (github.com/segmentio/kafka-go)

v0.4.48

Compare Source

What's Changed

New Contributors

Full Changelog: segmentio/kafka-go@v0.4.47...v0.4.48


Configuration

📅 Schedule: Branch creation - Tuesday through Thursday ( * * * * 2-4 ) (UTC), Automerge - At any time (no schedule defined).

🚦 Automerge: Disabled by config. Please merge this manually once you are satisfied.

Rebasing: Whenever PR is behind base branch, or you tick the rebase/retry checkbox.

🔕 Ignore: Close this PR and you won't be reminded about this update again.


  • If you want to rebase/retry this PR, check this box

This PR was generated by Mend Renovate. View the repository job log.


[puLL-Merge] - segmentio/kafka-go@v0.4.47..v0.4.48

Diff
diff --git .circleci/config.yml .circleci/config.yml
index 25fad54e3..a32a0d322 100644
--- .circleci/config.yml
+++ .circleci/config.yml
@@ -9,34 +9,40 @@ jobs:
 
   # The kafka 0.10 tests are maintained as a separate configuration because
   # kafka only supported plain text SASL in this version.
+  # NOTE: Bitnami does not have suport for kafka version 0.10.1.1. Hence we use 0.10.2.1
   kafka-010:
     working_directory: &working_directory /go/src/github.com/segmentio/kafka-go
-    environment:
-      KAFKA_VERSION: "0.10.1"
     docker:
     - image: circleci/golang
-    - image: wurstmeister/zookeeper
+    - image: bitnami/zookeeper:latest
       ports:
       - 2181:2181
-    - image: wurstmeister/kafka:0.10.1.1
+      environment:
+        ALLOW_ANONYMOUS_LOGIN: yes
+    - image: bitnami/kafka:0.10.2.1
       ports:
       - 9092:9092
       - 9093:9093
       environment:
-        KAFKA_BROKER_ID: '1'
-        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
+        KAFKA_BROKER_ID: 1
         KAFKA_DELETE_TOPIC_ENABLE: 'true'
         KAFKA_ADVERTISED_HOST_NAME: 'localhost'
         KAFKA_ADVERTISED_PORT: '9092'
-        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
+        KAFKA_ZOOKEEPER_CONNECT: localhost:2181
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
         KAFKA_MESSAGE_MAX_BYTES: '200000000'
         KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
         KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
-        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-        CUSTOM_INIT_SCRIPT: |-
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/kafka/config/kafka_server_jaas.conf;
+        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+        KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+        KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf"
+        ALLOW_PLAINTEXT_LISTENER: yes
+      entrypoint:
+        - "/bin/bash"
+        - "-c"
+        - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/bitnami/kafka/config/kafka_server_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram; exec /app-entrypoint.sh /start-kafka.sh
+
     steps: &steps
     - checkout
     - restore_cache:
@@ -48,6 +54,9 @@ jobs:
         key: kafka-go-mod-{{ checksum "go.sum" }}-1
         paths:
         - /go/pkg/mod
+    - run:
+        name: Wait for kafka
+        command: ./scripts/wait-for-kafka.sh
     - run:
         name: Test kafka-go
         command: go test -race -cover ./...
@@ -59,156 +68,11 @@ jobs:
         working_directory: ./sasl/aws_msk_iam
         command: go test -race -cover ./...
 
-  # Starting at version 0.11, the kafka features and configuration remained
-  # mostly stable, so we can use this CI job configuration as template for other
-  # versions as well.
-  kafka-011:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "0.11.0"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.11-0.11.0.3
-      ports:
-      - 9092:9092
-      - 9093:9093
-      environment: &environment
-        KAFKA_BROKER_ID: '1'
-        KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-        KAFKA_DELETE_TOPIC_ENABLE: 'true'
-        KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-        KAFKA_ADVERTISED_PORT: '9092'
-        KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
-        KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-        KAFKA_MESSAGE_MAX_BYTES: '200000000'
-        KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-        KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-        KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
-        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-        CUSTOM_INIT_SCRIPT: |-
-          apk add libgcc;
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/kafka/config/kafka_server_jaas.conf;
-          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-    steps: *steps
-
-  kafka-101:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "1.0.1"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.11-1.0.1
-      ports:
-      - 9092:9092
-      - 9093:9093
-      environment: *environment
-    steps: *steps
-
-  kafka-111:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "1.1.1"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.11-1.1.1
-      ports:
-      - 9092:9092
-      - 9093:9093
-      environment: *environment
-    steps: *steps
-
-  kafka-201:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "2.0.1"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.12-2.0.1
-      ports:
-      - 9092:9092
-      - 9093:9093
-      environment:
-        <<: *environment
-        KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
-        KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
-    steps: *steps
-
-  kafka-211:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "2.1.1"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.12-2.1.1
-      ports:
-      - 9092:9092
-      - 9093:9093
-      # recently, the base image for some newer versions of kafka switched from
-      # alpine to debian, which causes the "apk add ..." line to fail. The env
-      # map should be used for any versions that fail due to being part of this
-      # migration.
-      environment: &environmentDebian
-        <<: *environment
-        KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
-        KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
-        CUSTOM_INIT_SCRIPT: |-
-          echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/kafka/config/kafka_server_jaas.conf;
-          /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-    steps: *steps
-
-  kafka-222:
+  # NOTE: Bitnami does not have suport for kafka version 2.7.1. Hence we use 2.7.0
+  kafka-270:
     working_directory: *working_directory
     environment:
-      KAFKA_VERSION: "2.2.2"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.12-2.2.2
-      ports:
-      - 9092:9092
-      - 9093:9093
-      environment:
-        <<: *environmentDebian
-    steps: *steps
-
-  kafka-231:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "2.3.1"
-    docker:
-    - image: circleci/golang
-    - image: wurstmeister/zookeeper
-      ports:
-      - 2181:2181
-    - image: wurstmeister/kafka:2.12-2.3.1
-      ports:
-      - 9092:9092
-      - 9093:9093
-      environment:
-        <<: *environmentDebian
-    steps: *steps
-
-  kafka-241:
-    working_directory: *working_directory
-    environment:
-      KAFKA_VERSION: "2.4.1"
+      KAFKA_VERSION: "2.7.0"
 
       # Need to skip nettest to avoid these kinds of errors:
       #  --- FAIL: TestConn/nettest (17.56s)
@@ -222,21 +86,40 @@ jobs:
       KAFKA_SKIP_NETTEST: "1"
     docker:
     - image: circleci/golang
-    - image: wurstmeister/zookeeper
+    - image: bitnami/zookeeper:latest
       ports:
       - 2181:2181
-    - image: wurstmeister/kafka:2.12-2.4.1
+      environment:
+        ALLOW_ANONYMOUS_LOGIN: yes
+    - image: bitnami/kafka:2.7.0
       ports:
       - 9092:9092
       - 9093:9093
-      environment:
-        <<: *environmentDebian
+      environment: &environment
+        KAFKA_CFG_BROKER_ID: 1
+        KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+        KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+        KAFKA_CFG_ADVERTISED_PORT: '9092'
+        KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181
+        KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+        KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+        KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+        KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+        KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+        KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+        KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+        KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+        ALLOW_PLAINTEXT_LISTENER: yes
+      entrypoint: &entrypoint
+        - "/bin/bash"
+        - "-c"
+        - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
     steps: *steps
 
-  kafka-260:
+  kafka-281:
     working_directory: *working_directory
     environment:
-      KAFKA_VERSION: "2.6.0"
+      KAFKA_VERSION: "2.8.1"
 
       # Need to skip nettest to avoid these kinds of errors:
       #  --- FAIL: TestConn/nettest (17.56s)
@@ -250,23 +133,23 @@ jobs:
       KAFKA_SKIP_NETTEST: "1"
     docker:
     - image: circleci/golang
-    - image: wurstmeister/zookeeper
+    - image: bitnami/zookeeper:latest
       ports:
       - 2181:2181
-    - image: wurstmeister/kafka:2.13-2.6.0
+      environment:
+        ALLOW_ANONYMOUS_LOGIN: yes
+    - image: bitnami/kafka:2.8.1
       ports:
       - 9092:9092
       - 9093:9093
-      environment:
-        <<: *environment
-        KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
-        KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      environment: *environment
+      entrypoint: *entrypoint
     steps: *steps
 
-  kafka-271:
+  kafka-370:
     working_directory: *working_directory
     environment:
-      KAFKA_VERSION: "2.7.1"
+      KAFKA_VERSION: "3.7.0"
 
       # Need to skip nettest to avoid these kinds of errors:
       #  --- FAIL: TestConn/nettest (17.56s)
@@ -280,32 +163,80 @@ jobs:
       KAFKA_SKIP_NETTEST: "1"
     docker:
     - image: circleci/golang
-    - image: wurstmeister/zookeeper
+    - image: bitnami/zookeeper:latest
       ports:
       - 2181:2181
-    - image: wurstmeister/kafka:2.13-2.7.1
+      environment:
+        ALLOW_ANONYMOUS_LOGIN: yes
+    - image: bitnami/kafka:3.7.0
       ports:
-      - 9092:9092
-      - 9093:9093
+        - 9092:9092
+        - 9093:9093
       environment:
         <<: *environment
-        KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
-        KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+        KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
+      entrypoint: *entrypoint
     steps: *steps
 
+  # NOTE: this fails quite often due to Java heap errors from Kafka.
+  # Once we figure out how to fix that, we can re-enable this.
+  # https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900
+  # kafka-400:
+  #   working_directory: *working_directory
+  #   environment:
+  #     KAFKA_VERSION: "4.0.0"
+
+  #     # Need to skip nettest to avoid these kinds of errors:
+  #     #  --- FAIL: TestConn/nettest (17.56s)
+  #     #    --- FAIL: TestConn/nettest/PingPong (7.40s)
+  #     #      conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
+  #     #      conntest.go:118: mismatching value: got 77, want 78
+  #     #      conntest.go:118: mismatching value: got 78, want 79
+  #     # ...
+  #     #
+  #     # TODO: Figure out why these are happening and fix them (they don't appear to be new).
+  #     KAFKA_SKIP_NETTEST: "1"
+  #   docker:
+  #   - image: circleci/golang
+  #   - image: bitnami/kafka:4.0.0
+  #     ports:
+  #       - 9092:9092
+  #       - 9093:9093
+  #     environment:
+  #       KAFKA_CFG_NODE_ID: 1
+  #       KAFKA_CFG_BROKER_ID: 1
+  #       KAFKA_CFG_PROCESS_ROLES: broker,controller
+  #       KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+  #       KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+  #       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT
+  #       KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093
+  #       KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093
+  #       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN
+  #       KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+  #       KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
+  #       ALLOW_PLAINTEXT_LISTENER: yes
+  #       KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+  #       KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+  #       KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+  #       KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+  #       KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+  #       KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
+  #       KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
+  #       KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
+  #       KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
+  #       KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
+  #       KAFKA_INTER_BROKER_USER: adminscram512
+  #       KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
+  #       KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+  #   steps: *steps
+
 workflows:
   version: 2
   run:
     jobs:
     - lint
     - kafka-010
-    - kafka-011
-    - kafka-101
-    - kafka-111
-    - kafka-201
-    - kafka-211
-    - kafka-222
-    - kafka-231
-    - kafka-241
-    - kafka-260
-    - kafka-271
+    - kafka-270
+    - kafka-281
+    - kafka-370 
+    #- kafka-400
diff --git Makefile Makefile
index e2374f2e3..47f45c950 100644
--- Makefile
+++ Makefile
@@ -4,4 +4,4 @@ test:
 	go test -race -cover ./...
 
 docker:
-	docker-compose up -d
+	docker compose up -d
diff --git README.md README.md
index e17878825..4e6cd1229 100644
--- README.md
+++ README.md
@@ -108,7 +108,7 @@ if err := conn.Close(); err != nil {
 ```
 
 ### To Create Topics
-By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_AUTO_CREATE_TOPICS_ENABLE='true'` in the wurstmeister/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so:
+By default kafka has the `auto.create.topics.enable='true'` (`KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true'` in the bitnami/kafka kafka docker image). If this value is set to `'true'` then topics will be created as a side effect of `kafka.DialLeader` like so:
 ```go
 // to create topics when auto.create.topics.enable='true'
 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0)
@@ -797,3 +797,8 @@ KAFKA_VERSION=2.3.1 \
   KAFKA_SKIP_NETTEST=1 \
   go test -race ./...
 ```
+
+(or) to clean up the cached test results and run tests:
+```
+go clean -cache && make test
+```
diff --git addoffsetstotxn_test.go addoffsetstotxn_test.go
index 56d6ee46a..86fd3d711 100644
--- addoffsetstotxn_test.go
+++ addoffsetstotxn_test.go
@@ -3,9 +3,7 @@ package kafka
 import (
 	"context"
 	"log"
-	"net"
 	"os"
-	"strconv"
 	"testing"
 	"time"
 
@@ -16,17 +14,26 @@ func TestClientAddOffsetsToTxn(t *testing.T) {
 	if !ktesting.KafkaIsAtLeast("0.11.0") {
 		t.Skip("Skipping test because kafka version is not high enough.")
 	}
+
+	// TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+	// work is revisited.
+	if ktesting.KafkaIsAtLeast("3.0.0") {
+		t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+	}
+
 	topic := makeTopic()
 	transactionalID := makeTransactionalID()
 	client, shutdown := newLocalClient()
 	defer shutdown()
 
 	err := clientCreateTopic(client, topic, 3)
+	defer deleteTopic(t, topic)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
+	waitForTopic(ctx, t, topic)
 	defer cancel()
 	respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
 		Addr:    client.Addr,
@@ -75,9 +82,9 @@ func TestClientAddOffsetsToTxn(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
-	client, shutdown = newClient(transactionCoordinator)
-	defer shutdown()
+	if respc.Error != nil {
+		t.Fatal(err)
+	}
 
 	ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{
 		TransactionalID:      transactionalID,
diff --git addpartitionstotxn_test.go addpartitionstotxn_test.go
index 7e7efe8e5..34f3d6da6 100644
--- addpartitionstotxn_test.go
+++ addpartitionstotxn_test.go
@@ -14,6 +14,13 @@ func TestClientAddPartitionsToTxn(t *testing.T) {
 	if !ktesting.KafkaIsAtLeast("0.11.0") {
 		t.Skip("Skipping test because kafka version is not high enough.")
 	}
+
+	// TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+	// work is revisited.
+	if ktesting.KafkaIsAtLeast("3.0.0") {
+		t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+	}
+
 	topic1 := makeTopic()
 	topic2 := makeTopic()
 
diff --git alterclientquotas_test.go alterclientquotas_test.go
index d61c745e3..23568ad83 100644
--- alterclientquotas_test.go
+++ alterclientquotas_test.go
@@ -3,6 +3,7 @@ package kafka
 import (
 	"context"
 	"testing"
+	"time"
 
 	ktesting "github.com/segmentio/kafka-go/testing"
 	"github.com/stretchr/testify/assert"
@@ -65,6 +66,11 @@ func TestClientAlterClientQuotas(t *testing.T) {
 
 	assert.Equal(t, expectedAlterResp, *alterResp)
 
+	// kraft mode is slow
+	if ktesting.KafkaIsAtLeast("3.7.0") {
+		time.Sleep(3 * time.Second)
+	}
+
 	describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{
 		Components: []DescribeClientQuotasRequestComponent{
 			{
diff --git alterpartitionreassignments_test.go alterpartitionreassignments_test.go
index 7bbce8fff..48974c7c5 100644
--- alterpartitionreassignments_test.go
+++ alterpartitionreassignments_test.go
@@ -3,6 +3,7 @@ package kafka
 import (
 	"context"
 	"testing"
+	"time"
 
 	ktesting "github.com/segmentio/kafka-go/testing"
 )
@@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) {
 					BrokerIDs:   []int{1},
 				},
 			},
+			Timeout: 5 * time.Second,
 		},
 	)
 
@@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) {
 					BrokerIDs:   []int{1},
 				},
 			},
+			Timeout: 5 * time.Second,
 		},
 	)
 
diff --git batch.go batch.go
index 19dcef8cd..eb742712d 100644
--- batch.go
+++ batch.go
@@ -46,7 +46,7 @@ func (batch *Batch) Throttle() time.Duration {
 	return batch.throttle
 }
 
-// Watermark returns the current highest watermark in a partition.
+// HighWaterMark returns the current highest watermark in a partition.
 func (batch *Batch) HighWaterMark() int64 {
 	return batch.highWaterMark
 }
diff --git conn.go conn.go
index 2b51afbd5..9f9f25903 100644
--- conn.go
+++ conn.go
@@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) {
 
 // DeleteTopics deletes the specified topics.
 func (c *Conn) DeleteTopics(topics ...string) error {
-	_, err := c.deleteTopics(deleteTopicsRequestV0{
+	_, err := c.deleteTopics(deleteTopicsRequest{
 		Topics: topics,
 	})
 	return err
@@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error
 // joinGroup attempts to join a consumer group
 //
 // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
-func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
-	var response joinGroupResponseV1
+func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) {
+	version, err := c.negotiateVersion(joinGroup, v1, v2)
+	if err != nil {
+		return joinGroupResponse{}, err
+	}
 
-	err := c.writeOperation(
+	response := joinGroupResponse{v: version}
+
+	err = c.writeOperation(
 		func(deadline time.Time, id int32) error {
-			return c.writeRequest(joinGroup, v1, id, request)
+			return c.writeRequest(joinGroup, version, id, request)
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
 		},
 	)
 	if err != nil {
-		return joinGroupResponseV1{}, err
+		return joinGroupResponse{}, err
 	}
 	if response.ErrorCode != 0 {
-		return joinGroupResponseV1{}, Error(response.ErrorCode)
+		return joinGroupResponse{}, Error(response.ErrorCode)
 	}
 
 	return response, nil
diff --git conn_test.go conn_test.go
index bdce327e0..ef5ce3071 100644
--- conn_test.go
+++ conn_test.go
@@ -13,8 +13,9 @@ import (
 	"testing"
 	"time"
 
-	ktesting "github.com/segmentio/kafka-go/testing"
 	"golang.org/x/net/nettest"
+
+	ktesting "github.com/segmentio/kafka-go/testing"
 )
 
 type timeout struct{}
@@ -679,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
 func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) {
 	waitForCoordinator(t, conn, groupID)
 
-	join := func() (joinGroup joinGroupResponseV1) {
+	join := func() (joinGroup joinGroupResponse) {
 		var err error
 		for attempt := 0; attempt < 10; attempt++ {
-			joinGroup, err = conn.joinGroup(joinGroupRequestV1{
+			joinGroup, err = conn.joinGroup(joinGroupRequest{
 				GroupID:          groupID,
 				SessionTimeout:   int32(time.Minute / time.Millisecond),
 				RebalanceTimeout: int32(time.Second / time.Millisecond),
@@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
 }
 
 func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) {
-	_, err := conn.joinGroup(joinGroupRequestV1{})
+	_, err := conn.joinGroup(joinGroupRequest{})
 	if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) {
 		t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err)
 	}
@@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) {
 	groupID := makeGroupID()
 	waitForCoordinator(t, conn, groupID)
 
-	_, err := conn.joinGroup(joinGroupRequestV1{
+	_, err := conn.joinGroup(joinGroupRequest{
 		GroupID: groupID,
 	})
 	if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) {
@@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) {
 	groupID := makeGroupID()
 	waitForCoordinator(t, conn, groupID)
 
-	_, err := conn.joinGroup(joinGroupRequestV1{
+	_, err := conn.joinGroup(joinGroupRequest{
 		GroupID:        groupID,
 		SessionTimeout: int32(3 * time.Second / time.Millisecond),
 	})
diff --git consumergroup.go consumergroup.go
index b9d0a7e2e..b32f90162 100644
--- consumergroup.go
+++ consumergroup.go
@@ -527,7 +527,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
 				case err == nil, errors.Is(err, UnknownTopicOrPartition):
 					if len(ops) != oParts {
 						g.log(func(l Logger) {
-							l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
+							l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
 						})
 						return
 					}
@@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
 type coordinator interface {
 	io.Closer
 	findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
-	joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error)
+	joinGroup(joinGroupRequest) (joinGroupResponse, error)
 	syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error)
 	leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error)
 	heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find
 	return t.conn.findCoordinator(req)
 }
 
-func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
 	// in the case of join group, the consumer group coordinator may wait up
 	// to rebalance timeout in order to wait for all members to join.
 	if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil {
-		return joinGroupResponseV1{}, err
+		return joinGroupResponse{}, err
 	}
 	return t.conn.joinGroup(req)
 }
@@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
 //  * InvalidSessionTimeout:
 //  * GroupAuthorizationFailed:
 func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
-	request, err := cg.makeJoinGroupRequestV1(memberID)
+	request, err := cg.makeJoinGroupRequest(memberID)
 	if err != nil {
 		return "", 0, nil, err
 	}
@@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
 
 // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
 // request.
-func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) {
-	request := joinGroupRequestV1{
+func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) {
+	request := joinGroupRequest{
 		GroupID:          cg.config.ID,
 		MemberID:         memberID,
 		SessionTimeout:   int32(cg.config.SessionTimeout / time.Millisecond),
@@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
 	for _, balancer := range cg.config.GroupBalancers {
 		userData, err := balancer.UserData()
 		if err != nil {
-			return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
+			return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
 		}
 		request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
 			ProtocolName: balancer.ProtocolName(),
@@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
 
 // assignTopicPartitions uses the selected GroupBalancer to assign members to
 // their various partitions.
-func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) {
+func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
 	cg.withLogger(func(l Logger) {
 		l.Printf("selected as leader for group, %s\n", cg.config.ID)
 	})
@@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
 }
 
 // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
-func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) {
+func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {
 	members := make([]GroupMember, 0, len(in))
 	for _, item := range in {
 		metadata := groupMetadata{}
diff --git consumergroup_test.go consumergroup_test.go
index 0d3e290a9..dbbe4ec47 100644
--- consumergroup_test.go
+++ consumergroup_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"reflect"
+	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -15,7 +16,7 @@ var _ coordinator = mockCoordinator{}
 type mockCoordinator struct {
 	closeFunc           func() error
 	findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error)
-	joinGroupFunc       func(joinGroupRequestV1) (joinGroupResponseV1, error)
+	joinGroupFunc       func(joinGroupRequest) (joinGroupResponse, error)
 	syncGroupFunc       func(syncGroupRequestV0) (syncGroupResponseV0, error)
 	leaveGroupFunc      func(leaveGroupRequestV0) (leaveGroupResponseV0, error)
 	heartbeatFunc       func(heartbeatRequestV0) (heartbeatResponseV0, error)
@@ -38,9 +39,9 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor
 	return c.findCoordinatorFunc(req)
 }
 
-func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) {
+func (c mockCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) {
 	if c.joinGroupFunc == nil {
-		return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified")
+		return joinGroupResponse{}, errors.New("no joinGroup behavior specified")
 	}
 	return c.joinGroupFunc(req)
 }
@@ -140,33 +141,36 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 		},
 	}
 
-	newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 {
-		resp := joinGroupResponseV1{
-			GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
-		}
+	newJoinGroupResponse := func(topicsByMemberID map[string][]string) func(v apiVersion) joinGroupResponse {
+		return func(v apiVersion) joinGroupResponse {
+			resp := joinGroupResponse{
+				v:             v,
+				GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(),
+			}
 
-		for memberID, topics := range topicsByMemberID {
-			resp.Members = append(resp.Members, joinGroupResponseMemberV1{
-				MemberID: memberID,
-				MemberMetadata: groupMetadata{
-					Topics: topics,
-				}.bytes(),
-			})
-		}
+			for memberID, topics := range topicsByMemberID {
+				resp.Members = append(resp.Members, joinGroupResponseMember{
+					MemberID: memberID,
+					MemberMetadata: groupMetadata{
+						Topics: topics,
+					}.bytes(),
+				})
+			}
 
-		return resp
+			return resp
+		}
 	}
 
 	testCases := map[string]struct {
-		Members     joinGroupResponseV1
+		MembersFunc func(v apiVersion) joinGroupResponse
 		Assignments GroupMemberAssignments
 	}{
 		"nil": {
-			Members:     newJoinGroupResponseV1(nil),
+			MembersFunc: newJoinGroupResponse(nil),
 			Assignments: GroupMemberAssignments{},
 		},
 		"one member, one topic": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1"},
 			}),
 			Assignments: GroupMemberAssignments{
@@ -176,7 +180,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 			},
 		},
 		"one member, two topics": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1", "topic-2"},
 			}),
 			Assignments: GroupMemberAssignments{
@@ -187,7 +191,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 			},
 		},
 		"two members, one topic": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1"},
 				"member-2": {"topic-1"},
 			}),
@@ -201,7 +205,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 			},
 		},
 		"two members, two unshared topics": {
-			Members: newJoinGroupResponseV1(map[string][]string{
+			MembersFunc: newJoinGroupResponse(map[string][]string{
 				"member-1": {"topic-1"},
 				"member-2": {"topic-2"},
 			}),
@@ -216,21 +220,24 @@ func TestReaderAssignTopicPartitions(t *testing.T) {
 		},
 	}
 
+	supportedVersions := []apiVersion{v1, v2} // joinGroup versions
 	for label, tc := range testCases {
-		t.Run(label, func(t *testing.T) {
-			cg := ConsumerGroup{}
-			cg.config.GroupBalancers = []GroupBalancer{
-				RangeGroupBalancer{},
-				RoundRobinGroupBalancer{},
-			}
-			assignments, err := cg.assignTopicPartitions(conn, tc.Members)
-			if err != nil {
-				t.Fatalf("bad err: %v", err)
-			}
-			if !reflect.DeepEqual(tc.Assignments, assignments) {
-				t.Errorf("expected %v; got %v", tc.Assignments, assignments)
-			}
-		})
+		for _, v := range supportedVersions {
+			t.Run(label+"_v"+strconv.Itoa(int(v)), func(t *testing.T) {
+				cg := ConsumerGroup{}
+				cg.config.GroupBalancers = []GroupBalancer{
+					RangeGroupBalancer{},
+					RoundRobinGroupBalancer{},
+				}
+				assignments, err := cg.assignTopicPartitions(conn, tc.MembersFunc(v))
+				if err != nil {
+					t.Fatalf("bad err: %v", err)
+				}
+				if !reflect.DeepEqual(tc.Assignments, assignments) {
+					t.Errorf("expected %v; got %v", tc.Assignments, assignments)
+				}
+			})
+		}
 	}
 }
 
@@ -419,8 +426,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 						},
 					}, nil
 				}
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{}, errors.New("join group failed")
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{}, errors.New("join group failed")
 				}
 				// NOTE : no stub for leaving the group b/c the member never joined.
 			},
@@ -449,8 +456,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 						},
 					}, nil
 				}
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{
 						ErrorCode: int16(InvalidTopic),
 					}, nil
 				}
@@ -472,8 +479,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 		{
 			scenario: "fails to join group (leader, unsupported protocol)",
 			prepare: func(mc *mockCoordinator) {
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{
 						GenerationID:  12345,
 						GroupProtocol: "foo",
 						LeaderID:      "abc",
@@ -498,8 +505,8 @@ func TestConsumerGroupErrors(t *testing.T) {
 		{
 			scenario: "fails to sync group (general error)",
 			prepare: func(mc *mockCoordinator) {
-				mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) {
-					return joinGroupResponseV1{
+				mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) {
+					return joinGroupResponse{
 						GenerationID:  12345,
 						GroupProtocol: "range",
 						LeaderID:      "abc",
diff --git createtopics.go createtopics.go
index 8ad9ebf44..9c75d7aaa 100644
--- createtopics.go
+++ createtopics.go
@@ -10,7 +10,7 @@ import (
 	"github.com/segmentio/kafka-go/protocol/createtopics"
 )
 
-// CreateTopicRequests represents a request sent to a kafka broker to create
+// CreateTopicsRequest represents a request sent to a kafka broker to create
 // new topics.
 type CreateTopicsRequest struct {
 	// Address of the kafka broker to send the request to.
@@ -27,7 +27,7 @@ type CreateTopicsRequest struct {
 	ValidateOnly bool
 }
 
-// CreateTopicResponse represents a response from a kafka broker to a topic
+// CreateTopicsResponse represents a response from a kafka broker to a topic
 // creation request.
 type CreateTopicsResponse struct {
 	// The amount of time that the broker throttled the request.
@@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
 }
 
 // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
-type createTopicsRequestV0 struct {
+type createTopicsRequest struct {
+	v apiVersion // v0, v1, v2
+
 	// Topics contains n array of single topic creation requests. Can not
 	// have multiple entries for the same topic.
 	Topics []createTopicsRequestV0Topic
@@ -270,86 +272,136 @@ type createTopicsRequestV0 struct {
 	// Timeout ms to wait for a topic to be completely created on the
 	// controller node. Values <= 0 will trigger topic creation and return immediately
 	Timeout int32
+
+	// If true, check that the topics can be created as specified, but don't create anything.
+	// Internal use only for Kafka 4.0 support.
+	ValidateOnly bool
 }
 
-func (t createTopicsRequestV0) size() int32 {
-	return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
+func (t createTopicsRequest) size() int32 {
+	sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
 		sizeofInt32(t.Timeout)
+	if t.v >= v1 {
+		sz += 1
+	}
+	return sz
 }
 
-func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
+func (t createTopicsRequest) writeTo(wb *writeBuffer) {
 	wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
 	wb.writeInt32(t.Timeout)
+	if t.v >= v1 {
+		wb.writeBool(t.ValidateOnly)
+	}
 }
 
-type createTopicsResponseV0TopicError struct {
+type createTopicsResponseTopicError struct {
+	v apiVersion
+
 	// Topic name
 	Topic string
 
 	// ErrorCode holds response error code
 	ErrorCode int16
+
+	// ErrorMessage holds response error message string
+	ErrorMessage string
 }
 
-func (t createTopicsResponseV0TopicError) size() int32 {
-	return sizeofString(t.Topic) +
+func (t createTopicsResponseTopicError) size() int32 {
+	sz := sizeofString(t.Topic) +
 		sizeofInt16(t.ErrorCode)
+	if t.v >= v1 {
+		sz += sizeofString(t.ErrorMessage)
+	}
+	return sz
 }
 
-func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
+func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) {
 	wb.writeString(t.Topic)
 	wb.writeInt16(t.ErrorCode)
+	if t.v >= v1 {
+		wb.writeString(t.ErrorMessage)
+	}
 }
 
-func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	if remain, err = readString(r, size, &t.Topic); err != nil {
 		return
 	}
 	if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
 		return
 	}
+	if t.v >= v1 {
+		if remain, err = readString(r, remain, &t.ErrorMessage); err != nil {
+			return
+		}
+	}
 	return
 }
 
 // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
-type createTopicsResponseV0 struct {
-	TopicErrors []createTopicsResponseV0TopicError
+type createTopicsResponse struct {
+	v apiVersion
+
+	ThrottleTime int32 // v2+
+	TopicErrors  []createTopicsResponseTopicError
 }
 
-func (t createTopicsResponseV0) size() int32 {
-	return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
+func (t createTopicsResponse) size() int32 {
+	sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
+	if t.v >= v2 {
+		sz += sizeofInt32(t.ThrottleTime)
+	}
+	return sz
 }
 
-func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
+func (t createTopicsResponse) writeTo(wb *writeBuffer) {
+	if t.v >= v2 {
+		wb.writeInt32(t.ThrottleTime)
+	}
 	wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) })
 }
 
-func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
-		var topic createTopicsResponseV0TopicError
-		if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil {
+		topic := createTopicsResponseTopicError{v: t.v}
+		if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil {
 			return
 		}
 		t.TopicErrors = append(t.TopicErrors, topic)
 		return
 	}
-	if remain, err = readArrayWith(r, size, fn); err != nil {
+	remain = size
+	if t.v >= v2 {
+		if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
+			return
+		}
+	}
+	if remain, err = readArrayWith(r, remain, fn); err != nil {
 		return
 	}
 
 	return
 }
 
-func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) {
-	var response createTopicsResponseV0
+func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) {
+	version, err := c.negotiateVersion(createTopics, v0, v1, v2)
+	if err != nil {
+		return createTopicsResponse{}, err
+	}
+
+	request.v = version
+	response := createTopicsResponse{v: version}
 
-	err := c.writeOperation(
+	err = c.writeOperation(
 		func(deadline time.Time, id int32) error {
 			if request.Timeout == 0 {
 				now := time.Now()
 				deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
 				request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
 			}
-			return c.writeRequest(createTopics, v0, id, request)
+			return c.writeRequest(createTopics, version, id, request)
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -383,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
 			t.toCreateTopicsRequestV0Topic())
 	}
 
-	_, err := c.createTopics(createTopicsRequestV0{
+	_, err := c.createTopics(createTopicsRequest{
 		Topics: requestV0Topics,
 	})
 	return err
diff --git createtopics_test.go createtopics_test.go
index 38819c382..119d17094 100644
--- createtopics_test.go
+++ createtopics_test.go
@@ -160,32 +160,37 @@ func TestClientCreateTopics(t *testing.T) {
 	}
 }
 
-func TestCreateTopicsResponseV0(t *testing.T) {
-	item := createTopicsResponseV0{
-		TopicErrors: []createTopicsResponseV0TopicError{
-			{
-				Topic:     "topic",
-				ErrorCode: 2,
+func TestCreateTopicsResponse(t *testing.T) {
+	supportedVersions := []apiVersion{v0, v1, v2}
+	for _, v := range supportedVersions {
+		item := createTopicsResponse{
+			v: v,
+			TopicErrors: []createTopicsResponseTopicError{
+				{
+					v:         v,
+					Topic:     "topic",
+					ErrorCode: 2,
+				},
 			},
-		},
-	}
+		}
 
-	b := bytes.NewBuffer(nil)
-	w := &writeBuffer{w: b}
-	item.writeTo(w)
+		b := bytes.NewBuffer(nil)
+		w := &writeBuffer{w: b}
+		item.writeTo(w)
 
-	var found createTopicsResponseV0
-	remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
-	if err != nil {
-		t.Error(err)
-		t.FailNow()
-	}
-	if remain != 0 {
-		t.Errorf("expected 0 remain, got %v", remain)
-		t.FailNow()
-	}
-	if !reflect.DeepEqual(item, found) {
-		t.Error("expected item and found to be the same")
-		t.FailNow()
+		found := createTopicsResponse{v: v}
+		remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
+		if err != nil {
+			t.Error(err)
+			t.FailNow()
+		}
+		if remain != 0 {
+			t.Errorf("expected 0 remain, got %v", remain)
+			t.FailNow()
+		}
+		if !reflect.DeepEqual(item, found) {
+			t.Error("expected item and found to be the same")
+			t.FailNow()
+		}
 	}
 }
diff --git deletetopics.go deletetopics.go
index d758d9fd6..ff73d553b 100644
--- deletetopics.go
+++ deletetopics.go
@@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D
 }
 
 // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
-type deleteTopicsRequestV0 struct {
+type deleteTopicsRequest struct {
 	// Topics holds the topic names
 	Topics []string
 
@@ -77,41 +77,57 @@ type deleteTopicsRequestV0 struct {
 	Timeout int32
 }
 
-func (t deleteTopicsRequestV0) size() int32 {
+func (t deleteTopicsRequest) size() int32 {
 	return sizeofStringArray(t.Topics) +
 		sizeofInt32(t.Timeout)
 }
 
-func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
+func (t deleteTopicsRequest) writeTo(wb *writeBuffer) {
 	wb.writeStringArray(t.Topics)
 	wb.writeInt32(t.Timeout)
 }
 
-type deleteTopicsResponseV0 struct {
+type deleteTopicsResponse struct {
+	v apiVersion // v0, v1
+
+	ThrottleTime int32
 	// TopicErrorCodes holds per topic error codes
 	TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
 }
 
-func (t deleteTopicsResponseV0) size() int32 {
-	return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
+func (t deleteTopicsResponse) size() int32 {
+	sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
+	if t.v >= v1 {
+		sz += sizeofInt32(t.ThrottleTime)
+	}
+	return sz
 }
 
-func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
 		var item deleteTopicsResponseV0TopicErrorCode
-		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
+		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
 			return
 		}
 		t.TopicErrorCodes = append(t.TopicErrorCodes, item)
 		return
 	}
-	if remain, err = readArrayWith(r, size, fn); err != nil {
+	remain = size
+	if t.v >= v1 {
+		if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil {
+			return
+		}
+	}
+	if remain, err = readArrayWith(r, remain, fn); err != nil {
 		return
 	}
 	return
 }
 
-func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
+func (t deleteTopicsResponse) writeTo(wb *writeBuffer) {
+	if t.v >= v1 {
+		wb.writeInt32(t.ThrottleTime)
+	}
 	wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
 }
 
@@ -146,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
 // deleteTopics deletes the specified topics.
 //
 // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
-func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
-	var response deleteTopicsResponseV0
-	err := c.writeOperation(
+func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) {
+	version, err := c.negotiateVersion(deleteTopics, v0, v1)
+	if err != nil {
+		return deleteTopicsResponse{}, err
+	}
+
+	response := deleteTopicsResponse{
+		v: version,
+	}
+
+	err = c.writeOperation(
 		func(deadline time.Time, id int32) error {
 			if request.Timeout == 0 {
 				now := time.Now()
 				deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
 				request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
 			}
-			return c.writeRequest(deleteTopics, v0, id, request)
+			return c.writeRequest(deleteTopics, version, id, request)
 		},
 		func(deadline time.Time, size int) error {
 			return expectZeroSize(func() (remain int, err error) {
@@ -164,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse
 		},
 	)
 	if err != nil {
-		return deleteTopicsResponseV0{}, err
+		return deleteTopicsResponse{}, err
 	}
 	for _, c := range response.TopicErrorCodes {
 		if c.ErrorCode != 0 {
diff --git deletetopics_test.go deletetopics_test.go
index 3caffe840..4dc681831 100644
--- deletetopics_test.go
+++ deletetopics_test.go
@@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) {
 }
 
 func TestDeleteTopicsResponseV1(t *testing.T) {
-	item := deleteTopicsResponseV0{
+	item := deleteTopicsResponse{
 		TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{
 			{
 				Topic:     "a",
@@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) {
 	w := &writeBuffer{w: b}
 	item.writeTo(w)
 
-	var found deleteTopicsResponseV0
+	var found deleteTopicsResponse
 	remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
 	if err != nil {
 		t.Fatal(err)
diff --git describeclientquotas.go describeclientquotas.go
index 6291dcd98..bfe712f28 100644
--- describeclientquotas.go
+++ describeclientquotas.go
@@ -35,7 +35,7 @@ type DescribeClientQuotasRequestComponent struct {
 	Match string
 }
 
-// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request.
+// DescribeClientQuotasResponse represents a response from a kafka broker to a describe client quota request.
 type DescribeClientQuotasResponse struct {
 	// The amount of time that the broker throttled the request.
 	Throttle time.Duration
diff --git docker-compose-241.yml docker-compose-241.yml
deleted file mode 100644
index 6feb1844b..000000000
--- docker-compose-241.yml
+++ /dev/null
@@ -1,32 +0,0 @@
-version: "3"
-services:
-  kafka:
-    image: wurstmeister/kafka:2.12-2.4.1
-    restart: on-failure:3
-    links:
-    - zookeeper
-    ports:
-    - 9092:9092
-    - 9093:9093
-    environment:
-      KAFKA_VERSION: '2.4.1'
-      KAFKA_BROKER_ID: '1'
-      KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-      KAFKA_DELETE_TOPIC_ENABLE: 'true'
-      KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-      KAFKA_ADVERTISED_PORT: '9092'
-      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_MESSAGE_MAX_BYTES: '200000000'
-      KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
-      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-      CUSTOM_INIT_SCRIPT: |-
-        echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/kafka/config/kafka_server_jaas.conf;
-        /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-
-  zookeeper:
-    image: wurstmeister/zookeeper
-    ports:
-    - 2181:2181
diff --git docker-compose.010.yml docker-compose.010.yml
deleted file mode 100644
index 56123f85c..000000000
--- docker-compose.010.yml
+++ /dev/null
@@ -1,29 +0,0 @@
-version: "3"
-services:
-  kafka:
-    image: wurstmeister/kafka:0.10.1.1
-    links:
-    - zookeeper
-    ports:
-    - 9092:9092
-    - 9093:9093
-    environment:
-      KAFKA_BROKER_ID: '1'
-      KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-      KAFKA_DELETE_TOPIC_ENABLE: 'true'
-      KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-      KAFKA_ADVERTISED_PORT: '9092'
-      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_MESSAGE_MAX_BYTES: '200000000'
-      KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
-      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-      CUSTOM_INIT_SCRIPT: |-
-        echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/kafka/config/kafka_server_jaas.conf;
-
-  zookeeper:
-    image: wurstmeister/zookeeper
-    ports:
-    - 2181:2181
diff --git docker-compose.yml docker-compose.yml
index dc0c2e85e..dffb0e448 100644
--- docker-compose.yml
+++ docker-compose.yml
@@ -1,34 +1,39 @@
-version: "3"
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
 services:
+  zookeeper:
+    container_name: zookeeper
+    hostname: zookeeper
+    image: bitnami/zookeeper:latest
+    ports:
+      - 2181:2181
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: yes
   kafka:
-    image: wurstmeister/kafka:2.12-2.3.1
+    container_name: kafka
+    image: bitnami/kafka:3.7.0
     restart: on-failure:3
     links:
-    - zookeeper
+      - zookeeper
     ports:
-    - 9092:9092
-    - 9093:9093
+      - 9092:9092
+      - 9093:9093
     environment:
-      KAFKA_VERSION: '2.3.1'
-      KAFKA_BROKER_ID: '1'
-      KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
-      KAFKA_DELETE_TOPIC_ENABLE: 'true'
-      KAFKA_ADVERTISED_HOST_NAME: 'localhost'
-      KAFKA_ADVERTISED_PORT: '9092'
-      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_MESSAGE_MAX_BYTES: '200000000'
-      KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
-      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
-      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
-      KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
-      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
-      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
-      CUSTOM_INIT_SCRIPT: |-
-        echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/kafka/config/kafka_server_jaas.conf;
-        /opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
-
-  zookeeper:
-    image: wurstmeister/zookeeper
-    ports:
-    - 2181:2181
+      KAFKA_CFG_BROKER_ID: 1
+      KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+      KAFKA_CFG_ADVERTISED_PORT: '9092'
+      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+      KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+      KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+      KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+      KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
+      KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+      ALLOW_PLAINTEXT_LISTENER: yes
+    entrypoint:
+      - "/bin/bash"
+      - "-c"
+      - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
diff --git a/docker_compose_versions/README.md b/docker_compose_versions/README.md
new file mode 100644
index 000000000..9d3b1639a
--- /dev/null
+++ docker_compose_versions/README.md
@@ -0,0 +1,152 @@
+# Bitnami Kafka
+
+This document outlines how to create a docker-compose file for a specific Bitnami Kafka version.
+
+
+## Steps to create docker-compose
+
+- Refer to [docker-hub Bitnami Kafka tags](https://hub.docker.com/r/bitnami/kafka/tags) and sort by NEWEST to locate the image preferred, for example: `2.7.0`
+- The [main branch README](https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md) documents the environment config setup; refer to the `Notable Changes` section.
+- Sometimes you need to understand how the setup is done. To locate the appropriate Kafka release in the [bitnami/containers](https://github.com/bitnami/containers) repo, go through the [kafka commit history](https://github.com/bitnami/containers/commits/main/bitnami/kafka).
+- Once a commit is located, refer to its README.md, Dockerfile, entrypoint and various init scripts to understand the conventions for mapping environment variables to server.properties configs. Alternatively, you can spin up the required Kafka image and inspect the mapping inside the container.
+- Ensure you follow the environment variable conventions in your docker-compose. Without proper environment variables, the Kafka cluster cannot start, or starts with undesired configs. For example, since Kafka version 2.3, all server.properties docker-compose environment configs start with `KAFKA_CFG_<config_with_underscore>`.
+- Older versions of Bitnami Kafka follow different conventions and expose only a limited set of docker-compose environment variables for the configs needed in server.properties.
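+
+As a minimal sketch of this mapping convention (illustrative only, assuming Bitnami Kafka 2.3+; the value mirrors the server.properties listed below):
+
+```
+# server.properties:  message.max.bytes=200000000
+# docker-compose equivalent:
+services:
+  kafka:
+    environment:
+      KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+```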
+
+
+In kafka-go, for all test cases to succeed, the Kafka cluster should have the following server.properties, along with a relevant kafka_jaas.conf referenced in KAFKA_OPTS. The goal is to ensure that the docker-compose file generates the server.properties below.
+
+
+server.properties
+```
+advertised.host.name=localhost
+advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
+advertised.port=9092
+auto.create.topics.enable=true
+broker.id=1
+delete.topic.enable=true
+group.initial.rebalance.delay.ms=0
+listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
+log.dirs=/kafka/kafka-logs-1d5951569d78
+log.retention.check.interval.ms=300000
+log.retention.hours=168
+log.segment.bytes=1073741824
+message.max.bytes=200000000
+num.io.threads=8
+num.network.threads=3
+num.partitions=1
+num.recovery.threads.per.data.dir=1
+offsets.topic.replication.factor=1
+port=9092
+sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
+socket.receive.buffer.bytes=102400
+socket.request.max.bytes=104857600
+socket.send.buffer.bytes=102400
+transaction.state.log.min.isr=1
+transaction.state.log.replication.factor=1
+zookeeper.connect=zookeeper:2181
+zookeeper.connection.timeout.ms=6000
+```
+
+
+## run docker-compose and test cases
+
+run docker-compose
+```
+# docker-compose -f ./docker_compose_versions/docker-compose-<kafka_version>.yml up -d
+```
+
+
+run test cases
+```
+# go clean -cache; KAFKA_SKIP_NETTEST=1 KAFKA_VERSION=<a.b.c> go test -race -cover ./...;
+```
+
+
+## Various Bitnami Kafka version issues observed in circleci
+
+
+### Kafka v101, v111, v201, v211 and v221
+
+
+In the kafka-go repo, all the tests require the Kafka cluster to have sasl.enabled.mechanisms set to PLAIN,SCRAM-SHA-256,SCRAM-SHA-512.
+
+
+It has been observed that Kafka v101, v111, v201, v211 and v221, which are used in the circleci build, have issues with SCRAM.
+
+
+There is no way to override the sasl.enabled.mechanisms config, causing the Kafka cluster to start with PLAIN only.
+
+
+Several attempts were made to override sasl.enabled.mechanisms:
+- Modified the entrypoint in docker-compose to append the relevant sasl.enabled.mechanisms config to server.properties before running entrypoint.sh. This failed for Kafka v101, v111, v201, v211 and v221: once the Kafka server starts, server.properties is appended with the default value of sasl.enabled.mechanisms, so the cluster starts without PLAIN,SCRAM-SHA-256,SCRAM-SHA-512.
+- Mounted a docker-compose volume for server.properties. This also failed for Kafka v101, v111, v201, v211 and v221 for the same reason: once the Kafka server starts, server.properties is appended with the default value of sasl.enabled.mechanisms, so the cluster starts without PLAIN,SCRAM-SHA-256,SCRAM-SHA-512.
+
+
+NOTE:
+- Kafka v101, v111, v201, v211 and v221 have no docker-compose files, since SCRAM is required for the kafka-go test cases to succeed.
+- There is no Bitnami Kafka image for v222, hence testing has been performed on v221.
+
+
+### Kafka v231
+
+In Bitnami Kafka v2.3, all server.properties docker-compose environment configs start with `KAFKA_CFG_<config_with_underscore>`. However, it does not pick up the custom populated kafka_jaas.conf.
+
+
+After a lot of debugging, it was noticed that there aren't enough privileges to create the kafka_jaas.conf. Hence the environment variables below need to be added in docker-compose to generate the kafka_jaas.conf. This issue is not observed after Kafka v2.3.
+
+
+```
+KAFKA_INTER_BROKER_USER: adminplain
+KAFKA_INTER_BROKER_PASSWORD: admin-secret
+KAFKA_BROKER_USER: adminplain
+KAFKA_BROKER_PASSWORD: admin-secret
+```
+
+There is a docker-compose file `docker-compose-231.yml` in the folder `kafka-go/docker_compose_versions` for reference.
+
+
+## References
+
+
+For user reference, please find some of the older Kafka version commits from the [kafka commit history](https://github.com/bitnami/containers/commits/main/bitnami/kafka). For Kafka versions with no commit history, data is populated with the latest version available for the tag.
+
+
+### Kafka v010: docker-compose reference: `kafka-go/docker_compose_versions/docker-compose-010.yml`
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=0.10.2.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/c4240f0525916a418245c7ef46d9534a7a212c92/bitnami/kafka)
+
+
+### Kafka v011: docker-compose reference: `kafka-go/docker_compose_versions/docker-compose-011.yml`
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=0.11.0)
+- [kafka commit](https://github.com/bitnami/containers/tree/7724adf655e4ca9aac69d606d41ad329ef31eeca/bitnami/kafka)
+
+
+### Kafka v101: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=1.0.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/44cc8f4c43ead6edebd3758c8df878f4f9da82c2/bitnami/kafka)
+
+
+### Kafka v111: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=1.1.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/cb593dc98c2eb7a39f2792641e741d395dbe50e7/bitnami/kafka)
+
+
+### Kafka v201: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.0.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/9ff8763df265c87c8b59f8d7ff0cf69299d636c9/bitnami/kafka)
+
+
+### Kafka v211: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.1.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/d3a9d40afc2b7e7de53486538a63084c1a565d43/bitnami/kafka)
+
+
+### Kafka v221: docker-compose reference: N/A
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.2.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/f132ef830d1ba9b78392ec4619174b4640c276c9/bitnami/kafka)
+
+
+### Kafka v231: docker-compose reference: `kafka-go/docker_compose_versions/docker-compose-231.yml`
+- [tag](https://hub.docker.com/r/bitnami/kafka/tags?page=1&ordering=last_updated&name=2.3.1)
+- [kafka commit](https://github.com/bitnami/containers/tree/ae572036b5281456b0086345fec0bdb74f7cf3a3/bitnami/kafka)
+
diff --git a/docker_compose_versions/docker-compose-010.yml b/docker_compose_versions/docker-compose-010.yml
new file mode 100644
index 000000000..35df0b5c6
--- /dev/null
+++ docker_compose_versions/docker-compose-010.yml
@@ -0,0 +1,39 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+  zookeeper:
+    container_name: zookeeper
+    hostname: zookeeper
+    image: bitnami/zookeeper:latest
+    ports:
+      - 2181:2181
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: yes
+  kafka:
+    container_name: kafka
+    image: bitnami/kafka:0.10.2.1
+    restart: on-failure:3
+    links:
+      - zookeeper
+    ports:
+      - 9092:9092
+      - 9093:9093
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_ADVERTISED_HOST_NAME: 'localhost'
+      KAFKA_ADVERTISED_PORT: '9092'
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_MESSAGE_MAX_BYTES: '200000000'
+      KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+      KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf"
+      ALLOW_PLAINTEXT_LISTENER: yes
+    entrypoint:
+      - "/bin/bash"
+      - "-c"
+      - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/bitnami/kafka/config/kafka_server_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram; exec /app-entrypoint.sh /start-kafka.sh
diff --git a/docker_compose_versions/docker-compose-270.yml b/docker_compose_versions/docker-compose-270.yml
new file mode 100644
index 000000000..de48cb290
--- /dev/null
+++ docker_compose_versions/docker-compose-270.yml
@@ -0,0 +1,39 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+  zookeeper:
+    container_name: zookeeper
+    hostname: zookeeper
+    image: bitnami/zookeeper:latest
+    ports:
+      - 2181:2181
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: yes
+  kafka:
+    container_name: kafka
+    image: bitnami/kafka:2.7.0
+    restart: on-failure:3
+    links:
+      - zookeeper
+    ports:
+      - 9092:9092
+      - 9093:9093
+    environment:
+      KAFKA_CFG_BROKER_ID: 1
+      KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+      KAFKA_CFG_ADVERTISED_PORT: '9092'
+      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+      KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+      KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+      KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+      KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
+      KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+      ALLOW_PLAINTEXT_LISTENER: yes
+    entrypoint:
+      - "/bin/bash"
+      - "-c"
+      - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
diff --git a/docker_compose_versions/docker-compose-370.yml b/docker_compose_versions/docker-compose-370.yml
new file mode 100644
index 000000000..dffb0e448
--- /dev/null
+++ docker_compose_versions/docker-compose-370.yml
@@ -0,0 +1,39 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+  zookeeper:
+    container_name: zookeeper
+    hostname: zookeeper
+    image: bitnami/zookeeper:latest
+    ports:
+      - 2181:2181
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: yes
+  kafka:
+    container_name: kafka
+    image: bitnami/kafka:3.7.0
+    restart: on-failure:3
+    links:
+      - zookeeper
+    ports:
+      - 9092:9092
+      - 9093:9093
+    environment:
+      KAFKA_CFG_BROKER_ID: 1
+      KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+      KAFKA_CFG_ADVERTISED_PORT: '9092'
+      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+      KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
+      KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
+      KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+      KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
+      KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+      ALLOW_PLAINTEXT_LISTENER: yes
+    entrypoint:
+      - "/bin/bash"
+      - "-c"
+      - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n  };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh
diff --git a/docker_compose_versions/docker-compose-400.yml b/docker_compose_versions/docker-compose-400.yml
new file mode 100644
index 000000000..b563d5e39
--- /dev/null
+++ docker_compose_versions/docker-compose-400.yml
@@ -0,0 +1,40 @@
+# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list.
+version: '3'
+services:
+  kafka:
+    container_name: kafka
+    image: bitnami/kafka:4.0.0
+    restart: on-failure:3
+    ports:
+      - 9092:9092
+      - 9093:9093
+    environment:
+      KAFKA_CFG_NODE_ID: 1
+      KAFKA_CFG_BROKER_ID: 1
+      KAFKA_CFG_PROCESS_ROLES: broker,controller
+      KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost'
+      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT
+      KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093
+      KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093
+      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN
+      KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
+      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
+      ALLOW_PLAINTEXT_LISTENER: yes
+      KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
+      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000'
+      KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer'
+      KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain
+      KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain
+      KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret
+      KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
+      KAFKA_INTER_BROKER_USER: adminscram512
+      KAFKA_INTER_BROKER_PASSWORD: admin-secret-512
+      KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512
+      # Note you will need to increase this to at least 4GB of memory for the tests to pass
+      # https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900
+      KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m'
+      KAFKA_JVM_OPTS: '-XX:+UseG1GC'
\ No newline at end of file
diff --git error.go error.go
index 4a7a8a278..300a1412f 100644
--- error.go
+++ error.go
@@ -329,6 +329,8 @@ func (e Error) Title() string {
 		return "Unsupported Compression Type"
 	case MemberIDRequired:
 		return "Member ID Required"
+	case FencedInstanceID:
+		return "Fenced Instance ID"
 	case EligibleLeadersNotAvailable:
 		return "Eligible Leader Not Available"
 	case ElectionNotNeeded:
@@ -538,6 +540,8 @@ func (e Error) Description() string {
 		return "the requesting client does not support the compression type of given partition"
 	case MemberIDRequired:
 		return "the group member needs to have a valid member id before actually entering a consumer group"
+	case FencedInstanceID:
+		return "the broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id"
 	case EligibleLeadersNotAvailable:
 		return "eligible topic partition leaders are not available"
 	case ElectionNotNeeded:
@@ -636,6 +640,7 @@ func coalesceErrors(errs ...error) error {
 	return nil
 }
 
+// MessageTooLargeError is returned when a message is too large to fit within the allowed size.
 type MessageTooLargeError struct {
 	Message   Message
 	Remaining []Message
@@ -655,6 +660,10 @@ func (e MessageTooLargeError) Error() string {
 	return MessageSizeTooLarge.Error()
 }
 
+func (e MessageTooLargeError) Unwrap() error {
+	return MessageSizeTooLarge
+}
+
 func makeError(code int16, message string) error {
 	if code == 0 {
 		return nil
diff --git error_test.go error_test.go
index 3d461154c..8ae158661 100644
--- error_test.go
+++ error_test.go
@@ -3,6 +3,8 @@ package kafka
 import (
 	"fmt"
 	"testing"
+
+	"github.com/stretchr/testify/assert"
 )
 
 func TestError(t *testing.T) {
@@ -110,4 +112,16 @@ func TestError(t *testing.T) {
 			t.Error("non-empty description:", s)
 		}
 	})
+
+	t.Run("MessageTooLargeError error.Is satisfaction", func(t *testing.T) {
+		err := MessageSizeTooLarge
+		msg := []Message{
+			{Key: []byte("key"), Value: []byte("value")},
+			{Key: []byte("key"), Value: make([]byte, 8)},
+		}
+		msgTooLarge := messageTooLarge(msg, 1)
+		assert.NotErrorIs(t, err, msgTooLarge)
+		assert.Contains(t, msgTooLarge.Error(), MessageSizeTooLarge.Error())
+		assert.ErrorIs(t, msgTooLarge, MessageSizeTooLarge)
+	})
 }
diff --git examples/docker-compose.yaml examples/docker-compose.yaml
index 01b60fdbf..d06e64688 100644
--- examples/docker-compose.yaml
+++ examples/docker-compose.yaml
@@ -3,18 +3,27 @@ services:
 
   zookeeper:
     hostname: zookeeper
-    image: wurstmeister/zookeeper:3.4.6
+    image: bitnami/zookeeper:latest
+    restart: always
     expose:
     - "2181"
     ports:
     - "2181:2181"
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: yes
   
   kafka:
-    image: wurstmeister/kafka
+    hostname: kafka
+    image: bitnami/kafka:2.7.0
+    restart: always
     env_file:
     - kafka/kafka-variables.env
     depends_on:
     - zookeeper
+    expose:
+    - "9092"
+    - "8082"
+    - "8083"
     ports:
     - '9092:9092'
     - '8082:8082'
@@ -22,6 +31,7 @@ services:
 
   mongo-db:
     image: mongo:4.0
+    restart: always
     expose:
     - "27017"
     ports:
@@ -39,10 +49,11 @@ services:
       collectionName: example_coll
       kafkaURL: kafka:9092
       topic: topic1
-      GroupID: mongo-group
+      groupID: mongo-group
     depends_on: 
     - kafka
     - mongo-db
+    restart: always
 
   consumer-logger:
     build:
@@ -50,9 +61,10 @@ services:
     environment:
       kafkaURL: kafka:9092
       topic: topic1
-      GroupID: logger-group
+      groupID: logger-group
     depends_on: 
     - kafka
+    restart: always
 
   producer-random:
     build:
@@ -62,6 +74,7 @@ services:
       topic: topic1
     depends_on: 
     - kafka
+    restart: always
 
   producer-api:
     build:
@@ -74,4 +87,5 @@ services:
     ports:
     - "8080:8080"
     depends_on: 
-    - kafka
\ No newline at end of file
+    - kafka
+    restart: always
diff --git examples/kafka/kafka-variables.env examples/kafka/kafka-variables.env
index dc19833ac..9d6ce8668 100644
--- examples/kafka/kafka-variables.env
+++ examples/kafka/kafka-variables.env
@@ -1,22 +1,23 @@
 
-KAFKA_ADVERTISED_HOST_NAME=kafka
-KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
-KAFKA_CONNECT_BOOTSTRAP_SERVERS=localhost:9092
+KAFKA_CFG_ADVERTISED_HOST_NAME=kafka
+KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
+KAFKA_CFG_CONNECT_BOOTSTRAP_SERVERS=localhost:9092
 
-KAFKA_CONNECT_REST_PORT=8082
-KAFKA_CONNECT_REST_ADVERTISED_HOST_NAME="localhost"
+KAFKA_CFG_CONNECT_REST_PORT=8082
+KAFKA_CFG_CONNECT_REST_ADVERTISED_HOST_NAME="localhost"
 
-KAFKA_CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=0
-KAFKA_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=0
 
-KAFKA_CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
-KAFKA_CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE=0
-KAFKA_CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
+KAFKA_CFG_CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE=0
+KAFKA_CFG_CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=0
 
-KAFKA_CONNECT_OFFSET_STORAGE_FILE_FILENAME="/tmp/connect.offsets"
+KAFKA_CFG_CONNECT_OFFSET_STORAGE_FILE_FILENAME="/tmp/connect.offsets"
 # Flush much faster than normal, which is useful for testing/debugging
-KAFKA_CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
+KAFKA_CFG_CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
 
+ALLOW_PLAINTEXT_LISTENER=yes
diff --git initproducerid_test.go initproducerid_test.go
index 061819e58..b5110a00f 100644
--- initproducerid_test.go
+++ initproducerid_test.go
@@ -15,6 +15,13 @@ func TestClientInitProducerId(t *testing.T) {
 	if !ktesting.KafkaIsAtLeast("0.11.0") {
 		return
 	}
+
+	// TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+	// work is revisited.
+	if ktesting.KafkaIsAtLeast("3.0.0") {
+		t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+	}
+
 	client, shutdown := newLocalClient()
 	defer shutdown()
 
diff --git joingroup.go joingroup.go
index 30823a69a..f3d90a937 100644
--- joingroup.go
+++ joingroup.go
@@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
 	wb.writeBytes(t.ProtocolMetadata)
 }
 
-type joinGroupRequestV1 struct {
+type joinGroupRequest struct {
 	// GroupID holds the unique group identifier
 	GroupID string
 
@@ -264,7 +264,7 @@ type joinGroupRequestV1 struct {
 	GroupProtocols []joinGroupRequestGroupProtocolV1
 }
 
-func (t joinGroupRequestV1) size() int32 {
+func (t joinGroupRequest) size() int32 {
 	return sizeofString(t.GroupID) +
 		sizeofInt32(t.SessionTimeout) +
 		sizeofInt32(t.RebalanceTimeout) +
@@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 {
 		sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
 }
 
-func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
+func (t joinGroupRequest) writeTo(wb *writeBuffer) {
 	wb.writeString(t.GroupID)
 	wb.writeInt32(t.SessionTimeout)
 	wb.writeInt32(t.RebalanceTimeout)
@@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
 	wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
 }
 
-type joinGroupResponseMemberV1 struct {
+type joinGroupResponseMember struct {
 	// MemberID assigned by the group coordinator
 	MemberID       string
 	MemberMetadata []byte
 }
 
-func (t joinGroupResponseMemberV1) size() int32 {
+func (t joinGroupResponseMember) size() int32 {
 	return sizeofString(t.MemberID) +
 		sizeofBytes(t.MemberMetadata)
 }
 
-func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
+func (t joinGroupResponseMember) writeTo(wb *writeBuffer) {
 	wb.writeString(t.MemberID)
 	wb.writeBytes(t.MemberMetadata)
 }
 
-func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) {
 	if remain, err = readString(r, size, &t.MemberID); err != nil {
 		return
 	}
@@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain
 	return
 }
 
-type joinGroupResponseV1 struct {
+type joinGroupResponse struct {
+	v apiVersion // v1, v2
+
+	ThrottleTime int32
+
 	// ErrorCode holds response error code
 	ErrorCode int16
 
@@ -323,19 +327,26 @@ type joinGroupResponseV1 struct {
 
 	// MemberID assigned by the group coordinator
 	MemberID string
-	Members  []joinGroupResponseMemberV1
+	Members  []joinGroupResponseMember
 }
 
-func (t joinGroupResponseV1) size() int32 {
-	return sizeofInt16(t.ErrorCode) +
+func (t joinGroupResponse) size() int32 {
+	sz := sizeofInt16(t.ErrorCode) +
 		sizeofInt32(t.GenerationID) +
 		sizeofString(t.GroupProtocol) +
 		sizeofString(t.LeaderID) +
 		sizeofString(t.MemberID) +
 		sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
+	if t.v >= v2 {
+		sz += sizeofInt32(t.ThrottleTime)
+	}
+	return sz
 }
 
-func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
+func (t joinGroupResponse) writeTo(wb *writeBuffer) {
+	if t.v >= v2 {
+		wb.writeInt32(t.ThrottleTime)
+	}
 	wb.writeInt16(t.ErrorCode)
 	wb.writeInt32(t.GenerationID)
 	wb.writeString(t.GroupProtocol)
@@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
 	wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
 }
 
-func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
-	if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
+func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
+	remain = size
+	if t.v >= v2 {
+		if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil {
+			return
+		}
+	}
+	if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
 		return
 	}
 	if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
@@ -362,7 +379,7 @@ func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, e
 	}
 
 	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
-		var item joinGroupResponseMemberV1
+		var item joinGroupResponseMember
 		if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
 			return
 		}
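The version-gating pattern above — checking `t.v >= v2` so the throttle-time field is only read and written for v2 — can be sketched standalone as follows (types and field set are simplified for illustration; this is not the actual kafka-go wire code):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// apiVersion gates which fields appear on the wire, as in joinGroupResponse,
// where v2 prepends a 4-byte throttle time before the error code.
type apiVersion int

const (
	v1 apiVersion = 1
	v2 apiVersion = 2
)

type response struct {
	v            apiVersion
	ThrottleTime int32
	ErrorCode    int16
}

// encode writes the fields in wire order, emitting ThrottleTime only for v2+.
func (r response) encode() []byte {
	buf := &bytes.Buffer{}
	if r.v >= v2 {
		binary.Write(buf, binary.BigEndian, r.ThrottleTime)
	}
	binary.Write(buf, binary.BigEndian, r.ErrorCode)
	return buf.Bytes()
}

// decode mirrors encode: both sides must agree on the version,
// otherwise the reader misinterprets the leading bytes.
func decode(v apiVersion, b []byte) (response, error) {
	r := response{v: v}
	buf := bytes.NewReader(b)
	if v >= v2 {
		if err := binary.Read(buf, binary.BigEndian, &r.ThrottleTime); err != nil {
			return r, err
		}
	}
	err := binary.Read(buf, binary.BigEndian, &r.ErrorCode)
	return r, err
}

func main() {
	for _, v := range []apiVersion{v1, v2} {
		in := response{v: v, ThrottleTime: 100, ErrorCode: 2}
		out, err := decode(v, in.encode())
		fmt.Println(v, out, err)
	}
}
```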
diff --git joingroup_test.go joingroup_test.go
index 926f5b4a6..73922d6a0 100644
--- joingroup_test.go
+++ joingroup_test.go
@@ -217,37 +217,41 @@ func TestMemberMetadata(t *testing.T) {
 	}
 }
 
-func TestJoinGroupResponseV1(t *testing.T) {
-	item := joinGroupResponseV1{
-		ErrorCode:     2,
-		GenerationID:  3,
-		GroupProtocol: "a",
-		LeaderID:      "b",
-		MemberID:      "c",
-		Members: []joinGroupResponseMemberV1{
-			{
-				MemberID:       "d",
-				MemberMetadata: []byte("blah"),
+func TestJoinGroupResponse(t *testing.T) {
+	supportedVersions := []apiVersion{v1, v2}
+	for _, v := range supportedVersions {
+		item := joinGroupResponse{
+			v:             v,
+			ErrorCode:     2,
+			GenerationID:  3,
+			GroupProtocol: "a",
+			LeaderID:      "b",
+			MemberID:      "c",
+			Members: []joinGroupResponseMember{
+				{
+					MemberID:       "d",
+					MemberMetadata: []byte("blah"),
+				},
 			},
-		},
-	}
+		}
 
-	b := bytes.NewBuffer(nil)
-	w := &writeBuffer{w: b}
-	item.writeTo(w)
+		b := bytes.NewBuffer(nil)
+		w := &writeBuffer{w: b}
+		item.writeTo(w)
 
-	var found joinGroupResponseV1
-	remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
-	if err != nil {
-		t.Error(err)
-		t.FailNow()
-	}
-	if remain != 0 {
-		t.Errorf("expected 0 remain, got %v", remain)
-		t.FailNow()
-	}
-	if !reflect.DeepEqual(item, found) {
-		t.Error("expected item and found to be the same")
-		t.FailNow()
+		found := joinGroupResponse{v: v}
+		remain, err := (&found).readFrom(bufio.NewReader(b), b.Len())
+		if err != nil {
+			t.Error(err)
+			t.FailNow()
+		}
+		if remain != 0 {
+			t.Errorf("expected 0 remain, got %v", remain)
+			t.FailNow()
+		}
+		if !reflect.DeepEqual(item, found) {
+			t.Error("expected item and found to be the same")
+			t.FailNow()
+		}
 	}
 }
diff --git listgroups.go listgroups.go
index 229de9352..5034b5440 100644
--- listgroups.go
+++ listgroups.go
@@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int,
 
 	fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
 		var item listGroupsResponseGroupV1
-		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
+		if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil {
 			return
 		}
 		t.Groups = append(t.Groups, item)
diff --git listoffset.go listoffset.go
index 11c5d04b4..97779cecf 100644
--- listoffset.go
+++ listoffset.go
@@ -17,7 +17,7 @@ type OffsetRequest struct {
 }
 
 // FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
-// the parition given as argument.
+// the partition given as argument.
 func FirstOffsetOf(partition int) OffsetRequest {
 	return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
 }
diff --git metadata.go metadata.go
index 429a6a260..d151071b3 100644
--- metadata.go
+++ metadata.go
@@ -19,7 +19,7 @@ type MetadataRequest struct {
 	Topics []string
 }
 
-// MetadatResponse represents a response from a kafka broker to a metadata
+// MetadataResponse represents a response from a kafka broker to a metadata
 // request.
 type MetadataResponse struct {
 	// The amount of time that the broker throttled the request.
diff --git offsetfetch.go offsetfetch.go
index b85bc5c83..ce80213f8 100644
--- offsetfetch.go
+++ offsetfetch.go
@@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem
 
 	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
 		item := offsetFetchResponseV1PartitionResponse{}
-		if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
+		if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
 			return
 		}
 		t.PartitionResponses = append(t.PartitionResponses, item)
diff --git protocol/record.go protocol/record.go
index e11af4dcc..c7987c390 100644
--- protocol/record.go
+++ protocol/record.go
@@ -191,7 +191,7 @@ func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) {
 			// Reconstruct the prefix that we had to read to determine the version
 			// of the record set from the magic byte.
 			//
-			// Technically this may recurisvely stack readers when consuming all
+			// Technically this may recursively stack readers when consuming all
 			// items of the batch, which could hurt performance. In practice this
 			// path should not be taken tho, since the decoder would read from a
 			// *bufio.Reader which implements the bufferedReader interface.
@@ -304,7 +304,7 @@ type RawRecordSet struct {
 // then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet.
 //
 // Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a
-// performance standpoint as it require an extra copy of the record bytes. Holding off
+// performance standpoint as it requires an extra copy of the record bytes. Holding off
 // on optimizing, as this code path is only invoked in tests.
 func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) {
 	rs := &RecordSet{}
diff --git reader.go reader.go
index cfc7cb8f5..04d90f355 100644
--- reader.go
+++ reader.go
@@ -469,9 +469,11 @@ type ReaderConfig struct {
 	JoinGroupBackoff time.Duration
 
 	// RetentionTime optionally sets the length of time the consumer group will be saved
-	// by the broker
+	// by the broker. -1 will disable the setting and leave the
+	// retention up to the broker's offsets.retention.minutes property. By
+	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0.
 	//
-	// Default: 24h
+	// Default: -1
 	//
 	// Only used when GroupID is set
 	RetentionTime time.Duration
diff --git reader_test.go reader_test.go
index f413d7429..63d81816e 100644
--- reader_test.go
+++ reader_test.go
@@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
 
 	conn.SetDeadline(time.Now().Add(10 * time.Second))
 
-	_, err = conn.createTopics(createTopicsRequestV0{
+	_, err = conn.createTopics(createTopicsRequest{
 		Topics: []createTopicsRequestV0Topic{
 			{
 				Topic:             topic,
@@ -309,7 +309,7 @@ func createTopic(t *testing.T, topic string, partitions int) {
 				ReplicationFactor: 1,
 			},
 		},
-		Timeout: milliseconds(time.Second),
+		Timeout: milliseconds(5 * time.Second),
 	})
 	if err != nil {
 		if !errors.Is(err, TopicAlreadyExists) {
@@ -364,8 +364,8 @@ func waitForTopic(ctx context.Context, t *testing.T, topic string) {
 			}
 		}
 
-		t.Logf("retrying after 1s")
-		time.Sleep(time.Second)
+		t.Logf("retrying after 100ms")
+		time.Sleep(100 * time.Millisecond)
 		continue
 	}
 }
@@ -1374,7 +1374,9 @@ func TestCommitOffsetsWithRetry(t *testing.T) {
 // than partitions in a group.
 // https://github.com/segmentio/kafka-go/issues/200
 func TestRebalanceTooManyConsumers(t *testing.T) {
-	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
 	conf := ReaderConfig{
 		Brokers: []string{"localhost:9092"},
 		GroupID: makeGroupID(),
@@ -1384,8 +1386,15 @@ func TestRebalanceTooManyConsumers(t *testing.T) {
 
 	// Create the first reader and wait for it to become the leader.
 	r1 := NewReader(conf)
+
+	// Give the reader some time to setup before reading a message
+	time.Sleep(1 * time.Second)
 	prepareReader(t, ctx, r1, makeTestSequence(1)...)
-	r1.ReadMessage(ctx)
+
+	_, err := r1.ReadMessage(ctx)
+	if err != nil {
+		t.Fatalf("failed to read message: %v", err)
+	}
 	// Clear the stats from the first rebalance.
 	r1.Stats()
 
@@ -1559,17 +1568,22 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
 	}
 }
 
-func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
 	defer cancel()
 
 	client, shutdown := newLocalClient()
 	defer shutdown()
-
+	t1 := makeTopic()
+	createTopic(t, t1, 1)
+	defer deleteTopic(t, t1)
+	t2 := makeTopic()
+	createTopic(t, t2, 1)
+	defer deleteTopic(t, t2)
 	conf := ReaderConfig{
 		Brokers:                []string{"localhost:9092"},
 		GroupID:                makeGroupID(),
-		GroupTopics:            []string{makeTopic(), makeTopic()},
+		GroupTopics:            []string{t1, t2},
 		MaxWait:                time.Second,
 		PartitionWatchInterval: 100 * time.Millisecond,
 		WatchPartitionChanges:  true,
diff --git record.go record.go
index 1750889ac..8f8f7bd92 100644
--- record.go
+++ record.go
@@ -35,7 +35,7 @@ type Record = protocol.Record
 // RecordReader values are not safe to use concurrently from multiple goroutines.
 type RecordReader = protocol.RecordReader
 
-// NewRecordReade reconstructs a RecordSet which exposes the sequence of records
+// NewRecordReader reconstructs a RecordSet which exposes the sequence of records
 // passed as arguments.
 func NewRecordReader(records ...Record) RecordReader {
 	return protocol.NewRecordReader(records...)
diff --git sasl/sasl_test.go sasl/sasl_test.go
index a4101391a..57ff8b7cf 100644
--- sasl/sasl_test.go
+++ sasl/sasl_test.go
@@ -18,6 +18,11 @@ const (
 )
 
 func TestSASL(t *testing.T) {
+	scramUsers := map[scram.Algorithm]string{scram.SHA256: "adminscram", scram.SHA512: "adminscram"}
+	// kafka 4.0.0 test environment supports only different users for different scram algorithms.
+	if ktesting.KafkaIsAtLeast("4.0.0") {
+		scramUsers = map[scram.Algorithm]string{scram.SHA256: "adminscram256", scram.SHA512: "adminscram512"}
+	}
 	tests := []struct {
 		valid    func() sasl.Mechanism
 		invalid  func() sasl.Mechanism
@@ -39,22 +44,22 @@ func TestSASL(t *testing.T) {
 		},
 		{
 			valid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256")
+				mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "admin-secret-256")
 				return mech
 			},
 			invalid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword")
+				mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "badpassword")
 				return mech
 			},
 			minKafka: "0.10.2.0",
 		},
 		{
 			valid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512")
+				mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "admin-secret-512")
 				return mech
 			},
 			invalid: func() sasl.Mechanism {
-				mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword")
+				mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "badpassword")
 				return mech
 			},
 			minKafka: "0.10.2.0",
diff --git a/scripts/wait-for-kafka.sh b/scripts/wait-for-kafka.sh
new file mode 100755
index 000000000..5cd336556
--- /dev/null
+++ scripts/wait-for-kafka.sh
@@ -0,0 +1,20 @@
+#/bin/bash
+
+COUNTER=0; 
+echo foo | nc localhost 9092
+STATUS=$?
+ATTEMPTS=60
+until [ ${STATUS} -eq 0 ] || [ "$COUNTER" -ge "${ATTEMPTS}" ];
+do
+    let COUNTER=$COUNTER+1;
+    sleep 1;
+    echo "[$COUNTER] waiting for 9092 port to be open";
+    echo foo | nc localhost 9092
+    STATUS=$?
+done
+
+if [ "${COUNTER}" -gt "${ATTEMPTS}" ];
+then
+    echo "Kafka is not running, failing"
+    exit 1
+fi
\ No newline at end of file
diff --git txnoffsetcommit_test.go txnoffsetcommit_test.go
index eb3d33cbd..409584dbc 100644
--- txnoffsetcommit_test.go
+++ txnoffsetcommit_test.go
@@ -16,11 +16,19 @@ func TestClientTxnOffsetCommit(t *testing.T) {
 		t.Skip("Skipping test because kafka version is not high enough.")
 	}
 
+	// TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
+	// work is revisited.
+	if ktesting.KafkaIsAtLeast("3.0.0") {
+		t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
+	}
+
 	transactionalID := makeTransactionalID()
 	topic := makeTopic()
 
 	client, shutdown := newLocalClientWithTopic(topic, 1)
 	defer shutdown()
+	waitForTopic(context.TODO(), t, topic)
+	defer deleteTopic(t, topic)
 
 	now := time.Now()
 
diff --git writer.go writer.go
index 3c7af907a..3817bf538 100644
--- writer.go
+++ writer.go
@@ -635,7 +635,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
 		}
 	}
 
-	// We use int32 here to half the memory footprint (compared to using int
+	// We use int32 here to halve the memory footprint (compared to using int
 	// on 64 bits architectures). We map lists of the message indexes instead
 	// of the message values for the same reason, int32 is 4 bytes, vs a full
 	// Message value which is 100+ bytes and contains pointers and contributes
diff --git writer_test.go writer_test.go
index 6f894ecd3..bd64b668b 100644
--- writer_test.go
+++ writer_test.go
@@ -856,7 +856,7 @@ func testWriterAutoCreateTopic(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		defer cancel()
 		err = w.WriteMessages(ctx, msg)
-		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, UnknownTopicOrPartition) {
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}
@@ -924,7 +924,7 @@ func testWriterSasl(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 		defer cancel()
 		err = w.WriteMessages(ctx, msg)
-		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, UnknownTopicOrPartition) {
 			time.Sleep(time.Millisecond * 250)
 			continue
 		}

Description

This PR migrates the CI/CD setup from wurstmeister/kafka Docker images to bitnami/kafka. It reduces the test matrix from 10 Kafka versions to 4 key versions (0.10, 2.7, 2.8, and 3.7), updates various test cases to accommodate configuration differences in the new images, and adds compatibility for newer Kafka versions.

Possible Issues

  • The removal of tests for multiple Kafka versions (0.11, 1.0.1, 1.1.1, etc.) might reduce test coverage for potential compatibility issues with those versions.
  • The PR comments out Kafka 4.0.0 testing due to frequent Java heap errors, which could mask potential issues with that version.
  • Some transaction-related tests are explicitly skipped for Kafka 3.0.0+ versions rather than fixed.
Changes

.circleci/config.yml

  • Replaced wurstmeister/kafka images with bitnami/kafka
  • Reduced test matrix from 10 Kafka versions to 4 versions (0.10, 2.7, 2.8, 3.7)
  • Added a wait-for-kafka script to ensure Kafka is ready before running tests
  • Updated environment variables to match bitnami/kafka configuration pattern (KAFKA_CFG_*)
  • Commented out testing for Kafka 4.0.0 due to heap issues

Docker configuration

  • Removed old docker-compose files
  • Added new docker-compose files for different versions in docker_compose_versions/
  • Added comprehensive README.md explaining Kafka version differences and configuration
  • Updated examples/docker-compose.yaml to use bitnami/kafka

Code changes

  • Updated various protocol implementations to support newer Kafka versions:
    • Modified createTopics, deleteTopics, joinGroup implementations to handle different API versions
    • Fixed error handling in various test functions
    • Added proper error type for FencedInstanceID
  • Added skips for transaction-related tests on newer Kafka versions
  • Fixed typos in documentation and comments
  • Added waitForTopic helper in tests to improve test reliability

Test improvements

  • Added proper cleanup after topic creation in tests
  • Added timeout handling for slow Kafka operations
  • Fixed consumer group rebalance test to be more reliable
sequenceDiagram
    participant CI as CircleCI
    participant Zookeeper as bitnami/zookeeper
    participant Kafka as bitnami/kafka
    participant Tests as kafka-go tests
    
    CI->>Zookeeper: Start container
    Note over Zookeeper: Configure with ALLOW_ANONYMOUS_LOGIN
    
    CI->>Kafka: Start container with version (0.10/2.7/2.8/3.7)
    Note over Kafka: Configure SASL, listeners, etc.
    
    Kafka->>Zookeeper: Connect and register
    
    CI->>CI: Run wait-for-kafka.sh
    Note over CI: Ensures Kafka is ready
    
    CI->>Tests: Execute tests
    
    Tests->>Kafka: Create topics
    Tests->>Kafka: Produce/consume messages
    Tests->>Kafka: Test SASL auth
    Tests->>Kafka: Test consumer groups
    
    alt Transaction tests
        alt Kafka < 3.0.0
            Tests->>Kafka: Run transaction tests
        else Kafka >= 3.0.0
            Tests--xKafka: Skip transaction tests
        end
    end
    
    Tests->>CI: Report results
