librdkafka may be configured to emit internal metrics at a fixed interval by setting the `statistics.interval.ms` configuration property to a value > 0 and registering a `stats_cb` (or similar, depending on language).

The stats are provided as a JSON object string.

Note: The metrics returned may not be completely consistent between brokers, toppars and totals, due to the internal asynchronous nature of librdkafka. E.g., the top level `tx` total may be less than the sum of the broker `tx` values which it represents.
All fields that contain sizes are in bytes unless otherwise noted.
{
<Top-level fields>
"brokers": {
<brokers fields>,
"toppars": { <toppars fields> }
},
"topics": {
<topic fields>,
"partitions": {
<partitions fields>
}
}
[, "cgrp": { <cgrp fields> } ]
[, "eos": { <eos fields> } ]
}
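The structure above can be walked with any JSON parser. The following is a minimal sketch of a stats handler, assuming the callback receives the JSON string as its only argument (as a `stats_cb` typically does in language bindings). The function name `on_stats` and the trimmed-down sample document are illustrative assumptions, not part of librdkafka. It compares the top-level `tx` total with the sum of the per-broker `tx` values, which may differ slightly for the reason given in the note above.

```python
import json


def on_stats(stats_json: str) -> dict:
    """Parse one stats emit and compare the top-level tx total with the broker sum."""
    stats = json.loads(stats_json)
    brokers = stats.get("brokers", {})
    broker_tx_sum = sum(broker.get("tx", 0) for broker in brokers.values())
    total_tx = stats.get("tx", 0)
    return {
        "name": stats.get("name"),
        "total_tx": total_tx,
        "broker_tx_sum": broker_tx_sum,
        # A non-zero skew is expected now and then: counters are updated asynchronously.
        "skew": broker_tx_sum - total_tx,
    }


# Hypothetical, heavily trimmed stats document used only for illustration.
sample = json.dumps({
    "name": "rdkafka#producer-1",
    "tx": 631,
    "brokers": {
        "localhost:9092/2": {"tx": 320},
        "localhost:9093/3": {"tx": 310},
        "localhost:9094/4": {"tx": 1},
    },
})

summary = on_stats(sample)
print(summary)
```

Using `.get()` with defaults keeps the handler tolerant of optional objects such as `cgrp` and `eos`, which are only present for some client types.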
Fields are represented as follows:
- string - UTF8 string.
- int - Integer counter (64 bits wide). Ever increasing.
- int gauge - Integer gauge (64 bits wide). Will be reset to 0 on each stats emit.
- object - Nested JSON object.
- bool - true or false.
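Because `int` fields are ever-increasing counters, a rate has to be derived from two consecutive emits. The sketch below assumes two already-parsed stats objects and uses the `ts` field (microseconds) as the time base. The helper name `counter_rate` and the sample values are hypothetical.

```python
def counter_rate(prev: dict, curr: dict, field: str) -> float:
    """Per-second rate of an ever-increasing counter between two stats emits."""
    elapsed_us = curr["ts"] - prev["ts"]  # ts is a monotonic clock in microseconds
    if elapsed_us <= 0:
        raise ValueError("snapshots must be passed in increasing ts order")
    return (curr[field] - prev[field]) * 1_000_000 / elapsed_us


# Hypothetical snapshots taken exactly one second apart.
prev = {"ts": 5_000_000_000, "txmsgs": 1_000}
curr = {"ts": 5_001_000_000, "txmsgs": 3_500}

rate = counter_rate(prev, curr, "txmsgs")
print(f"{rate:.1f} msgs/s")
```

Gauges need no such treatment; they can be read directly from a single emit.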
Top-level fields.
Field | Type | Example | Description |
---|---|---|---|
name | string | "rdkafka#producer-1" | Handle instance name |
client_id | string | "rdkafka" | The configured (or default) client.id |
type | string | "producer" | Instance type (producer or consumer) |
ts | int | 12345678912345 | librdkafka's internal monotonic clock (microseconds) |
time | int | | Wall clock time in seconds since the epoch |
replyq | int gauge | | Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll() |
msg_cnt | int gauge | | Current number of messages in producer queues |
msg_size | int gauge | | Current total size of messages in producer queues |
msg_max | int | | Threshold: maximum number of messages allowed on the producer queues |
msg_size_max | int | | Threshold: maximum total size of messages allowed on the producer queues |
tx | int | | Total number of requests sent to Kafka brokers |
tx_bytes | int | | Total number of bytes transmitted to Kafka brokers |
rx | int | | Total number of responses received from Kafka brokers |
rx_bytes | int | | Total number of bytes received from Kafka brokers |
txmsgs | int | | Total number of messages transmitted (produced) to Kafka brokers |
txmsg_bytes | int | | Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers |
rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers. |
rxmsg_bytes | int | | Total number of message bytes (including framing) received from Kafka brokers |
simple_cnt | int gauge | | Internal tracking of legacy vs new consumer API state |
metadata_cache_cnt | int gauge | | Number of topics in the metadata cache. |
brokers | object | | Dict of brokers, key is broker name, value is object. See brokers below |
topics | object | | Dict of topics, key is topic name, value is object. See topics below |
cgrp | object | | Consumer group metrics. See cgrp below |
eos | object | | EOS / Idempotent producer state and metrics. See eos below |
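As an illustration of how the gauge and threshold fields relate, the sketch below computes how full the producer queues are from `msg_cnt`/`msg_max` and `msg_size`/`msg_size_max`. The helper name `producer_queue_utilization` is an assumption made for this example. The sample values are taken from the example output at the end of this document.

```python
def producer_queue_utilization(stats: dict) -> dict:
    """Fraction (0.0 to 1.0) of the producer queue limits currently in use."""

    def ratio(value: int, limit: int) -> float:
        # Guard against a zero limit rather than dividing by it.
        return value / limit if limit > 0 else 0.0

    return {
        "messages": ratio(stats["msg_cnt"], stats["msg_max"]),
        "bytes": ratio(stats["msg_size"], stats["msg_size_max"]),
    }


sample = {
    "msg_cnt": 22710,
    "msg_size": 704010,
    "msg_max": 500000,
    "msg_size_max": 1073741824,
}

utilization = producer_queue_utilization(sample)
print(f"messages: {utilization['messages']:.2%}, bytes: {utilization['bytes']:.4%}")
```

A value approaching 1.0 on either axis suggests the application is producing faster than messages are being delivered.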
Per broker statistics.
Field | Type | Example | Description |
---|---|---|---|
name | string | "example.com:9092/13" | Broker hostname, port and broker id |
nodeid | int | 13 | Broker id (-1 for bootstraps) |
nodename | string | "example.com:9092" | Broker hostname |
source | string | "configured" | Broker source (learned, configured, internal, logical) |
state | string | "UP" | Broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE) |
stateage | int gauge | | Time since last broker state change (microseconds) |
outbuf_cnt | int gauge | | Number of requests awaiting transmission to broker |
outbuf_msg_cnt | int gauge | | Number of messages awaiting transmission to broker |
waitresp_cnt | int gauge | | Number of requests in-flight to broker awaiting response |
waitresp_msg_cnt | int gauge | | Number of messages in-flight to broker awaiting response |
tx | int | | Total number of requests sent |
txbytes | int | | Total number of bytes sent |
txerrs | int | | Total number of transmission errors |
txretries | int | | Total number of request retries |
req_timeouts | int | | Total number of requests timed out |
rx | int | | Total number of responses received |
rxbytes | int | | Total number of bytes received |
rxerrs | int | | Total number of receive errors |
rxcorriderrs | int | | Total number of unmatched correlation ids in response (typically for timed out requests) |
rxpartial | int | | Total number of partial MessageSets received. The broker may return partial responses if the full MessageSet could not fit in remaining Fetch response size. |
req | object | | Request type counters. Object key is the request name, value is the number of requests sent. |
zbuf_grow | int | | Total number of decompression buffer size increases |
buf_grow | int | | Total number of buffer size increases (deprecated, unused) |
wakeups | int | | Broker thread poll wakeups |
connects | int | | Number of connection attempts, including successful and failed, and name resolution failures. |
disconnects | int | | Number of disconnects (triggered by broker, network, load-balancer, etc.). |
int_latency | object | | Internal producer queue latency in microseconds. See Window stats below |
outbuf_latency | object | | Internal request queue latency in microseconds. This is the time between a request being enqueued on the transmit (outbuf) queue and the request being written to the TCP socket. Additional buffering and latency may be incurred by the TCP stack and network. See Window stats below |
rtt | object | | Broker latency / round-trip time in microseconds. See Window stats below |
throttle | object | | Broker throttling time in milliseconds. See Window stats below |
toppars | object | | Partitions handled by this broker handle. Key is "topic-partition". See brokers.toppars below |
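One possible use of the per-broker fields is a simple health report. The sketch below flags brokers whose `state` is not "UP" or whose `txerrs`, `rxerrs` or `req_timeouts` counters are non-zero. The helper name `broker_issues`, the choice of counters, and the sample data (including the DOWN broker) are assumptions for illustration. Since these counters are cumulative, a non-zero value only says an error happened at some point since start, and bootstrap or internal brokers may legitimately sit in a non-UP state.

```python
def broker_issues(stats: dict) -> dict:
    """Map broker name to a list of findings; brokers without findings are omitted."""
    issues = {}
    for name, broker in stats.get("brokers", {}).items():
        found = []
        if broker.get("state") != "UP":
            found.append(f"state={broker.get('state')}")
        # Cumulative counters: non-zero means at least one error since client start.
        for counter in ("txerrs", "rxerrs", "req_timeouts"):
            if broker.get(counter, 0) > 0:
                found.append(f"{counter}={broker[counter]}")
        if found:
            issues[name] = found
    return issues


# Hypothetical data; the second broker is made up to show a failing case.
sample = {
    "brokers": {
        "localhost:9092/2": {"state": "UP", "txerrs": 0, "rxerrs": 0, "req_timeouts": 0},
        "localhost:9093/3": {"state": "DOWN", "txerrs": 2, "rxerrs": 0, "req_timeouts": 1},
    }
}

issues = broker_issues(sample)
print(issues)
```

For alerting, it is usually more useful to track the change in these counters between emits than their absolute values.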
Rolling window statistics. The values are in microseconds unless otherwise stated.
Field | Type | Example | Description |
---|---|---|---|
min | int gauge | | Smallest value |
max | int gauge | | Largest value |
avg | int gauge | | Average value |
sum | int gauge | | Sum of values |
cnt | int gauge | | Number of values sampled |
stddev | int gauge | | Standard deviation (based on histogram) |
hdrsize | int gauge | | Memory size of Hdr Histogram |
p50 | int gauge | | 50th percentile |
p75 | int gauge | | 75th percentile |
p90 | int gauge | | 90th percentile |
p95 | int gauge | | 95th percentile |
p99 | int gauge | | 99th percentile |
p99_99 | int gauge | | 99.99th percentile |
outofrange | int gauge | | Values skipped due to out of histogram range |
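A window object can be condensed into a few headline numbers. The sketch below converts a microsecond window (such as `rtt`) to milliseconds and recomputes the mean from `sum` and `cnt` as a cross-check against `avg`. The helper name `summarize_window` and its `divisor` parameter are assumptions. The sample is the `rtt` window of the first broker in the example output at the end of this document.

```python
from typing import Optional


def summarize_window(window: dict, divisor: float = 1000.0) -> Optional[dict]:
    """Return avg, p99 and a recomputed mean in scaled units, or None if no samples."""
    cnt = window.get("cnt", 0)
    if cnt == 0:
        return None  # an empty window reports zeros everywhere
    return {
        "avg": window["avg"] / divisor,
        "p99": window["p99"] / divisor,
        "mean_from_sum": window["sum"] / cnt / divisor,
    }


rtt = {"min": 1580, "max": 3389, "avg": 2349, "sum": 79868, "p99": 3391, "cnt": 34}

summary = summarize_window(rtt)
print(summary)
```

For windows that are not in microseconds (for example `throttle`, `batchsize` or `batchcnt`), pass a different `divisor` or `1.0`.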
Topic partition assigned to broker.
Field | Type | Example | Description |
---|---|---|---|
topic | string | "mytopic" | Topic name |
partition | int | 3 | Partition id |
Per-topic statistics (the topics object).
Field | Type | Example | Description |
---|---|---|---|
topic | string | "myatopic" | Topic name |
age | int gauge | | Age of client's topic object (milliseconds) |
metadata_age | int gauge | | Age of metadata from broker for this topic (milliseconds) |
batchsize | object | | Batch sizes in bytes. See Window stats |
batchcnt | object | | Batch message counts. See Window stats |
partitions | object | | Partitions dict, key is partition id. See partitions below. |
Per-partition statistics (the topics.partitions object).
Field | Type | Example | Description |
---|---|---|---|
partition | int | 3 | Partition Id (-1 for internal UA/UnAssigned partition) |
broker | int | | The id of the broker that messages are currently being fetched from |
leader | int | | Current leader broker id |
desired | bool | | Partition is explicitly desired by application |
unknown | bool | | Partition not seen in topic metadata from broker |
msgq_cnt | int gauge | | Number of messages waiting to be produced in first-level queue |
msgq_bytes | int gauge | | Number of bytes in msgq_cnt |
xmit_msgq_cnt | int gauge | | Number of messages ready to be produced in transmit queue |
xmit_msgq_bytes | int gauge | | Number of bytes in xmit_msgq |
fetchq_cnt | int gauge | | Number of pre-fetched messages in fetch queue |
fetchq_size | int gauge | | Bytes in fetchq |
fetch_state | string | "active" | Consumer fetch state for this partition (none, stopping, stopped, offset-query, offset-wait, active). |
query_offset | int gauge | | Current/Last logical offset query |
next_offset | int gauge | | Next offset to fetch |
app_offset | int gauge | | Offset of last message passed to application + 1 |
stored_offset | int gauge | | Offset to be committed |
committed_offset | int gauge | | Last committed offset |
eof_offset | int gauge | | Last PARTITION_EOF signaled offset |
lo_offset | int gauge | | Partition's low watermark offset on broker |
hi_offset | int gauge | | Partition's high watermark offset on broker |
ls_offset | int gauge | | Partition's last stable offset on broker, or same as hi_offset if broker version is less than 0.11.0.0. |
consumer_lag | int gauge | | Difference between (hi_offset or ls_offset) and max(app_offset, committed_offset). hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset. |
txmsgs | int | | Total number of messages transmitted (produced) |
txbytes | int | | Total number of bytes transmitted for txmsgs |
rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc). |
rxbytes | int | | Total number of bytes received for rxmsgs |
msgs | int | | Total number of messages received (consumer, same as rxmsgs), or total number of messages produced (possibly not yet transmitted) (producer). |
rx_ver_drops | int | | Dropped outdated messages |
msgs_inflight | int gauge | | Current number of messages in-flight to/from broker |
next_ack_seq | int gauge | | Next expected acked sequence (idempotent producer) |
next_err_seq | int gauge | | Next expected errored sequence (idempotent producer) |
acked_msgid | int | | Last acked internal message id (idempotent producer) |
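A common use of the partition fields is to aggregate `consumer_lag` across all partitions. The sketch below is one way to do that, under two assumptions: the internal UA partition (id -1) is skipped, and a negative `consumer_lag` is treated as "not known" and ignored (the example output at the end of this document shows -1 for a producer). The helper name `total_consumer_lag` and the sample data are hypothetical.

```python
def total_consumer_lag(stats: dict) -> int:
    """Sum consumer_lag over all real partitions, ignoring unknown (negative) values."""
    total = 0
    for topic in stats.get("topics", {}).values():
        for part in topic.get("partitions", {}).values():
            if part.get("partition", -1) < 0:
                continue  # internal UA/UnAssigned partition
            lag = part.get("consumer_lag", -1)
            if lag >= 0:
                total += lag
    return total


# Hypothetical consumer stats, trimmed to the fields used here.
sample = {
    "topics": {
        "test": {
            "partitions": {
                "0": {"partition": 0, "consumer_lag": 5},
                "1": {"partition": 1, "consumer_lag": -1},
                "-1": {"partition": -1, "consumer_lag": -1},
            }
        }
    }
}

lag = total_consumer_lag(sample)
print(lag)
```

Keeping the per-partition values alongside the total is usually worthwhile, since a single stalled partition can hide behind a small aggregate.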
Consumer group metrics (the cgrp object).
Field | Type | Example | Description |
---|---|---|---|
state | string | "up" | Local consumer group handler's state. |
stateage | int gauge | | Time elapsed since last state change (milliseconds). |
joinstate | string | "assigned" | Local consumer group handler's join state. |
rebalance_age | int gauge | | Time elapsed since last rebalance (assign or revoke) (milliseconds). |
rebalance_cnt | int | | Total number of rebalances (assign or revoke). |
rebalance_reason | string | | Last rebalance reason, or empty string. |
assignment_size | int gauge | | Current assignment's partition count. |
EOS / idempotent producer state and metrics (the eos object).
Field | Type | Example | Description |
---|---|---|---|
idemp_state | string | "Assigned" | Current idempotent producer id state. |
idemp_stateage | int gauge | | Time elapsed since last idemp_state change (milliseconds). |
txn_state | string | "InTransaction" | Current transactional producer state. |
txn_stateage | int gauge | | Time elapsed since last txn_state change (milliseconds). |
txn_may_enq | bool | | Transactional state allows enqueuing (producing) new messages. |
producer_id | int gauge | | The currently assigned Producer ID (or -1). |
producer_epoch | int gauge | | The current epoch (or -1). |
epoch_cnt | int | | The number of Producer ID assignments since start. |
This (prettified) example output is from a short-lived producer using the following command: `rdkafka_performance -b localhost -P -t test -T 1000 -Y 'cat >> stats.json'`.

Note: this output is prettified using `jq .`; the JSON object emitted by librdkafka does not contain line breaks.
{
"name": "rdkafka#producer-1",
"client_id": "rdkafka",
"type": "producer",
"ts": 5016483227792,
"time": 1527060869,
"replyq": 0,
"msg_cnt": 22710,
"msg_size": 704010,
"msg_max": 500000,
"msg_size_max": 1073741824,
"simple_cnt": 0,
"metadata_cache_cnt": 1,
"brokers": {
"localhost:9092/2": {
"name": "localhost:9092/2",
"nodeid": 2,
"nodename": "localhost:9092",
"source": "learned",
"state": "UP",
"stateage": 9057234,
"outbuf_cnt": 0,
"outbuf_msg_cnt": 0,
"waitresp_cnt": 0,
"waitresp_msg_cnt": 0,
"tx": 320,
"txbytes": 84283332,
"txerrs": 0,
"txretries": 0,
"req_timeouts": 0,
"rx": 320,
"rxbytes": 15708,
"rxerrs": 0,
"rxcorriderrs": 0,
"rxpartial": 0,
"zbuf_grow": 0,
"buf_grow": 0,
"wakeups": 591067,
"int_latency": {
"min": 86,
"max": 59375,
"avg": 23726,
"sum": 5694616664,
"stddev": 13982,
"p50": 28031,
"p75": 36095,
"p90": 39679,
"p95": 43263,
"p99": 48639,
"p99_99": 59391,
"outofrange": 0,
"hdrsize": 11376,
"cnt": 240012
},
"rtt": {
"min": 1580,
"max": 3389,
"avg": 2349,
"sum": 79868,
"stddev": 474,
"p50": 2319,
"p75": 2543,
"p90": 3183,
"p95": 3199,
"p99": 3391,
"p99_99": 3391,
"outofrange": 0,
"hdrsize": 13424,
"cnt": 34
},
"throttle": {
"min": 0,
"max": 0,
"avg": 0,
"sum": 0,
"stddev": 0,
"p50": 0,
"p75": 0,
"p90": 0,
"p95": 0,
"p99": 0,
"p99_99": 0,
"outofrange": 0,
"hdrsize": 17520,
"cnt": 34
},
"toppars": {
"test-1": {
"topic": "test",
"partition": 1
}
}
},
"localhost:9093/3": {
"name": "localhost:9093/3",
"nodeid": 3,
"nodename": "localhost:9093",
"source": "learned",
"state": "UP",
"stateage": 9057209,
"outbuf_cnt": 0,
"outbuf_msg_cnt": 0,
"waitresp_cnt": 0,
"waitresp_msg_cnt": 0,
"tx": 310,
"txbytes": 84301122,
"txerrs": 0,
"txretries": 0,
"req_timeouts": 0,
"rx": 310,
"rxbytes": 15104,
"rxerrs": 0,
"rxcorriderrs": 0,
"rxpartial": 0,
"zbuf_grow": 0,
"buf_grow": 0,
"wakeups": 607956,
"int_latency": {
"min": 82,
"max": 58069,
"avg": 23404,
"sum": 5617432101,
"stddev": 14021,
"p50": 27391,
"p75": 35839,
"p90": 39679,
"p95": 42751,
"p99": 48639,
"p99_99": 58111,
"outofrange": 0,
"hdrsize": 11376,
"cnt": 240016
},
"rtt": {
"min": 1704,
"max": 3572,
"avg": 2493,
"sum": 87289,
"stddev": 559,
"p50": 2447,
"p75": 2895,
"p90": 3375,
"p95": 3407,
"p99": 3583,
"p99_99": 3583,
"outofrange": 0,
"hdrsize": 13424,
"cnt": 35
},
"throttle": {
"min": 0,
"max": 0,
"avg": 0,
"sum": 0,
"stddev": 0,
"p50": 0,
"p75": 0,
"p90": 0,
"p95": 0,
"p99": 0,
"p99_99": 0,
"outofrange": 0,
"hdrsize": 17520,
"cnt": 35
},
"toppars": {
"test-0": {
"topic": "test",
"partition": 0
}
}
},
"localhost:9094/4": {
"name": "localhost:9094/4",
"nodeid": 4,
"nodename": "localhost:9094",
"source": "learned",
"state": "UP",
"stateage": 9057207,
"outbuf_cnt": 0,
"outbuf_msg_cnt": 0,
"waitresp_cnt": 0,
"waitresp_msg_cnt": 0,
"tx": 1,
"txbytes": 25,
"txerrs": 0,
"txretries": 0,
"req_timeouts": 0,
"rx": 1,
"rxbytes": 272,
"rxerrs": 0,
"rxcorriderrs": 0,
"rxpartial": 0,
"zbuf_grow": 0,
"buf_grow": 0,
"wakeups": 4,
"int_latency": {
"min": 0,
"max": 0,
"avg": 0,
"sum": 0,
"stddev": 0,
"p50": 0,
"p75": 0,
"p90": 0,
"p95": 0,
"p99": 0,
"p99_99": 0,
"outofrange": 0,
"hdrsize": 11376,
"cnt": 0
},
"rtt": {
"min": 0,
"max": 0,
"avg": 0,
"sum": 0,
"stddev": 0,
"p50": 0,
"p75": 0,
"p90": 0,
"p95": 0,
"p99": 0,
"p99_99": 0,
"outofrange": 0,
"hdrsize": 13424,
"cnt": 0
},
"throttle": {
"min": 0,
"max": 0,
"avg": 0,
"sum": 0,
"stddev": 0,
"p50": 0,
"p75": 0,
"p90": 0,
"p95": 0,
"p99": 0,
"p99_99": 0,
"outofrange": 0,
"hdrsize": 17520,
"cnt": 0
},
"toppars": {}
}
},
"topics": {
"test": {
"topic": "test",
"metadata_age": 9060,
"batchsize": {
"min": 99,
"max": 391805,
"avg": 272593,
"sum": 18808985,
"stddev": 180408,
"p50": 393215,
"p75": 393215,
"p90": 393215,
"p95": 393215,
"p99": 393215,
"p99_99": 393215,
"outofrange": 0,
"hdrsize": 14448,
"cnt": 69
},
"batchcnt": {
"min": 1,
"max": 10000,
"avg": 6956,
"sum": 480028,
"stddev": 4608,
"p50": 10047,
"p75": 10047,
"p90": 10047,
"p95": 10047,
"p99": 10047,
"p99_99": 10047,
"outofrange": 0,
"hdrsize": 8304,
"cnt": 69
},
"partitions": {
"0": {
"partition": 0,
"broker": 3,
"leader": 3,
"desired": false,
"unknown": false,
"msgq_cnt": 1,
"msgq_bytes": 31,
"xmit_msgq_cnt": 0,
"xmit_msgq_bytes": 0,
"fetchq_cnt": 0,
"fetchq_size": 0,
"fetch_state": "none",
"query_offset": 0,
"next_offset": 0,
"app_offset": -1001,
"stored_offset": -1001,
"commited_offset": -1001,
"committed_offset": -1001,
"eof_offset": -1001,
"lo_offset": -1001,
"hi_offset": -1001,
"consumer_lag": -1,
"txmsgs": 2150617,
"txbytes": 66669127,
"rxmsgs": 0,
"rxbytes": 0,
"msgs": 2160510,
"rx_ver_drops": 0
},
"1": {
"partition": 1,
"broker": 2,
"leader": 2,
"desired": false,
"unknown": false,
"msgq_cnt": 0,
"msgq_bytes": 0,
"xmit_msgq_cnt": 0,
"xmit_msgq_bytes": 0,
"fetchq_cnt": 0,
"fetchq_size": 0,
"fetch_state": "none",
"query_offset": 0,
"next_offset": 0,
"app_offset": -1001,
"stored_offset": -1001,
"commited_offset": -1001,
"committed_offset": -1001,
"eof_offset": -1001,
"lo_offset": -1001,
"hi_offset": -1001,
"consumer_lag": -1,
"txmsgs": 2150136,
"txbytes": 66654216,
"rxmsgs": 0,
"rxbytes": 0,
"msgs": 2159735,
"rx_ver_drops": 0
},
"-1": {
"partition": -1,
"broker": -1,
"leader": -1,
"desired": false,
"unknown": false,
"msgq_cnt": 0,
"msgq_bytes": 0,
"xmit_msgq_cnt": 0,
"xmit_msgq_bytes": 0,
"fetchq_cnt": 0,
"fetchq_size": 0,
"fetch_state": "none",
"query_offset": 0,
"next_offset": 0,
"app_offset": -1001,
"stored_offset": -1001,
"commited_offset": -1001,
"committed_offset": -1001,
"eof_offset": -1001,
"lo_offset": -1001,
"hi_offset": -1001,
"consumer_lag": -1,
"txmsgs": 0,
"txbytes": 0,
"rxmsgs": 0,
"rxbytes": 0,
"msgs": 1177,
"rx_ver_drops": 0
}
}
}
},
"tx": 631,
"tx_bytes": 168584479,
"rx": 631,
"rx_bytes": 31084,
"txmsgs": 4300753,
"txmsg_bytes": 133323343,
"rxmsgs": 0,
"rxmsg_bytes": 0
}