Skip to content

fix: Kafka context propagation #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 62 additions & 42 deletions src/instana/instrumentation/kafka/confluent_kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ def trace_kafka_produce(
span.set_attribute("kafka.access", "produce")

# context propagation
headers = args[6] if len(args) > 6 else kwargs.get("headers", {})
#
# As stated in the official documentation at
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-producer,
# headers can be either a list of (key, value) pairs or a
# dictionary. To maintain compatibility with the headers for the
# Kafka Python library, we will use a list of tuples.
headers = args[6] if len(args) > 6 else kwargs.get("headers", [])
tracer.inject(
span.context,
Format.KAFKA_HEADERS,
Expand All @@ -75,44 +81,63 @@ def trace_kafka_produce(
)

try:
kwargs["headers"] = headers
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return res

def trace_kafka_consume(
wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
instance: InstanaConfluentKafkaConsumer,
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> List[confluent_kafka.Message]:
if tracing_is_off():
return wrapped(*args, **kwargs)

def create_span(
span_type: str,
topic: Optional[str] = "",
headers: Optional[List[Tuple[str, bytes]]] = [],
exception: Optional[str] = None,
) -> None:
tracer, parent_span, _ = get_tracer_tuple()

parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
Format.KAFKA_HEADERS,
headers,
disable_w3c_trace_context=True,
)
)

with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
span.set_attribute("kafka.access", "consume")
if topic:
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", span_type)

try:
res = wrapped(*args, **kwargs)
if isinstance(res, list) and len(res) > 0:
span.set_attribute("kafka.service", res[0].topic())
except Exception as exc:
span.record_exception(exc)
if exception:
span.record_exception(exception)

def trace_kafka_consume(
wrapped: Callable[..., InstanaConfluentKafkaConsumer.consume],
instance: InstanaConfluentKafkaConsumer,
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> List[confluent_kafka.Message]:
if tracing_is_off():
return wrapped(*args, **kwargs)

res = None
exception = None

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
exception = exc
finally:
if res:
for message in res:
create_span("consume", message.topic(), message.headers())
else:
return res
create_span("consume", exception=exception)

return res

def trace_kafka_poll(
wrapped: Callable[..., InstanaConfluentKafkaConsumer.poll],
Expand All @@ -123,29 +148,24 @@ def trace_kafka_poll(
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()

parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
)
)

with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
span.set_attribute("kafka.access", "poll")
res = None
exception = None

try:
res = wrapped(*args, **kwargs)
if res:
span.set_attribute("kafka.service", res.topic())
except Exception as exc:
span.record_exception(exc)
try:
res = wrapped(*args, **kwargs)
except Exception as exc:
exception = exc
finally:
if res:
create_span("poll", res.topic(), res.headers())
else:
return res
create_span(
"poll",
next(iter(instance.list_topics().topics)),
exception=exception,
)

return res

# Apply the monkey patch
confluent_kafka.Producer = InstanaConfluentKafkaProducer
Expand Down
121 changes: 74 additions & 47 deletions src/instana/instrumentation/kafka/kafka_python.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# (c) Copyright IBM Corp. 2025

try:
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple
import inspect
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple

import kafka # noqa: F401
import wrapt
Expand Down Expand Up @@ -37,92 +38,118 @@ def trace_kafka_send(
span.set_attribute("kafka.access", "send")

# context propagation
headers = kwargs.get("headers", [])
tracer.inject(
span.context,
Format.KAFKA_HEADERS,
kwargs.get("headers", {}),
headers,
disable_w3c_trace_context=True,
)

try:
kwargs["headers"] = headers
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return res

@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
def trace_kafka_consume(
wrapped: Callable[..., "kafka.KafkaConsumer.__next__"],
instance: "kafka.KafkaConsumer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> "FutureRecordMetadata":
if tracing_is_off():
return wrapped(*args, **kwargs)

def create_span(
span_type: str,
topic: Optional[str],
headers: Optional[List[Tuple[str, bytes]]] = [],
exception: Optional[str] = None,
) -> None:
tracer, parent_span, _ = get_tracer_tuple()

parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
Format.KAFKA_HEADERS,
headers,
disable_w3c_trace_context=True,
)
)

with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
topic = list(instance.subscription())[0]
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", "consume")
if topic:
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", span_type)
if exception:
span.record_exception(exception)

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.__next__")
def trace_kafka_consume(
wrapped: Callable[..., "kafka.KafkaConsumer.__next__"],
instance: "kafka.KafkaConsumer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> "FutureRecordMetadata":
if tracing_is_off():
return wrapped(*args, **kwargs)

exception = None
res = None

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
exception = exc
finally:
if res:
create_span(
"consume",
res.topic if res else list(instance.subscription())[0],
res.headers,
)
else:
return res
create_span(
"consume", list(instance.subscription())[0], exception=exception
)

return res

@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
def trace_kafka_poll(
wrapped: Callable[..., "kafka.KafkaConsumer.poll"],
instance: "kafka.KafkaConsumer",
args: Tuple[int, str, Tuple[Any, ...]],
kwargs: Dict[str, Any],
) -> Dict[str, Any]:
) -> Optional[Dict[str, Any]]:
if tracing_is_off():
return wrapped(*args, **kwargs)

tracer, parent_span, _ = get_tracer_tuple()

# The KafkaConsumer.consume() from the kafka-python-ng call the
# KafkaConsumer.poll() internally, so we do not consider it here.
if parent_span and parent_span.name == "kafka-consumer":
if any(
frame.function == "trace_kafka_consume"
for frame in inspect.getouterframes(inspect.currentframe(), 2)
):
return wrapped(*args, **kwargs)

parent_context = (
parent_span.get_span_context()
if parent_span
else tracer.extract(
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
)
)

with tracer.start_as_current_span(
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
) as span:
topic = list(instance.subscription())[0]
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", "poll")

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
exception = None
res = None

try:
res = wrapped(*args, **kwargs)
except Exception as exc:
exception = exc
finally:
if res:
for partition, consumer_records in res.items():
for message in consumer_records:
create_span(
"poll",
partition.topic,
message.headers if hasattr(message, "headers") else [],
)
else:
return res
create_span(
"poll", list(instance.subscription())[0], exception=exception
)

return res

logger.debug("Instrumenting Kafka (kafka-python)")
except ImportError:
Expand Down
Loading
Loading