Faust CreateTopicRequest not compatible with future versions of Kafka #154
Comments
Very strange, I haven't seen this error here. It seems almost like it's a Kafka server error.
My many apologies. Our Kafka version is actually 0.10.1.0 (sorry we don't actually manage our kafka infrastructure).
Running 0.10.2.1 works fine so it really is a Kafka version issue.
@zackpayton I may have found the issue. Thanks a lot for adding the server.log. Looks like the error is on the
@zackpayton The reason it worked the second time is that the topic had already been created (it got created when you were on 0.10.2.1), so Faust could move on. I will fix this soon, but for now I am going to update the documentation to reflect that Faust is only compatible with versions 0.10.2.1 and above.
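The version boundary reported in this thread (0.10.1.0 fails, 0.10.2.1 works) could be checked up front with a small guard. This is only a minimal sketch, assuming the broker version is available as a dotted string; `MIN_SUPPORTED`, `parse_version`, and `is_supported` are hypothetical names for illustration and are not part of Faust.

```python
from typing import Tuple

# Assumption taken from this thread: 0.10.2.1 is the oldest broker version that worked.
MIN_SUPPORTED: Tuple[int, ...] = (0, 10, 2, 1)


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn a dotted version string such as '0.10.1.0' into a tuple of ints."""
    return tuple(int(part) for part in text.strip().split("."))


def is_supported(broker_version: str) -> bool:
    """Return True when the broker version is at or above MIN_SUPPORTED."""
    # Tuples compare element by element, so (0, 10, 1, 0) < (0, 10, 2, 1).
    return parse_version(broker_version) >= MIN_SUPPORTED


if __name__ == "__main__":
    for version in ("0.10.1.0", "0.10.2.1"):
        print(version, is_supported(version))
```

Comparing tuples of integers avoids the pitfall of comparing version strings lexically, where "0.9" would sort after "0.10".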
@vineetgoel Thanks man, looking forward to a fix. For any others that run into this problem, manually creating the topic named:
allowed me to bypass this problem. In my case it was
@vineetgoel So I noticed that when I use a table in my app, it emits the log entry: But then it hangs there and never times out or notifies the user that anything is wrong, on the same older version of Kafka. Not sure if you want me to open a separate issue or not, but I thought I'd check. Again, once I manually create the topic, things proceed smoothly.
We are facing a very similar issue. Here is the code we are running. More details are below, but is a fix already known? [Note]: if I enter the wrong user and/or password, I get the same error.
Below is the complete traceback.
Looking at the code, in aiokafka/conn.py, in the function _read (line 474), reading 4 bytes returns resp = b'\x15\x03\x03\x00', which corresponds to the integer 352518912.
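One possible reading of those four bytes, offered as a hedged sketch rather than a diagnosis: read as a 4-byte big-endian length prefix they give the large integer above, but they also match the start of a TLS record header (content type 21 is an alert, version bytes 3, 3 are the wire value for TLS 1.2). That might mean the broker is replying with TLS on a connection the client is reading as plain Kafka frames. The `TLS_CONTENT_TYPES` table and the variable names are illustrative and are not taken from aiokafka.

```python
import struct

resp = b"\x15\x03\x03\x00"  # the four bytes quoted in the comment above

# Reading 1: a big-endian signed 32-bit integer, i.e. a frame length prefix.
(size,) = struct.unpack(">i", resp)

# Reading 2: the start of a TLS record header, which is laid out as
# 1 byte content type, 2 bytes protocol version, then a 2-byte length
# (only the first length byte is present in a 4-byte read).
TLS_CONTENT_TYPES = {
    20: "change_cipher_spec",
    21: "alert",
    22: "handshake",
    23: "application_data",
}
content_type = TLS_CONTENT_TYPES.get(resp[0], "unknown")
version = (resp[1], resp[2])

print(size, content_type, version)
```

If the second reading is the right one, the integer is not a real message size, which would be consistent with the later comments about setting up SSL and SASL credentials.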
To move a step "forward" I added:
However, I got stuck on the error:
EDIT: Fixing the certificate validation error with:
Unfortunately I get this error:
On the other hand, it seems that using the As specified here: https://faust.readthedocs.io/en/latest/userguide/settings.html#available-transports
@filippovitale I have the same problem. Do you know how to fix it?
@filippovitale and @irux, there is a fix: it's a bug in aiokafka, and you need to patch conn.py to remove async from def _step around line 500. I think it's the saslbase class.
As far as I can see in aiokafka
@SaschaSchlemmer In the file you linked to, there is no
@deed02392 I thought you meant that the issue is also present in the original aiokafka (which is not the case). See also robinhood/aiokafka#15
Hmm, you are correct, I think my mistake was assuming that what I have in
I was having this problem trying to connect Faust to Azure Event Hubs and describe an easy temporary fix if you're using Docker here.
By combining a few answers, I have connected to Confluent Cloud using the following code:

import faust
from faust.types.auth import AuthProtocol
from aiokafka.helpers import create_ssl_context

# SASL/PLAIN credentials sent over an SSL connection
broker_credentials = faust.SASLCredentials(
    mechanism=faust.types.auth.SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXX'
)
broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    "threshold-detection",
    broker="kafka://XXX.confluent.cloud:9092",
    value_serializer="json",
    broker_credentials=broker_credentials
)

test_topic = app.topic("my-topic")

# Print every message consumed from the topic
@app.agent(test_topic)
async def process_topic(messages):
    async for message in messages:
        print(message)

I also had to manually create the leader topic in Confluent; see this: #154 (comment). It works fine now. Creating the topic manually is not ideal, but it's a workaround, at least for smaller projects.
Checklist
master branch of Faust.
Steps to reproduce
When running the following simple program, Faust fails to consume from a Kafka topic:
Expected behavior
I would expect this program to start printing messages from my kafka topic.
Actual behavior
A full stacktrace is thrown.