Replies: 1 comment
-
Hey! 👋 I guess it's a bit of a contrived use case if you need to stop Benthos, and #1069 might've been a potential way to approach the problem, but it got abandoned. As a hack, you could look into using a [...]. It's also possible to craft your own custom input by importing Benthos as a library if you're happy to write a bit of Go. You can more or less copy/paste the existing one, including its dependencies.
No idea how to do it either, but if the underlying library offers some mechanism for it, then it can be exposed as new config fields. PRs are welcome!
-
Hello,
I'm in the initial stages of evaluating Benthos. We are looking to use the Kafka input to read from a topic. One piece of functionality we are looking to implement is being able to do one-off replays of data using timestamps. For example, read all messages between time T1 and T2 and send those to the configured output. Then, after passing T2, I need to stop and shut down the Benthos instances.
In the kafka_franz input, per https://www.benthos.dev/docs/components/inputs/kafka_franz#topics, it looks like I can specify offsets for each partition. I think I could use that and have a separate process that calculates offsets for each partition based on a timestamp, though I'll have to investigate how to do that. Assuming that works, I now need to be able to signal a shutdown after all partitions have passed a specific message timestamp (T2). I see that the input adds metadata, particularly kafka_timestamp_unix, which could be helpful, but I'm unsure how to detect my shutdown condition and actually trigger the shutdown across all instances for my one-off execution (I plan to use KEDA for autoscaling, so there may be more than one instance running). Any initial advice as I continue my evaluation?
Thanks