Can the ConsumerStream implement Clone? #4267
Comments
Both MultiplePartitionConsumer and PartitionConsumer implement [...]. Also, there is already an API to return a stream of batches. You just need to use [...]
Hi @sehz, thanks for getting back to me! The reason I didn’t mention those APIs is that they’re all deprecated. The new recommended approach doesn’t allow for cloning, which is why I didn’t explore it further. I ended up implementing the code using PartitionConsumer. I was hoping to find a way to consume in batches at the topic level without having to worry too much about partition-specific details.
Out of curiosity, why do you want to process at the batch level? It is a low-level optimization that shouldn't be done at the application layer, much as an application shouldn't deal with file blocks. A topic is not actually a physical representation in Fluvio (similar to Kafka). Topics are made of partitions, and an SPU really works with partitions (to be precise, replicas). So when you consume a topic, you are consuming records coming from different partitions, where a batch doesn't make sense. @fraidev, let's un-deprecate the https://docs.rs/fluvio/0.24.0/fluvio/consumer/struct.PartitionConsumer.html#method.stream_batches_with_config API, since we don't have an alternative way to get batches with the new API. We still need to keep this
Thank you @sehz for the response and explanation! I'm familiar with the construct and how it works. I've worked with Kafka for a while, so coming to Fluvio I felt at home. That said, the use case I presented was, to my knowledge, quite standard. Working at the batch level can be valuable, particularly in scenarios where strict ordering isn’t important (for example, cases where you don't even need a seq_nr but just want to react to individual events in real time). Batches allow you to maximize the number of elements processed concurrently. This is particularly important (to me at least) because in systems where you don't block but "keep moving forward", simply sending unprocessable events to a dead letter queue, processing events individually rather than in batches can become a significant bottleneck.
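The "keep moving forward" pattern described above can be sketched as follows. This is a minimal, std-only illustration and not Fluvio code: `Event`, `handle`, and `process_batch` are hypothetical names, and the rule that events divisible by 5 fail is made up purely for the demonstration. Each batch is handled concurrently, and events that cannot be processed are collected for a dead letter queue instead of blocking the loop.

```rust
use std::thread;

/// Hypothetical event type; a real consumer would yield records instead.
#[derive(Debug, Clone, PartialEq)]
struct Event(u32);

/// Hypothetical handler: pretend that events divisible by 5 cannot be processed.
fn handle(event: &Event) -> Result<(), String> {
    if event.0 % 5 == 0 {
        Err(format!("cannot process event {}", event.0))
    } else {
        Ok(())
    }
}

/// Processes one batch concurrently (one scoped thread per event) and returns
/// the events that failed, in batch order, so the caller can forward them to a
/// dead letter queue.
fn process_batch(batch: &[Event]) -> Vec<Event> {
    thread::scope(|scope| {
        let handles: Vec<_> = batch
            .iter()
            .map(|event| scope.spawn(move || handle(event).map_err(|_| event.clone())))
            .collect();
        handles
            .into_iter()
            .filter_map(|h| h.join().expect("handler panicked").err())
            .collect()
    })
}

fn main() {
    let events: Vec<Event> = (1..=10).map(Event).collect();
    let mut dead_letters = Vec::new();
    for batch in events.chunks(4) {
        dead_letters.extend(process_batch(batch));
        // An offset commit would go here, once per batch rather than per event.
    }
    println!("dead letters: {:?}", dead_letters);
}
```

Spawning one thread per event keeps the sketch short; a real application would more likely use a bounded worker pool or async tasks.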
Understood. Thanks for the feedback!
Hello,

I frequently encounter the need to write code like the following (whether using Fluvio or Kafka):

In essence, I'm trying to read from a stream and process its chunks. However, with Fluvio I'm currently facing a limitation: the ConsumerStream isn't clonable, so I can't wrap it in an Arc and/or pass it by reference to a StreamIterator that consumes it (such as try_ready_chunks).

As far as I can tell, the only way to consume records via the Rust client while manually committing offsets is to process them one by one (next).

Would it be reasonable to implement Clone for ConsumerStream, or perhaps provide an operator that allows batch processing? If I'm missing something here, I’d greatly appreciate clarification. I'd also be happy to contribute if there’s something I can help with!

Thanks in advance for your time and support!
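As a rough illustration of the pattern being asked for, the sketch below chunks a non-clonable source through a mutable borrow and commits once per chunk. It uses a synchronous std `Iterator` as a stand-in for an async stream. `FakeConsumer`, `commit`, and `next_chunk` are hypothetical names and not part of the Fluvio API; whether the same borrowing approach works with ConsumerStream and its offset-commit methods is an assumption that has not been verified here.

```rust
/// Hypothetical stand-in for a consumer stream: yields offsets as "records"
/// and remembers the last committed offset. It deliberately does not
/// implement `Clone`.
struct FakeConsumer {
    next_offset: u64,
    end: u64,
    committed: Option<u64>,
}

impl Iterator for FakeConsumer {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        if self.next_offset < self.end {
            let offset = self.next_offset;
            self.next_offset += 1;
            Some(offset)
        } else {
            None
        }
    }
}

impl FakeConsumer {
    /// Hypothetical manual commit; a real client would talk to the broker here.
    fn commit(&mut self, offset: u64) {
        self.committed = Some(offset);
    }
}

/// Pulls up to `size` records through a mutable borrow. The borrow ends when
/// the function returns, so the caller can still commit on the consumer.
fn next_chunk(consumer: &mut FakeConsumer, size: usize) -> Vec<u64> {
    consumer.by_ref().take(size).collect()
}

fn main() {
    let mut consumer = FakeConsumer { next_offset: 0, end: 7, committed: None };
    loop {
        let chunk = next_chunk(&mut consumer, 3);
        if chunk.is_empty() {
            break;
        }
        println!("processing {:?}", chunk);
        // Commit once per chunk, using the last offset in the chunk.
        let last = *chunk.last().unwrap();
        consumer.commit(last);
    }
    println!("committed: {:?}", consumer.committed);
}
```

The point of the design is that the chunking adapter only borrows the consumer for the duration of one chunk, so neither `Clone` nor an `Arc` is needed to regain access for the commit.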