Replies: 1 comment
Thanks for this writeup, @jches. It has been a while since I looked at the details of these subsystems, but at this level, this all sounds reasonable. I particularly appreciate the incremental approach detailed here to deal with each problem area separately.
Hi team! A few months ago I opened #14761 and started on a PR implementing a fix (see #14846). My focus at work shifted away to other things for a while but I want to come back to this and start a discussion about the approach I started on in that PR to get some feedback. Rather than resurrect that PR and resolve all of the conflicts that there might be, I'd like to split the changes into a few different logical parts, which I'll describe below.
High-level review of the problem
First, to recap the problem:
When acknowledgements are enabled and vector receives a shutdown signal, some sources may shut down before processing pending acknowledgements. I've observed this in the kafka and file sources, but this is very likely true for any source that uses a `FinalizerSet` for acknowledgements (so in addition to file and kafka, the splunk, journald, gcp pubsub, aws sqs, and amqp sources probably have this behavior). `FinalizerSet` takes a `ShutdownSignal` and when that signal arrives it stops processing acknowledgements that may be waiting in its `new_entries` channel.

For the kafka source, we see this problem at shutdown as well as a very similar problem that can happen during consumer group rebalance events. In the rebalance case, it is the kafka source's main task which is not able to process acknowledgements during a rebalance, because the kafka client and acknowledgement stream are handled in the same task loop. I believe we can fix both of these at the same time but it will take a little refactoring of the kafka source.
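To make the shutdown half of the problem concrete, here is a minimal sketch using only the standard library. It is not vector's actual `FinalizerSet` code: `run_finalizer`, the `AtomicBool` standing in for `ShutdownSignal`, and the `u64` offsets are all hypothetical names chosen for illustration. It only shows the shape of the bug, where the shutdown check wins over entries that are still queued.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical stand-in for a finalizer loop (not vector's real code).
/// `new_entries` plays the role of the pending-acknowledgement channel and
/// `shutdown` plays the role of the shutdown signal.
/// Returns the entries that were actually acknowledged.
pub fn run_finalizer(new_entries: &Receiver<u64>, shutdown: &AtomicBool) -> Vec<u64> {
    let mut acked = Vec::new();
    loop {
        // The shutdown check comes first, so anything still queued in
        // `new_entries` when the signal arrives is never processed.
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        match new_entries.try_recv() {
            Ok(offset) => acked.push(offset),
            // Nothing queued right now; keep polling.
            Err(TryRecvError::Empty) => std::thread::yield_now(),
            // All senders are gone and the queue is empty: the stream ended.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    acked
}
```

If three entries are queued and the shutdown flag is already set, this loop returns without acknowledging any of them, which is the source of the duplicate processing after restart. When the loop instead ends only once the channel closes, every queued entry is handled first.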
Proposed changes
The first 2 items here could be a single PR, and the basic idea behind this was discussed in the previous PR. The kafka source changes are a little more invasive in order to address the rebalance issue too, and merit a separate PR:
1. Update `FinalizerSet` to accept an `Option<ShutdownSignal>` and only handle the shutdown signal if it has a `Some`. All sources that use this will be updated to pass in `Some`, preserving existing behavior. Unfortunately this can't just be removed, because some sources may be unintentionally relying on the acknowledgement stream ending on the shutdown signal for proper shutdown (e.g. the file source hangs at exit if some other stream handling is not also fixed). See discussion about this in the previous PR here: fix(kafka/file source): avoid duplicate message processing caused by acknowledgement stream shutdown handling #14846 (comment)
2. Update the file source to pass `None` to its `FinalizerSet` and fix stream handling
3. Update the kafka source to pass `None` to its `FinalizerSet`, and also to handle messages (i.e. drive the main kafka client) and process acknowledgements on separate tasks, so that during a rebalance event, acknowledgements can be handled. Separate tasks are required so that while the kafka client task drives the rebalance process, the acknowledgement task can continue draining acknowledgements
   - During a rebalance that revokes partitions from a given vector instance, the acknowledgement task needs to coordinate with the kafka client's rebalance handler
   - If it can be done without too much performance impact, handling separate acknowledgement streams for each partition is beneficial. A group rebalance can revoke a subset of the partitions a client is handling, and processing acks for each partition through dedicated streams makes it easy to know when all of the acknowledgements for the particular subset being revoked have been processed. Tokio's `StreamMap` or something like it should be a reasonable thing to use for this, I think
   - The coordination sequence should look something like this:
     - `pre_rebalance` handler is notified of a set of partitions being removed
     - […] `FinalizerSet` for each of the revoking partitions; no new entries can be added for these partitions at this point and any pending acks will be drained in the subsequent steps
     - […] `pre_rebalance` handler, allowing the rebalance process to proceed
     - […] `rebalance_drain_ms` setting for the kafka source)
4. I don't have expertise in (or access to infrastructure for testing) all of the other affected sources, they would continue to work as they currently do after the change in 1) but could hopefully be updated at some point by other folks with the knowledge and resources to do so
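For the kafka rebalance coordination described above, a rough standard-library sketch of the per-partition idea might look like the following. Everything here is an assumption made for illustration: `PartitionAcks`, `assign`, `track`, and `revoke` are hypothetical names, plain `mpsc` channels stand in for the per-partition acknowledgement streams, and the `drain` argument stands in for a timeout along the lines of the `rebalance_drain_ms` setting. The real source would do this asynchronously with the kafka client's rebalance handler.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::time::{Duration, Instant};

/// Hypothetical per-partition ack tracker (not vector's or rdkafka's API).
/// Each partition gets its own channel, standing in for a dedicated ack stream.
pub struct PartitionAcks {
    streams: HashMap<i32, (Option<Sender<i64>>, Receiver<i64>)>,
}

impl PartitionAcks {
    pub fn new() -> Self {
        Self { streams: HashMap::new() }
    }

    /// Start tracking acknowledgements for a newly assigned partition.
    pub fn assign(&mut self, partition: i32) {
        let (tx, rx) = channel();
        self.streams.insert(partition, (Some(tx), rx));
    }

    /// Handle for one in-flight event: the holder sends the offset when the
    /// event is acknowledged, then drops the handle. Returns `None` if the
    /// partition is not (or no longer) assigned.
    pub fn track(&self, partition: i32) -> Option<Sender<i64>> {
        self.streams.get(&partition)?.0.clone()
    }

    /// Revoke path: stop accepting new entries for `partitions`, then drain
    /// pending acks until every in-flight handle is dropped or `drain` elapses.
    /// Returns the highest acked offset seen per partition. Simplification:
    /// a real implementation would track in-order completion, not a plain max.
    pub fn revoke(&mut self, partitions: &[i32], drain: Duration) -> HashMap<i32, i64> {
        let deadline = Instant::now() + drain;
        let mut commits: HashMap<i32, i64> = HashMap::new();
        for partition in partitions {
            if let Some((tx, rx)) = self.streams.remove(partition) {
                // Dropping our sender means the stream ends once all
                // in-flight handles have been dropped.
                drop(tx);
                loop {
                    let left = deadline.saturating_duration_since(Instant::now());
                    match rx.recv_timeout(left) {
                        Ok(offset) => {
                            let best = commits.entry(*partition).or_insert(offset);
                            if offset > *best {
                                *best = offset;
                            }
                        }
                        // Stream fully drained, or the drain deadline passed.
                        Err(RecvTimeoutError::Disconnected)
                        | Err(RecvTimeoutError::Timeout) => break,
                    }
                }
            }
        }
        commits
    }
}
```

The point of one stream per partition is visible in `revoke`: a drained stream for the revoked subset is the signal that the rebalance can proceed, and partitions that are not being revoked are left untouched. An event that is never acknowledged only delays the rebalance up to the drain deadline.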
Let me know if this sounds reasonable or if there are any big red flags here. I have most of this implemented against an older version of vector if seeing code helps (https://github.com/vectordotdev/vector/compare/master...jches:vector:kafka-rebalance-ack-draining?diff=split) but I wanted to get some feedback/discussion before getting back into that branch and updating it against the latest commits.