-
Notifications
You must be signed in to change notification settings - Fork 926
Fix error propagation rule for Python's C API #2019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c5bdee6
cb71833
42714b5
5f29f12
e693fac
f8dc43a
7d0822d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1575,6 +1575,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, | |
if (result) | ||
Py_DECREF(result); | ||
else { | ||
|
||
CallState_fetch_exception(cs); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are other places as well where we need to handle this. For example |
||
CallState_crash(cs); | ||
rd_kafka_yield(rk); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,8 +275,66 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg); | |
typedef struct { | ||
PyThreadState *thread_state; | ||
int crashed; /* Callback crashed */ | ||
PyObject *exception_type; /* Stored exception type */ | ||
PyObject *exception_value; /* Stored exception value */ | ||
PyObject *exception_traceback; /* Stored exception traceback */ | ||
} CallState; | ||
|
||
/** | ||
* @brief Compatibility layer for Python exception handling API changes. | ||
* PyErr_Fetch/PyErr_Restore were deprecated in Python 3.12 in favor of | ||
* PyErr_GetRaisedException/PyErr_SetRaisedException. | ||
*/ | ||
|
||
static inline void | ||
cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) { | ||
#if PY_VERSION_HEX >= 0x030c0000 | ||
/* Python 3.12+ - use new API */ | ||
PyObject *exc = PyErr_GetRaisedException(); | ||
if (exc) { | ||
*exc_type = (PyObject *)Py_TYPE(exc); | ||
Py_INCREF(*exc_type); | ||
*exc_value = exc; | ||
*exc_traceback = PyException_GetTraceback(exc); | ||
Comment on lines
+295
to
+298
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need Check
|
||
} else { | ||
*exc_type = *exc_value = *exc_traceback = NULL; | ||
} | ||
#else | ||
/* Python < 3.12 - use legacy API */ | ||
PyErr_Fetch(exc_type, exc_value, exc_traceback); | ||
#endif | ||
} | ||
|
||
static inline void | ||
cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_traceback) { | ||
#if PY_VERSION_HEX >= 0x030c0000 | ||
/* Python 3.12+ - use new API */ | ||
if (exc_value) { | ||
PyErr_SetRaisedException(exc_value); | ||
Py_XDECREF(exc_type); | ||
Py_XDECREF(exc_traceback); | ||
} | ||
#else | ||
/* Python < 3.12 - use legacy API */ | ||
PyErr_Restore(exc_type, exc_value, exc_traceback); | ||
#endif | ||
} | ||
|
||
static inline void | ||
CallState_fetch_exception(CallState *cs) { | ||
cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); | ||
} | ||
|
||
static inline void | ||
CallState_restore_exception(CallState *cs) { | ||
if (cs->exception_type) { | ||
cfl_exception_restore(cs->exception_type, cs->exception_value, cs->exception_traceback); | ||
cs->exception_type = NULL; | ||
cs->exception_value = NULL; | ||
cs->exception_traceback = NULL; | ||
} | ||
} | ||
|
||
/** | ||
* @brief Initialiase a CallState and unlock the GIL prior to a | ||
* possibly blocking external call. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,8 +20,8 @@ | |
def dummy_commit_cb(err, partitions): | ||
pass | ||
|
||
kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100', | ||
Check failure on line 23 in tests/test_Consumer.py
|
||
'session.timeout.ms': 1000, # Avoid close() blocking too long | ||
'on_commit': dummy_commit_cb}) | ||
|
||
kc.subscribe(["test"]) | ||
|
@@ -324,3 +324,38 @@ | |
with pytest.raises(ValueError) as ex: | ||
TestConsumer({'bootstrap.servers': "mybroker:9092"}) | ||
assert ex.match('group.id must be set') | ||
|
||
|
||
def test_callback_exception_no_system_error(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest adding more test cases related to librdkafka layer, C binding layer and python layer. |
||
|
||
exception_raised = [] | ||
|
||
def error_cb_that_raises(error): | ||
"""Error callback that raises an exception""" | ||
exception_raised.append(error) | ||
raise RuntimeError("Test exception from error_cb") | ||
|
||
# Create consumer with error callback that raises exception | ||
consumer = TestConsumer({ | ||
'group.id': 'test-callback-systemerror-fix', | ||
'bootstrap.servers': 'nonexistent-broker:9092', # Will trigger error | ||
'socket.timeout.ms': 100, | ||
'session.timeout.ms': 1000, | ||
'error_cb': error_cb_that_raises | ||
}) | ||
|
||
consumer.subscribe(['test-topic']) | ||
|
||
# This should trigger the error callback due to connection failure | ||
# Before fix: Would get RuntimeError + SystemError (Issue #865) | ||
# After fix: Should only get RuntimeError (no SystemError) | ||
with pytest.raises(RuntimeError) as exc_info: | ||
consumer.consume(timeout=0.1) | ||
|
||
# Verify we got the expected exception message | ||
assert "Test exception from error_cb" in str(exc_info.value) | ||
|
||
# Verify the error callback was actually called | ||
assert len(exception_raised) > 0 | ||
|
||
consumer.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra line. From other places as well.