diff --git a/internal/impl/kafka/enterprise/integration_test.go b/internal/impl/kafka/enterprise/integration_test.go index 047f72555..1ff3f85cb 100644 --- a/internal/impl/kafka/enterprise/integration_test.go +++ b/internal/impl/kafka/enterprise/integration_test.go @@ -854,3 +854,36 @@ func TestRedpandaMigratorIntegration(t *testing.T) { readMessageWithCG(t, source, dummyTopic, dummyCG, dummyMessage) t.Logf("Finished reading second message from destination with consumer group %q", dummyCG) } + +func TestRedpandaConsumerGroupIntegration(t *testing.T) { + integration.CheckSkip(t) + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + pool.MaxWait = time.Minute + + source, err := startRedpanda(t, pool, true, true) + require.NoError(t, err) + + t.Logf("Source broker: %s", source.brokerAddr) + + dummyTopic := "test" + + // Create a schema associated with the test topic + createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil) + + // Produce one message + dummyMessage := `{"test":"foo"}` + produceMessage(t, source, dummyTopic, dummyMessage) + + dummyCG := "foobar_consumer_group" + // Read the message from source using a consumer group + readMessageWithCG(t, source, dummyTopic, dummyCG, dummyMessage) + + // TODO: This should fail because the consumer group should be up to date + readMessageWithCG(t, source, dummyTopic, dummyCG, dummyMessage) + + // time.Sleep(5 * time.Second) + t.Logf("Finished reading first message from source with consumer group %q", dummyCG) +}