
Kafka lag metrics gives incorrect value #21134

Open
fpytloun opened this issue Aug 22, 2024 · 4 comments
Labels: domain: observability, source: kafka, type: bug

Comments

@fpytloun
Contributor

fpytloun commented Aug 22, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

I am working on Vector dashboards and noticed that the Kafka lag metric spiked at some point, probably due to some Kafka or Elasticsearch glitch. But everything is processing just fine: the lag is not growing, it just settled at a different level, and I don't see any delay in log delivery either. I also dug deeper into the per-partition metrics and they all look similar (I wanted to make sure it is not just some partitions being stuck).

This chart shows the amount of time (at the current processing rate) needed to process all unconsumed messages. It suggests we have a 1-hour delay, which is not true; the real delay is less than 5 minutes.
When I restarted one of the Vector instances, the value went back down to the 5-minute level.

[screenshot: consumer lag expressed as time-to-consume, settled at roughly 1 hour]

I tried to confirm it is a Vector/rdkafka issue by comparing with kafka-lag-exporter metrics, which show the correct value.

Another interesting thing I found is that the metric for partition_id -1 has a negative value 😯
[screenshot: negative kafka_consumer_lag value for partition_id -1]

Configuration

No response

Version

0.39.0

Debug Output

No response

Example Data

No response

Additional Context

No response

References

No response

fpytloun added the type: bug label on Aug 22, 2024
@jszwedko
Member

Interesting, thanks for this report @fpytloun. Given the values don't match up with kafka-lag-exporter it does seem like a potential bug in Vector or rust-rdkafka or librdkafka.

Vector just publishes the metrics returned by rust-rdkafka:

pub struct KafkaStatisticsReceived<'a> {
    pub statistics: &'a rdkafka::Statistics,
    pub expose_lag_metrics: bool,
}

impl InternalEvent for KafkaStatisticsReceived<'_> {
    fn emit(self) {
        gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
        gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
        counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
        counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
        counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
        counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
        counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
        counter!("kafka_produced_messages_bytes_total")
            .absolute(self.statistics.txmsg_bytes as u64);
        counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
        counter!("kafka_consumed_messages_bytes_total")
            .absolute(self.statistics.rxmsg_bytes as u64);
        if self.expose_lag_metrics {
            for (topic_id, topic) in &self.statistics.topics {
                for (partition_id, partition) in &topic.partitions {
                    gauge!(
                        "kafka_consumer_lag",
                        "topic_id" => topic_id.clone(),
                        "partition_id" => partition_id.to_string(),
                    )
                    .set(partition.consumer_lag as f64);
                }
            }
        }
    }
}

I wonder if the measurement librdkafka uses is different from what kafka-lag-exporter measures. librdkafka documents it as:

Difference between (hi_offset or ls_offset) and committed_offset. hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset.

https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
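
In pseudocode, that definition would look roughly like this (illustrative only, not librdkafka code; -1 stands in for an offset that is not yet known):

fn consumer_lag(hi_offset: i64, ls_offset: i64, committed_offset: i64, read_uncommitted: bool) -> i64 {
    // hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset.
    let end_offset = if read_uncommitted { hi_offset } else { ls_offset };
    // If either offset is not yet known, the lag cannot be computed.
    if end_offset < 0 || committed_offset < 0 {
        return -1;
    }
    end_offset - committed_offset
}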

If it would be helpful, I could add a log message around here to log the data coming from librdkafka:

vector/src/kafka.rs, lines 175 to 185 at fe2cc26:

impl ClientContext for KafkaStatisticsContext {
    fn stats(&self, statistics: Statistics) {
        // This callback gets executed on a separate thread within the rdkafka library, so we need
        // to propagate the span here to attach the component tags to the emitted events.
        let _entered = self.span.enter();
        emit!(KafkaStatisticsReceived {
            statistics: &statistics,
            expose_lag_metrics: self.expose_lag_metrics,
        });
    }
}
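
For illustration, the added logging might look roughly like this (a sketch only, not actual Vector code; it reuses the statistics fields from the snippet above and a tracing-style debug! call):

fn stats(&self, statistics: Statistics) {
    let _entered = self.span.enter();
    // Dump the raw per-partition lag librdkafka reports so it can be compared
    // against what kafka-lag-exporter shows.
    for (topic, topic_stats) in &statistics.topics {
        for (partition, partition_stats) in &topic_stats.partitions {
            debug!(
                message = "librdkafka partition statistics.",
                %topic,
                %partition,
                consumer_lag = partition_stats.consumer_lag,
            );
        }
    }
    emit!(KafkaStatisticsReceived {
        statistics: &statistics,
        expose_lag_metrics: self.expose_lag_metrics,
    });
}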

Vector just ends up using the return values to emit metrics:

https://github.com/vectordotdev/vector/blob/master/src/internal_events/kafka.rs#L119-L152

jszwedko added the domain: observability and source: kafka labels on Aug 23, 2024
@fpytloun
Contributor Author

fpytloun commented Dec 3, 2024

I dug a bit deeper and I think this is related to consumer group rebalances and partition re-assignments. It seems that if a rebalance occurs, the Vector instance that was originally consuming the topic partition keeps reporting the last lag value.
Here is an example from a deployment with two regions and 3 Vector replicas in each region. After a Kafka hiccup and some reassignments of one of the partitions, two instances expose the metric for this topic partition: one is stale (until that pod is restarted and stops exposing the metric) and the other shows the normal value:

[screenshot: two Vector instances reporting kafka_consumer_lag for the same topic partition, one stale and one current]

The fix would be to stop exposing the metric for a topic partition that is no longer consumed by the given Vector instance.
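
As an illustration of that fix, a minimal sketch (names like LagFilter are hypothetical, not Vector's actual code) could track the currently assigned partitions and only emit lag for those:

use std::collections::HashSet;

// Hypothetical sketch: track the partitions currently assigned to this consumer
// and only emit a lag gauge for those, so a stale gauge disappears after a
// rebalance moves the partition to another instance.
#[derive(Default)]
struct LagFilter {
    assigned: HashSet<(String, i32)>,
}

impl LagFilter {
    // Would be called from the rebalance callback with the new assignment.
    fn update_assignment(&mut self, partitions: impl IntoIterator<Item = (String, i32)>) {
        self.assigned = partitions.into_iter().collect();
    }

    // Decides whether a lag gauge should be emitted for this topic/partition;
    // also skips the bogus partition_id -1 entries from the statistics.
    fn should_emit(&self, topic: &str, partition: i32) -> bool {
        partition >= 0 && self.assigned.contains(&(topic.to_owned(), partition))
    }
}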

A workaround is to pick the minimum non-negative value (as there is still a weird -1 value sometimes) per component_id, topic_id, partition_id, like this:

min(vector_kafka_consumer_lag > 0) by (component_id,topic_id,partition_id)

@jszwedko
Member

jszwedko commented Dec 3, 2024

Aaah, interesting, nice find @fpytloun

@sam6258
Contributor

sam6258 commented Dec 12, 2024

I think this is caused by #22006
