Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faust CreateTopicRequest not compatible with future versions of Kafka #154

Open
1 of 2 tasks
zackpayton opened this issue Aug 29, 2018 · 17 comments
Open
1 of 2 tasks

Comments

@zackpayton
Copy link

zackpayton commented Aug 29, 2018

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

When running the following simple program, Faust fails to consume from a Kafka topic:

import faust

app = faust.App(
        'faust-leader',
        broker='kafka://my-kafka:9092'
    )
topic = app.topic('my-topic', value_type=str)

@app.agent(topic)
async def process(stream):
    async for value in stream:
        print(value)


if __name__ == '__main__':
    app.main()
$ faust -A leader worker -l debug --web-port 6066

Expected behavior

I would expect this program to start printing messages from my kafka topic.

Actual behavior

A full stacktrace is thrown.

faust -A leader worker -l debug --web-port 6066
┌ƒaµS† v1.0.30─────────────────────────────────────────────────────────┐
│ id        │ faust-leader                                             │
│ transport │ kafka://my-kafka:9092  │
│ store     │ memory:                                                  │
│ web       │ http://localhost:6066/                                   │
│ log       │ -stderr- (debug)                                         │
│ pid       │ 117                                                     │
│ hostname  │ dc08c7d7783e                                                │
│ platform  │ CPython 3.7.0 (Linux x86_64)                            │
│ drivers   │ aiokafka=0.4.18 aiohttp=3.4.0                            │
│ datadir   │ /Users/zack.payton/faust/faust-leader-data               │
│ appdir    │ /Users/zack.payton/faust/faust-leader-data/v1            │
└───────────┴──────────────────────────────────────────────────────────┘
[2018-08-28 18:49:31,676: INFO]: [^Worker]: Starting...
[2018-08-28 18:49:31,676: INFO]: [^-Website]: Starting...
[2018-08-28 18:49:31,677: INFO]: [^--Web]: Starting...
[2018-08-28 18:49:31,677: DEBUG]: Using selector: KqueueSelector
[2018-08-28 18:49:31,679: INFO]: [^---ServerThread]: Starting...
[2018-08-28 18:49:31,679: INFO]: [^--Web]: Serving on http://localhost:6066/
[2018-08-28 18:49:31,680: DEBUG]: [^---ServerThread]: Started.
[2018-08-28 18:49:31,680: DEBUG]: [^--Web]: Started.
[2018-08-28 18:49:31,680: DEBUG]: [^-Website]: Started.
[2018-08-28 18:49:31,681: INFO]: [^-App]: Starting...
[2018-08-28 18:49:31,681: INFO]: [^--MonitorService]: Starting...
[2018-08-28 18:49:31,682: DEBUG]: [^--MonitorService]: Started.
[2018-08-28 18:49:31,682: INFO]: [^--Producer]: Starting...
[2018-08-28 18:49:31,682: DEBUG]: Starting the Kafka producer
[2018-08-28 18:49:31,682: DEBUG]: Attempting to bootstrap via node at my-kafka:9092
[2018-08-28 18:49:32,116: DEBUG]: <AIOKafkaConnection host=my-kafka port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=2, host='10.1.1.5', port=9092), (node_id=1, host='10.1.1.214', port=9092), (node_id=3, host='10.1.1.81', port=9092)], topics=[(error_code=0, topic='topic', partitions=[(error_code=0, partition=23, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=41, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=59, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=50, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=53, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=35, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=17, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=26, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=11, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=56, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=47, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=29, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=5, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=14, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=55, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=58, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=40, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=49, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=31, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=16, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=7, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=52, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=25, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=10, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=37, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=1, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=19, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=45, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=57, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=51, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2], isr=[2, 1])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=41, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=17, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=35, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=26, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=11, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=29, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=47, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=5, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=14, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=49, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=40, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=13, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=31, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=16, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=7, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=43, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=25, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=10, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=37, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=1, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=19, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=45, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1])]), (error_code=0, topic='my-tp, partitions=[(error_code=0, partition=23, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=41, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=32, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=59, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=50, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=53, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=8, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=44, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=35, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=17, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=26, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=11, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=56, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=38, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=29, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=47, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=20, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=5, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=46, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=55, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=58, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=49, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=40, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=22, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=31, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=16, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=7, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=52, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=25, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=34, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=10, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=37, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=19, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=28, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=45, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=36, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=27, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=9, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=18, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=21, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=48, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=57, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=3, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=12, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=30, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=39, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=15, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=42, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=51, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=24, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=33, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=6, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])]), (error_code=0, topic='__confluent.support.metrics', partitions=[(error_code=0, partition=0, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3])]), (error_code=0, topic='topic-alerts', partitions=[(error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])])])
[2018-08-28 18:49:32,120: DEBUG]: Updated cluster metadata to ClusterMetadata(brokers: 3, topics: 5, groups: 0)
[2018-08-28 18:49:32,120: DEBUG]: Closing connection at my-kafka:9092
[2018-08-28 18:49:32,120: DEBUG]: Received cluster metadata: ClusterMetadata(brokers: 3, topics: 5, groups: 0)
[2018-08-28 18:49:32,121: DEBUG]: Initiating connection to node 2 at 10.1.1.5:9092
[2018-08-28 18:49:32,211: DEBUG]: <AIOKafkaConnection host=10.1.1.5 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=2), (api_key=1, min_version=0, max_version=3), (api_key=2, min_version=0, max_version=1), (api_key=3, min_version=0, max_version=2), (api_key=4, min_version=0, max_version=0), (api_key=5, min_version=0, max_version=0), (api_key=6, min_version=0, max_version=2), (api_key=7, min_version=1, max_version=1), (api_key=8, min_version=0, max_version=2), (api_key=9, min_version=0, max_version=1), (api_key=10, min_version=0, max_version=0), (api_key=11, min_version=0, max_version=1), (api_key=12, min_version=0, max_version=0), (api_key=13, min_version=0, max_version=0), (api_key=14, min_version=0, max_version=0), (api_key=15, min_version=0, max_version=0), (api_key=16, min_version=0, max_version=0), (api_key=17, min_version=0, max_version=0), (api_key=18, min_version=0, max_version=0), (api_key=19, min_version=0, max_version=0), (api_key=20, min_version=0, max_version=0)])
[2018-08-28 18:49:32,262: DEBUG]: <AIOKafkaConnection host=10.1.1.5 port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=2, host='10.1.1.5', port=9092), (node_id=1, host='10.1.1.214', port=9092), (node_id=3, host='10.1.1.81', port=9092)], topics=[(error_code=0, topic='my-topic', partitions=[(error_code=0, partition=23, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=41, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=59, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=50, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=53, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=35, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=17, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=26, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=11, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=56, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=47, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=29, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=5, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=14, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=55, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=58, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=40, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=49, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=31, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=16, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=7, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=52, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=25, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=10, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=37, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=1, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=19, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=45, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=57, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=51, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2], isr=[2, 1])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=41, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=17, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=35, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=26, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=11, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=29, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=47, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=5, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=14, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=49, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=40, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=13, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=31, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=16, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=7, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=43, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=25, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=10, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=37, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=1, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=19, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=45, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1])]), (error_code=0, topic='my-topic', partitions=[(error_code=0, partition=23, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=41, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=32, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=59, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=50, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=53, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=8, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=44, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=35, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=17, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=26, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=11, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=56, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=38, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=29, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=47, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=20, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=5, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=46, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=55, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=58, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=49, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=40, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=22, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=31, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=16, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=7, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=52, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=25, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=34, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=10, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=37, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=19, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=28, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=45, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=36, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=27, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=9, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=18, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=21, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=48, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=57, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=3, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=12, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=30, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=39, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=15, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=42, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=51, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=24, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=33, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=6, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])]), (error_code=0, topic='__confluent.support.metrics', partitions=[(error_code=0, partition=0, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3])]), (error_code=0, topic='kafka-alerts', partitions=[(error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])])])
[2018-08-28 18:49:32,300: DEBUG]: Closing connection at 10.1.1.5:9092
[2018-08-28 18:49:32,301: DEBUG]: Kafka producer started
[2018-08-28 18:49:32,301: DEBUG]: [^--Producer]: Started.
[2018-08-28 18:49:32,301: INFO]: [^--Consumer]: Starting...
[2018-08-28 18:49:32,301: DEBUG]: Attempting to bootstrap via node at my-kafka:9092
[2018-08-28 18:49:32,402: DEBUG]: <AIOKafkaConnection host=my-kafka=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=2, host='10.1.1.5', port=9092), (node_id=1, host='10.1.1.214', port=9092), (node_id=3, host='10.1.1.81', port=9092)], topics=[(error_code=0, topic='my-topic', partitions=[(error_code=0, partition=23, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=41, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=59, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=50, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=53, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=35, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=17, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=26, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=11, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=56, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=47, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=29, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=5, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=14, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=55, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=58, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=40, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=49, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=31, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=16, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=7, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=52, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=25, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=10, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=37, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=1, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=19, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=45, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=57, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=51, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2], isr=[2, 1])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=41, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=17, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=35, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=26, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=11, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=29, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=47, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=5, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=14, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=49, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=40, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=13, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=31, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=16, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=7, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=43, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=25, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=10, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=37, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=1, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=19, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=45, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1])]), (error_code=0, topic='my-topic', partitions=[(error_code=0, partition=23, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=41, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=32, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=59, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=50, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=53, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=8, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=44, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=35, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=17, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=26, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=11, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=56, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=38, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=29, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=47, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=20, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=5, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=46, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=55, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=58, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=49, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=40, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=22, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=31, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=16, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=7, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=52, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=25, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=34, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=10, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=37, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=19, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=28, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=45, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=36, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=27, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=9, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=18, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=21, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=48, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=57, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=3, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=12, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=30, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=39, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=15, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=42, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=51, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=24, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=33, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=6, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])]), (error_code=0, topic='__confluent.support.metrics', partitions=[(error_code=0, partition=0, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3])]), (error_code=0, topic='topic-alerts', partitions=[(error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])])])
[2018-08-28 18:49:32,406: DEBUG]: Updated cluster metadata to ClusterMetadata(brokers: 3, topics: 5, groups: 0)
[2018-08-28 18:49:32,406: DEBUG]: Closing connection at my-kafka:9092
[2018-08-28 18:49:32,406: DEBUG]: Received cluster metadata: ClusterMetadata(brokers: 3, topics: 5, groups: 0)
[2018-08-28 18:49:32,406: DEBUG]: Initiating connection to node 2 at 10.1.1.5:9092
[2018-08-28 18:49:32,496: DEBUG]: <AIOKafkaConnection host=10.1.1.5 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=2), (api_key=1, min_version=0, max_version=3), (api_key=2, min_version=0, max_version=1), (api_key=3, min_version=0, max_version=2), (api_key=4, min_version=0, max_version=0), (api_key=5, min_version=0, max_version=0), (api_key=6, min_version=0, max_version=2), (api_key=7, min_version=1, max_version=1), (api_key=8, min_version=0, max_version=2), (api_key=9, min_version=0, max_version=1), (api_key=10, min_version=0, max_version=0), (api_key=11, min_version=0, max_version=1), (api_key=12, min_version=0, max_version=0), (api_key=13, min_version=0, max_version=0), (api_key=14, min_version=0, max_version=0), (api_key=15, min_version=0, max_version=0), (api_key=16, min_version=0, max_version=0), (api_key=17, min_version=0, max_version=0), (api_key=18, min_version=0, max_version=0), (api_key=19, min_version=0, max_version=0), (api_key=20, min_version=0, max_version=0)])
[2018-08-28 18:49:32,547: DEBUG]: <AIOKafkaConnection host=10.1.1.5 port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=2, host='10.1.1.5', port=9092), (node_id=1, host='10.1.1214', port=9092), (node_id=3, host='10.1.1.81', port=9092)], topics=[(error_code=0, topic='topic', partitions=[(error_code=0, partition=23, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=41, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=59, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=50, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=53, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=35, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=17, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=26, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=11, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=56, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=47, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=29, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=5, leader=3, replicas=[2, 3], isr=[3, 2]), (error_code=0, partition=14, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=55, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=58, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=40, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=49, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=31, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=16, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=7, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=52, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=25, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=10, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=37, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=1, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=19, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=45, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=57, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=51, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2], isr=[2, 1])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=41, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=32, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=8, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=17, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=44, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=35, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=26, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=11, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=29, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=38, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=47, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=20, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=2, leader=3, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=5, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=14, leader=3, replicas=[1, 2, 3], isr=[2, 1, 3]), (error_code=0, partition=46, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=49, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=40, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=4, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=13, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=22, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=31, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=16, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=7, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=43, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=25, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=34, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=10, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=37, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=1, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=19, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=28, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3]), (error_code=0, partition=45, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=36, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=27, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=9, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=18, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=21, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=48, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=12, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=3, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=30, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=39, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=15, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=42, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=24, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=33, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=6, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1]), (error_code=0, partition=0, leader=1, replicas=[1, 2, 3], isr=[2, 3, 1])]), (error_code=0, topic='my-topic', partitions=[(error_code=0, partition=23, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=41, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=32, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=59, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=50, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=53, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=8, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=44, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=35, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=17, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=26, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=11, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=56, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=38, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=29, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=47, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=20, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=5, leader=1, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=14, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=46, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=55, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=58, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=49, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=40, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=4, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=13, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=22, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=31, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=16, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=7, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=52, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=43, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=25, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=34, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=10, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=37, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=19, leader=3, replicas=[1, 3], isr=[1, 3]), (error_code=0, partition=28, leader=3, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=54, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=45, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=36, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=27, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=9, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=18, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=21, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=48, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=57, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=3, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=12, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=30, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=39, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=15, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=42, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=51, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=24, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=33, leader=2, replicas=[1, 2], isr=[1, 2]), (error_code=0, partition=6, leader=2, replicas=[2, 3], isr=[2, 3]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])]), (error_code=0, topic='__confluent.support.metrics', partitions=[(error_code=0, partition=0, leader=2, replicas=[1, 2, 3], isr=[1, 2, 3])]), (error_code=0, topic='topic-alerts', partitions=[(error_code=0, partition=2, leader=1, replicas=[1, 2], isr=[2, 1]), (error_code=0, partition=1, leader=3, replicas=[1, 3], isr=[3, 1]), (error_code=0, partition=0, leader=2, replicas=[2, 3], isr=[2, 3])])])
[2018-08-28 18:49:32,567: DEBUG]: Closing connection at 10.1.1.5:9092
[2018-08-28 18:49:32,567: DEBUG]: [^--Consumer]: Started.
[2018-08-28 18:49:32,567: INFO]: [^--LeaderAssignor]: Starting...
[2018-08-28 18:49:32,567: INFO]: [^--Producer]: Creating topic faust-leader-__assignor-__leader
[2018-08-28 18:49:32,568: DEBUG]: Initiating connection to node 2 at 10.1.1.5:9092
[2018-08-28 18:49:32,665: DEBUG]: <AIOKafkaConnection host=10.1.1.5 port=9092> Response 1: MetadataResponse_v1(brokers=[(node_id=2, host='10.1.1.5', port=9092, rack=None), (node_id=1, host='10.1.1.214', port=9092, rack=None), (node_id=3, host='10.1.1.81', port=9092, rack=None)], controller_id=2, topics=[])
[2018-08-28 18:49:32,666: INFO]: [^--Producer]: Found controller: 2
[2018-08-28 18:49:32,711: DEBUG]: Closing connection at 10.1.1.5:9092
[2018-08-28 18:49:32,711: DEBUG]: Closing connection at 10.1.1.5:9092
[2018-08-28 18:49:32,711: DEBUG]: Initiating connection to node 1 at 10.1.1.214:9092
[2018-08-28 18:49:32,712: ERROR]: [^Worker]: Error: ConnectionError('Connection at 10.1.1.5:9092 broken')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/aiokafka/conn.py", line 213, in _read
    resp = yield from self._reader.readexactly(4)
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/worker.py", line 189, in execute_from_commandline
    self.loop.run_until_complete(self.start())
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/mode/worker.py", line 241, in start
    await super().start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 657, in start
    await self._default_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 662, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 678, in _actually_start
    await child.maybe_start()
  File "/usr/local/lib/python3.7/site-packages/mode/proxy.py", line 50, in maybe_start
    await self._service.maybe_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 701, in maybe_start
    await self.start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 657, in start
    await self._default_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 662, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 678, in _actually_start
    await child.maybe_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 701, in maybe_start
    await self.start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 657, in start
    await self._default_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 662, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 673, in _actually_start
    await self.on_start()
  File "/usr/local/lib/python3.7/site-packages/faust/assignor/leader_assignor.py", line 20, in on_start
    await leader_topic.maybe_declare()
  File "/usr/local/lib/python3.7/site-packages/mode/utils/futures.py", line 36, in __call__
    result = await self.fun(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.7/site-packages/faust/topics.py", line 334, in maybe_declare
    await self.declare()
  File "/usr/local/lib/python3.7/site-packages/faust/topics.py", line 352, in declare
    retention=self.retention,
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 650, in create_topic
    ensure_created=ensure_created,
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 741, in _create_topic
    await wrap()
  File "/usr/local/lib/python3.7/site-packages/mode/utils/futures.py", line 36, in __call__
    result = await self.fun(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 808, in _really_create_topic
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 587, in wait
    return await self._wait_one(coros[0], timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 625, in _wait_one
    f.result()  # propagate exceptions
  File "/usr/local/lib/python3.7/site-packages/aiokafka/client.py", line 432, in send
    result = yield from future
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 412, in wait_for
    return fut.result()
rhkafka.errors.ConnectionError: ConnectionError: Connection at 10.1.1.5:9092 broken
[2018-08-28 18:49:32,721: INFO]: [^Worker]: Stopping...
[2018-08-28 18:49:32,721: INFO]: [^-App]: Stopping...
[2018-08-28 18:49:32,721: INFO]: [^--TableManager]: Stopping...
[2018-08-28 18:49:32,721: INFO]: [^--Fetcher]: Stopping...
[2018-08-28 18:49:32,721: DEBUG]: [^--Fetcher]: Shutting down...
[2018-08-28 18:49:32,721: INFO]: [^--Fetcher]: -Stopped!
[2018-08-28 18:49:32,722: DEBUG]: [^--TableManager]: Shutting down...
[2018-08-28 18:49:32,722: INFO]: [^--TableManager]: -Stopped!
[2018-08-28 18:49:32,722: INFO]: [^--Conductor]: Stopping...
[2018-08-28 18:49:32,722: DEBUG]: [^--Conductor]: Shutting down...
[2018-08-28 18:49:32,722: INFO]: [^--Conductor]: -Stopped!
[2018-08-28 18:49:32,722: INFO]: [^AgentManagerService]: Stopping...
[2018-08-28 18:49:32,723: INFO]: [^--Agent]: Stopping...
[2018-08-28 18:49:32,723: DEBUG]: [^--Agent]: Shutting down...
[2018-08-28 18:49:32,723: INFO]: [^--Agent]: -Stopped!
[2018-08-28 18:49:32,723: DEBUG]: [^AgentManagerService]: Shutting down...
[2018-08-28 18:49:32,723: INFO]: [^AgentManagerService]: -Stopped!
[2018-08-28 18:49:32,723: INFO]: [^--ReplyConsumer]: Stopping...
[2018-08-28 18:49:32,723: DEBUG]: [^--ReplyConsumer]: Shutting down...
[2018-08-28 18:49:32,723: INFO]: [^--ReplyConsumer]: -Stopped!
[2018-08-28 18:49:32,723: INFO]: [^--LeaderAssignor]: Stopping...
[2018-08-28 18:49:32,723: DEBUG]: [^--LeaderAssignor]: Shutting down...
[2018-08-28 18:49:32,724: INFO]: [^--LeaderAssignor]: -Stopped!
[2018-08-28 18:49:32,724: INFO]: [^--Consumer]: Stopping...
[2018-08-28 18:49:32,724: DEBUG]: Closing the KafkaConsumer.
[2018-08-28 18:49:32,724: DEBUG]: Closing connection at 10.1.1.5:9092
[2018-08-28 18:49:32,724: DEBUG]: The KafkaConsumer has closed.
[2018-08-28 18:49:32,724: DEBUG]: [^--Consumer]: Shutting down...
[2018-08-28 18:49:32,724: INFO]: [^--Consumer]: -Stopped!
[2018-08-28 18:49:32,724: INFO]: [^--Producer]: Stopping...
[2018-08-28 18:49:32,725: DEBUG]: Closing connection at 10.1.1.5:9092
[2018-08-28 18:49:32,725: DEBUG]: The Kafka producer has closed.
[2018-08-28 18:49:32,725: DEBUG]: [^--Producer]: Shutting down...
[2018-08-28 18:49:32,725: INFO]: [^--Producer]: -Stopped!
[2018-08-28 18:49:32,725: INFO]: [^--MonitorService]: Stopping...
[2018-08-28 18:49:32,725: DEBUG]: [^--MonitorService]: Shutting down...
[2018-08-28 18:49:32,725: INFO]: [^--MonitorService]: -Stopped!
[2018-08-28 18:49:32,726: DEBUG]: [^-App]: Shutting down...
[2018-08-28 18:49:32,726: INFO]: [^-App]: -Stopped!
[2018-08-28 18:49:32,726: INFO]: [^-Website]: Stopping...
[2018-08-28 18:49:32,726: INFO]: [^--Web]: Stopping...
[2018-08-28 18:49:32,726: INFO]: [^---ServerThread]: Stopping...
[2018-08-28 18:49:32,726: DEBUG]: [^---ServerThread]: Shutting down...
[2018-08-28 18:49:32,726: DEBUG]: [^---ServerThread]: Waiting for shutdown
[2018-08-28 18:49:32,726: INFO]: [^--Web]: Closing server
[2018-08-28 18:49:32,726: INFO]: [^--Web]: Waiting for server to close handle
[2018-08-28 18:49:32,726: INFO]: [^--Web]: Shutting down web application
[2018-08-28 18:49:32,727: INFO]: [^--Web]: Waiting for handler to shut down
[2018-08-28 18:49:32,727: INFO]: [^--Web]: Cleanup
[2018-08-28 18:49:32,727: DEBUG]: [^---ServerThread]: Shutting down now
[2018-08-28 18:49:32,727: INFO]: [^---ServerThread]: -Stopped!
[2018-08-28 18:49:32,727: DEBUG]: [^--Web]: Shutting down...
[2018-08-28 18:49:32,727: INFO]: [^--Web]: -Stopped!
[2018-08-28 18:49:32,727: DEBUG]: [^-Website]: Shutting down...
[2018-08-28 18:49:32,727: INFO]: [^-Website]: -Stopped!
[2018-08-28 18:49:32,727: DEBUG]: [^Worker]: Shutting down...
[2018-08-28 18:49:32,727: INFO]: [^Worker]: -Stopped!
[2018-08-28 18:49:32,727: INFO]: [^Worker]: Gathering service tasks...
[2018-08-28 18:49:32,728: INFO]: [^Worker]: Gathering all futures...
[2018-08-28 18:49:33,833: INFO]: [^Worker]: Closing event loop
starting^%        
                                                                                                                                                                                        
# Versions

* Python version - 3.7
* Faust version - 1.0.30
* Operating system - Tested with both OSX and Docker
* Kafka version - 0.11
* RocksDB version (if applicable) - not applicable

@ask
Copy link
Contributor

ask commented Aug 30, 2018

Very strange, I haven't seen this error here, it seems almost like it's a Kafka server error.
Is there anything in Kafka logs?

@zackpayton
Copy link
Author

My many apologies. Our Kafka version is actually 0.10.1.0 (sorry we don't actually manage our kafka infrastructure).
I was able to replicate this in a test lab and here is the kafka server side log:

==> server.log <==
[2018-08-30 13:05:19,472] ERROR Closing socket for 10.202.216.222:9092-10.202.216.222:49365 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 19 and apiVersion: 1
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 19: 1
	at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
	at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
	at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
	at org.apache.kafka.common.requests.CreateTopicsRequest.parse(CreateTopicsRequest.java:225)
	at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:80)
	at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
	at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:91)
	at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
	at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
	at kafka.network.Processor.run(SocketServer.scala:417)
	at java.lang.Thread.run(Thread.java:748)

Running 0.10.2.1 works fine so it really is a Kafka version issue.
Where it starts to get really weird is that once I connect to a 0.10.2.1 server and then restart kafka with the older version, faust no longer has any problems connecting.

@vineetgoel
Copy link
Contributor

@zackpayton I may have found the issue. Thanks a lot for adding the server.log. Looks like the error is on the CreateTopicRequest. Since this is not part of the the aiokafka or kafka-python library, this was added manually to the aiokafka driver in the faust code. I didn't end up making it compatible with new versions. Looks like 0.11 onwards we would be needing the Request version 2 instead of 1 (which 0.10) uses. I will make the change soon.

@vineetgoel vineetgoel self-assigned this Aug 30, 2018
@vineetgoel vineetgoel added this to the 1.1 milestone Aug 30, 2018
@vineetgoel vineetgoel changed the title Faust unable to consume from Kafka topic Faust CreateTopicRequest not compatible with future versions of Kafka. Aug 30, 2018
@vineetgoel vineetgoel changed the title Faust CreateTopicRequest not compatible with future versions of Kafka. Faust CreateTopicRequest not compatible with future versions of Kafka Aug 30, 2018
@vineetgoel
Copy link
Contributor

@zackpayton the reason it worked the second time is because the topic was already created (got created when you were on 0.10.2.1) this time so faust could move on.

Will fix this soon but for now I am gonna update the documentation to reflect that Faust is only compatible on versions 0.10.2.1 and above.

@zackpayton
Copy link
Author

zackpayton commented Sep 4, 2018

@vineetgoel Thanks man, looking forward to a fix.

For any others that run into this problem, manually creating the topic named:

<your-application-name>-__assignor-__leader

allowed me to bypass this problem. In my case it was faust-leader-__assignor-__leader.

@ask ask modified the milestones: 1.1, 1.2 Oct 4, 2018
@zackpayton
Copy link
Author

@vineetgoel So I noticed that when I use a table in my app, it emits the log entry:
[2018-10-16 16:24:54,284: INFO]: [^--Producer]: Creating topic faust-windower-flows-changelog

But then it hangs there and never times out or notifies the user that anything is wrong for the same older version of Kafka. Not sure if you want me to open a separate issue or not but I thought I'd check. I again manually create the topic and things proceed smoothly.

@ask ask modified the milestones: 1.2, 1.4 Nov 9, 2018
@ask ask modified the milestones: 1.4, 1.5 Dec 6, 2018
@ask ask modified the milestones: 1.5, 1.6 Mar 20, 2019
@danchemla
Copy link

We are facing a very similar issue. Here is the code we are running.

More details here under, but any fix is already known ?

[Note]: if I put wrong user and/or password, I got the same error

import os
import faust

KAFKA_USER = os.environ.get('KAFKA_USER')
KAFKA_PASSWORD = os.environ.get('KAFKA_PASSWORD')
KAFKA_URL = os.environ.get('KAFKA_URL')

app = faust.App('hello_world',
                broker='kafka://' + KAFKA_URL + '.confluent.cloud:9092',
                broker_credentials=faust.SASLCredentials(username=KAFKA_USER,
                                                         password=KAFKA_PASSWORD,))

greetings_topic = app.topic('greetings')


@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)

Here under is the complete traceback.
We tried to manually create the topic as explained in the issue, but it did not help.

ccloud kafka topic create greetings-__assignor-__leader

Looking at the code, in aiokafka/conn.py in function _read line 474, the reading of 4 bytes returns

resp = b'\x15\x03\x03\x00' which corresponds to int 352518912

File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/conn.py", line 474, in _read
            resp = yield from reader.readexactly(4)
            size, = struct.unpack(">i", resp)
(test-faust) MYMACHINE:~/MYPASS$ faust -A hello_world  worker  -l debug
┌ƒaµS† v1.8.0─┬───────────────────────────────────────────────────────────────────┐
│ id          │ greetings                                                         │
│ transport   │ [URL('kafka://KAFKA_URL.confluent.cloud:9092')]  │
│ store       │ memory:                                                           │
│ web         │ http://my-machine:6066                                    │
│ log         │ -stderr- (debug)                                                  │
│ pid         │ 18155                                                             │
│ hostname    │ machine                                                │
│ platform    │ CPython 3.7.4 (Linux x86_64)                                      │
│ drivers     │                                                                   │
│   transport │ aiokafka=1.0.4                                                    │
│   web       │ aiohttp=3.6.2                                                     │
│ datadir     │ MYPASS/hello_world-data               │
│ appdir      │ MYPASS/hello_world-data/v1            │
└─────────────┴───────────────────────────────────────────────────────────────────┘
[2019-10-10 16:15:15,042] [18155] [INFO] [^Worker]: Starting...
[2019-10-10 16:15:15,044] [18155] [DEBUG] Using selector: EpollSelector
[2019-10-10 16:15:15,045] [18155] [INFO] [^-App]: Starting...
[2019-10-10 16:15:15,045] [18155] [INFO] [^--Monitor]: Starting...
[2019-10-10 16:15:15,045] [18155] [DEBUG] [^--Monitor]: Started.
[2019-10-10 16:15:15,045] [18155] [INFO] [^--Producer]: Starting...
[2019-10-10 16:15:15,045] [18155] [INFO] [^---ProducerBuffer]: Starting...
[2019-10-10 16:15:15,045] [18155] [DEBUG] [^---ProducerBuffer]: Started.
[2019-10-10 16:15:15,046] [18155] [DEBUG] Starting the Kafka producer
[2019-10-10 16:15:15,046] [18155] [DEBUG] Attempting to bootstrap via node at KAFKA_URL.confluent.cloud:9092
[2019-10-10 16:15:15,046] [18155] [DEBUG] Timer _main_keepalive woke up - iteration=0 time_spent=1.0000009820068954 drift=-9.820068953558803e-07 sleep_time=0.9999990179931046 since_epoch=9.820068953558803e-07
[2019-10-10 16:15:15,046] [18155] [DEBUG] Timer Monitor.sampler woke up - iteration=0 time_spent=1.0000005570036592 drift=-5.57003659196198e-07 sleep_time=0.9999994429963408 since_epoch=5.57003659196198e-07
[2019-10-10 16:15:15,065] [18155] [DEBUG] <AIOKafkaConnection host=KAFKA_URL.confluent.cloud port=9092> Request 1: SaslHandShakeRequest_v0(mechanism='PLAIN')
[2019-10-10 16:15:15,090] [18155] [DEBUG] Closing connection at KAFKA_URL.confluent.cloud:9092
[2019-10-10 16:15:15,091] [18155] [ERROR] [^Worker]: Error: ConnectionError('Connection at KAFKA_URL.confluent.cloud:9092 closed')
Traceback (most recent call last):
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/conn.py", line 338, in _on_read_task_error
    read_task.result()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/conn.py", line 478, in _read
    resp = yield from reader.readexactly(size)
  File "/usr/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 3 bytes read on a total of 352518912 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/worker.py", line 267, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/usr/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 727, in start
    await self._default_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 734, in _default_start
    await self._actually_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 758, in _actually_start
    await child.maybe_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 786, in maybe_start
    await self.start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 727, in start
    await self._default_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 734, in _default_start
    await self._actually_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 758, in _actually_start
    await child.maybe_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 786, in maybe_start
    await self.start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 727, in start
    await self._default_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 734, in _default_start
    await self._actually_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/mode/services.py", line 751, in _actually_start
    await self.on_start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 649, in on_start
    await producer.start()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 173, in start
    yield from self.client.bootstrap()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/client.py", line 205, in bootstrap
    version_hint=version_hint)
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/conn.py", line 89, in create_conn
    yield from conn.connect()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/conn.py", line 217, in connect
    yield from self._do_sasl_handshake()
  File "HOME/.virtualenvs/test-faust/lib/python3.7/site-packages/aiokafka/conn.py", line 248, in _do_sasl_handshake
    response = yield from self.send(sasl_handshake)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 442, in wait_for
    return fut.result()
kafka.errors.ConnectionError: ConnectionError: Connection at KAFKA_URL.confluent.cloud:9092 closed
[2019-10-10 16:15:15,095] [18155] [INFO] [^Worker]: Stopping...
[2019-10-10 16:15:15,095] [18155] [INFO] [^-App]: Stopping...
[2019-10-10 16:15:15,095] [18155] [INFO] [^-App]: Flush producer buffer...
[2019-10-10 16:15:15,095] [18155] [INFO] [^--TableManager]: Stopping...
[2019-10-10 16:15:15,096] [18155] [INFO] [^--Fetcher]: Stopping...
[2019-10-10 16:15:15,096] [18155] [DEBUG] [^--Fetcher]: Shutting down...
[2019-10-10 16:15:15,096] [18155] [DEBUG] [^--Fetcher]: -Stopped!
[2019-10-10 16:15:15,096] [18155] [DEBUG] [^--TableManager]: Shutting down...
[2019-10-10 16:15:15,096] [18155] [DEBUG] [^--TableManager]: -Stopped!
[2019-10-10 16:15:15,097] [18155] [INFO] [^--Conductor]: Stopping...
[2019-10-10 16:15:15,097] [18155] [DEBUG] [^--Conductor]: Shutting down...
[2019-10-10 16:15:15,097] [18155] [DEBUG] [^--Conductor]: -Stopped!
[2019-10-10 16:15:15,097] [18155] [INFO] [^--AgentManager]: Stopping...
[2019-10-10 16:15:15,098] [18155] [INFO] [^Agent: hello_world.greet]: Stopping...
[2019-10-10 16:15:15,098] [18155] [DEBUG] [^Agent: hello_world.greet]: Shutting down...
[2019-10-10 16:15:15,098] [18155] [DEBUG] [^Agent: hello_world.greet]: -Stopped!
[2019-10-10 16:15:15,098] [18155] [DEBUG] [^--AgentManager]: Shutting down...
[2019-10-10 16:15:15,098] [18155] [DEBUG] [^--AgentManager]: -Stopped!
[2019-10-10 16:15:15,099] [18155] [INFO] [^--ReplyConsumer]: Stopping...
[2019-10-10 16:15:15,099] [18155] [DEBUG] [^--ReplyConsumer]: Shutting down...
[2019-10-10 16:15:15,099] [18155] [DEBUG] [^--ReplyConsumer]: -Stopped!
[2019-10-10 16:15:15,099] [18155] [INFO] [^--LeaderAssignor]: Stopping...
[2019-10-10 16:15:15,099] [18155] [DEBUG] [^--LeaderAssignor]: Shutting down...
[2019-10-10 16:15:15,099] [18155] [DEBUG] [^--LeaderAssignor]: -Stopped!
[2019-10-10 16:15:15,100] [18155] [INFO] [^--Consumer]: Stopping...
[2019-10-10 16:15:15,100] [18155] [DEBUG] [^---MethodQueue]: Stopping...
[2019-10-10 16:15:15,100] [18155] [DEBUG] [^---MethodQueue]: Shutting down...
[2019-10-10 16:15:15,100] [18155] [DEBUG] [^---MethodQueue]: -Stopped!
[2019-10-10 16:15:15,101] [18155] [DEBUG] [^--Consumer]: Shutting down...
[2019-10-10 16:15:15,101] [18155] [DEBUG] [^--Consumer]: -Stopped!
[2019-10-10 16:15:15,101] [18155] [INFO] [^--Web]: Stopping...
[2019-10-10 16:15:15,101] [18155] [DEBUG] [^--Web]: Shutting down...
[2019-10-10 16:15:15,101] [18155] [DEBUG] [^--Web]: -Stopped!
[2019-10-10 16:15:15,102] [18155] [INFO] [^--CacheBackend]: Stopping...
[2019-10-10 16:15:15,102] [18155] [DEBUG] [^--CacheBackend]: Shutting down...
[2019-10-10 16:15:15,102] [18155] [DEBUG] [^--CacheBackend]: -Stopped!
[2019-10-10 16:15:15,102] [18155] [INFO] [^--Producer]: Stopping...
[2019-10-10 16:15:15,102] [18155] [DEBUG] The Kafka producer has closed.
[2019-10-10 16:15:15,102] [18155] [INFO] [^---ProducerBuffer]: Stopping...
[2019-10-10 16:15:15,103] [18155] [DEBUG] [^---ProducerBuffer]: Shutting down...
[2019-10-10 16:15:15,103] [18155] [DEBUG] [^---ProducerBuffer]: -Stopped!
[2019-10-10 16:15:15,103] [18155] [DEBUG] [^--Producer]: Shutting down...
[2019-10-10 16:15:15,104] [18155] [DEBUG] [^--Producer]: -Stopped!
[2019-10-10 16:15:15,104] [18155] [INFO] [^--Monitor]: Stopping...
[2019-10-10 16:15:15,104] [18155] [DEBUG] [^--Monitor]: Shutting down...
[2019-10-10 16:15:15,104] [18155] [DEBUG] [^--Monitor]: -Stopped!
[2019-10-10 16:15:15,105] [18155] [DEBUG] [^-App]: Shutting down...
[2019-10-10 16:15:15,105] [18155] [DEBUG] [^-App]: -Stopped!
[2019-10-10 16:15:15,105] [18155] [DEBUG] [^Worker]: Shutting down...
[2019-10-10 16:15:15,106] [18155] [DEBUG] [^Worker]: -Stopped!
[2019-10-10 16:15:15,106] [18155] [INFO] [^Worker]: Gathering service tasks...
[2019-10-10 16:15:15,106] [18155] [INFO] [^Worker]: Gathering all futures...
[2019-10-10 16:15:16,108] [18155] [INFO] [^Worker]: Closing event loop

@filippovitale
Copy link

filippovitale commented Jan 14, 2020

To move a step "forward" I added:

import ssl

...
    broker_credentials=faust.SASLCredentials(
        username=***,
        password=***,
        ssl_context=ssl.create_default_context(),
    ))
...

However, I got stuck on the error:

ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1056)
...
Unable connect to "*****.confluent.cloud:9092": [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1056)
...
Error: ConnectionError("Unable to bootstrap from [('*****.confluent.cloud', 9092, <AddressFamily.AF_UNSPEC: 0>)]")

EDIT

Fixing the certificate validation error with:

import ssl
import certifi

ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(cafile=certifi.where())

app = faust.App(
    broker=f"kafka://{KAFKA_BROKERS}",
    broker_credentials=faust.SASLCredentials(
        username=KAFKA_KEY,
        password=KAFKA_SECRET,
        ssl_context=ssl_context,

Unfortunately I get this error: RuntimeError: await wasn't used with future

@filippovitale
Copy link

On the other hand, it seems that using the confluent:// transport to connect to the Confluent Cloud could be a better approach.

As specified here: https://faust.readthedocs.io/en/latest/userguide/settings.html#available-transports

Experimental transport using the confluent-kafka client.

Limitations: Does not do sticky partition assignment (not
suitable for tables), and do not create any necessary internal topics (you have to create them manually).

@irux
Copy link

irux commented Jan 22, 2020

@filippovitale I have the same problem. Do you know how to fix it ?

@deed02392
Copy link

deed02392 commented Jan 30, 2020

@filippovitale and @irux, there is a fix, it's a bug in aiokafka and you need to patch conn.py to remove async from def _step around line 500. I think it's the saslbase class.

@stereobutter
Copy link

As far as I can see in aiokafka step returns a Future via loop.run_in_executor which is correctly awaited by the calling code in AIOKafkaConnection's _do_sasl_handshake method

@deed02392
Copy link

@SaschaSchlemmer In the file you linked to, there is no async def on _step, which is also what you get if you follow my fix instructions.

@stereobutter
Copy link

@deed02392 I though you meant that the issues is also present in the original aiokafka (which is not the case). See also robinhood/aiokafka#15

@deed02392
Copy link

deed02392 commented Feb 4, 2020

Hmm, you are correct, I think my mistake was assuming that what I have in site-packages/aiokafka came from aio-libs when actually it came from pip install faust, that being the robinhood version of aiokafka.

@databasedav
Copy link

I was having this problem trying to connect Faust to Azure Event Hubs and describe an easy temporary fix if you're using Docker here.

@ask ask modified the milestones: 1.6, 1.11 Feb 27, 2020
@bencleary
Copy link

By combining a few answers, I have connected to the confluent cloud using the following code:

import faust
from faust.types.auth import AuthProtocol
from aiokafka.helpers import create_ssl_context

broker_credentials=faust.SASLCredentials(
    mechanism=faust.types.auth.SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXX'
)

broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    "threshold-detection",
    broker="kafka://XXX.confluent.cloud:9092",
    value_serializer="json",
    broker_credentials=broker_credentials
)

test_topic = app.topic("my-topic")

@app.agent(test_topic)
async def process_topic(messages):
    async for message in messages:
        print(message)

I also had to manually create the leader topic in confluent see this - #154 (comment)

It works fine now, not ideal with the creation of the topic manually but its a workaround at least for smaller projects.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

10 participants