Can the ConsumerStream implement Clone? #4267

Open
sagojez opened this issue Nov 19, 2024 · 6 comments

sagojez commented Nov 19, 2024

Hello,

I frequently encounter the need to write code like the following (whether using Fluvio or Kafka):

async fn process_chunk_with_shutdown<'a>(
    &self,
    ...
) -> Result<Unit, Error> {
    match chunk {
        Ok(chunk) => {
            tokio::select! {
                _ = subsys.on_shutdown_requested() => {
                    // Perform shutdown-related actions here.
                },
                _ = self.process_chunk(ctx, *target, chunk.as_ref()) => {
                    ...
                },
            }
            Ok(())
        }
        Err(e) => ...
    }
}

async fn consume_chunks<'a>(
    &self,
    ...
) -> Result<Unit, Error> {
    let mut stream = ...

    stream
        .try_ready_chunks(consumer_batch_size)
        .map(|chunk| {
            async move {
                self.process_chunk_with_shutdown(chunk, ctx, target, subsys)
                    .await
            }
        })
        .buffered(consumer_batch_size)
        .collect::<Vec<_>>()
        .await;

    Ok(())
}

In essence, I'm trying to read from a stream and process it in chunks. However, with Fluvio I'm currently facing a limitation: ConsumerStream isn't cloneable, so I can't wrap it in an Arc and/or pass it by reference to a stream adapter that consumes it (such as try_ready_chunks).

As far as I can tell, the only way to consume records via the Rust client while manually committing offsets is to process them one by one with next.

Would it be reasonable to implement Clone for ConsumerStream, or perhaps provide an operator that allows batch processing? If I'm missing something here, I’d greatly appreciate clarification. I'd also be happy to contribute if there’s something I can help with!

Thanks in advance for your time and support!
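
One possible angle on the ownership problem described above: a consuming adapter does not strictly need Clone if it can be handed a mutable borrow instead. The sketch below uses only std iterators as a stand-in for the async stream; `Records`, `commit`, and `next_chunk` are hypothetical names, not Fluvio APIs. The futures crate offers the analogous `StreamExt::by_ref` for streams, but whether that fits ConsumerStream and manual offset commits is an assumption that this thread does not confirm.

```rust
/// Hypothetical stand-in for a consumer that is deliberately not `Clone`.
struct Records {
    next: u32,
    end: u32,
    committed: u32,
}

impl Iterator for Records {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.next < self.end {
            let offset = self.next;
            self.next += 1;
            Some(offset)
        } else {
            None
        }
    }
}

impl Records {
    /// Hypothetical manual commit: remembers the next offset to resume from.
    fn commit(&mut self, offset: u32) {
        self.committed = offset;
    }
}

/// Takes up to `n` items through a mutable borrow, so the caller keeps ownership.
fn next_chunk<I: Iterator>(source: &mut I, n: usize) -> Vec<I::Item> {
    source.by_ref().take(n).collect()
}

fn main() {
    let mut records = Records { next: 0, end: 7, committed: 0 };
    loop {
        let chunk = next_chunk(&mut records, 3);
        if chunk.is_empty() {
            break;
        }
        println!("processing {:?}", chunk);
        // The source is still owned here, so it can be committed between chunks.
        let last = chunk[chunk.len() - 1];
        records.commit(last + 1);
    }
    println!("committed up to {}", records.committed);
}
```

Because `next_chunk` only borrows the source, the caller can still reach it between chunks, which is the point at which a manual commit would happen.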

Contributor

sehz commented Nov 20, 2024

Both MultiplePartitionConsumer and PartitionConsumer already implement Clone. Am I missing something?

There is also already an API that returns a stream of batches. You just need to use PartitionConsumer (https://docs.rs/fluvio/0.24.0/fluvio/consumer/struct.PartitionConsumer.html#method.stream_batches_with_config), since batches only work at the partition level.

Author

sagojez commented Nov 20, 2024

Hi @sehz, thanks for getting back to me! The reason I didn’t mention those APIs is that they’re all deprecated. The new recommended approach doesn’t allow for cloning, which is why I didn’t explore it further.

I ended up implementing the code using PartitionConsumer. I was hoping to find a way to consume in batches at the topic level without having to worry too much about partition-specific details.

Contributor

sehz commented Nov 20, 2024

Out of curiosity, why do you want to process at the batch level? It is a low-level optimization that shouldn't be done at the application layer, in the same way that an application shouldn't deal with file blocks. A topic is not actually a physical representation in Fluvio (the same is true in Kafka). Topics are made of partitions, and the SPU really works with partitions (to be precise, replicas). So when you consume a topic, you are consuming records coming from different partitions, where a batch doesn't make sense.

@fraidev, let's un-deprecate the https://docs.rs/fluvio/0.24.0/fluvio/consumer/struct.PartitionConsumer.html#method.stream_batches_with_config API, since we don't have an alternative way to get batches with the new API. We still need to keep it.

Author

sagojez commented Nov 21, 2024

Thank you @sehz for the response and explanation! I'm familiar with the construct and how it works. I've worked with Kafka for a while, so coming to Fluvio I felt at home. That said, the use case I presented is, to my knowledge, quite standard. Working at the batch level can be valuable, particularly in scenarios where strict ordering isn't important (for example, cases where you don't even need a seq_nr and just react to individual events in real time). Batches let you maximize the number of elements processed concurrently.

The reason this is particularly important (to me at least) is that in systems where you don't block but "keep moving forward"—and simply send unprocessable events to a dead letter queue—processing events individually rather than in batches can become a significant bottleneck.
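
The "keep moving forward" pattern described above can be sketched roughly as follows. This is a minimal std-only illustration, with scoped threads standing in for async concurrency; `handle` and `process_batch` are hypothetical names, and the failure rule (empty events fail) is made up purely for the example.

```rust
use std::thread;

/// Hypothetical per-event handler: empty events are treated as unprocessable.
fn handle(event: &str) -> Result<usize, String> {
    if event.is_empty() {
        Err("empty event".to_string())
    } else {
        Ok(event.len())
    }
}

/// Processes one batch concurrently. Failures are collected into a
/// dead-letter list instead of stopping the rest of the batch.
fn process_batch<'a>(batch: &[&'a str]) -> (Vec<usize>, Vec<&'a str>) {
    let results: Vec<Result<usize, String>> = thread::scope(|s| {
        // Spawn one worker per event first, then join them all in order.
        let handles: Vec<_> = batch
            .iter()
            .map(|event| s.spawn(move || handle(event)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    });

    let mut processed = Vec::new();
    let mut dead_letters = Vec::new();
    for (event, result) in batch.iter().zip(results) {
        match result {
            Ok(value) => processed.push(value),
            Err(_) => dead_letters.push(*event),
        }
    }
    (processed, dead_letters)
}

fn main() {
    let (processed, dead_letters) = process_batch(&["a", "", "ccc"]);
    println!("processed: {:?}", processed);
    println!("dead letters: {:?}", dead_letters);
}
```

With one event at a time, each failure or slow handler would hold up everything behind it; here the whole batch is in flight at once and only the failed events are set aside.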

Contributor

sehz commented Nov 21, 2024

Understood. Thanks for the feedback!

Contributor

fraidev commented Jan 8, 2025

stream_batches_with_config was un-deprecated in #4272.
