Skip to content

feat: Kafka FUP Phase I #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ dependencies = [
"opentelemetry-api>=1.27.0",
"opentelemetry-semantic-conventions>=0.48b0",
"typing_extensions>=4.12.2",
"pyyaml>=6.0.2",
]

[project.entry-points."instana"]
string = "instana:load"

[project.optional-dependencies]
dev = [
"pytest",
"pytest-cov",
"pytest-mock",
"pre-commit>=3.0.0",
"ruff"
"pytest",
"pytest-cov",
"pytest-mock",
"pre-commit>=3.0.0",
"ruff",
]

[project.urls]
Expand Down
42 changes: 30 additions & 12 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from instana.options import StandardOptions
from instana.util import to_json
from instana.util.runtime import get_py_source
from instana.util.span_utils import get_operation_specifier
from instana.util.span_utils import get_operation_specifiers
from instana.version import VERSION


Expand Down Expand Up @@ -351,13 +351,18 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
"""
filtered_spans = []
endpoint = ""
for span in spans:
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
service = span.n
operation_specifier = get_operation_specifier(service)
endpoint = span.data[service][operation_specifier]
if isinstance(endpoint, str) and self.__is_service_or_endpoint_ignored(
service, endpoint
operation_specifier_key, service_specifier_key = (
get_operation_specifiers(service)
)
if service == "kafka":
endpoint = span.data[service][service_specifier_key]
method = span.data[service][operation_specifier_key]
if isinstance(method, str) and self.__is_endpoint_ignored(
service, method, endpoint
):
continue
else:
Expand All @@ -366,15 +371,28 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
filtered_spans.append(span)
return filtered_spans

def __is_service_or_endpoint_ignored(
self, service: str, endpoint: str = ""
def __is_endpoint_ignored(
self,
service: str,
method: str = "",
endpoint: str = "",
) -> bool:
"""Check if the given service and endpoint combination should be ignored."""

return (
service.lower() in self.options.ignore_endpoints
or f"{service.lower()}.{endpoint.lower()}" in self.options.ignore_endpoints
)
service = service.lower()
method = method.lower()
endpoint = endpoint.lower()
filter_rules = [
f"{service}.{method}", # service.method
f"{service}.*", # service.*
]

if service == "kafka" and endpoint:
filter_rules += [
f"{service}.{method}.{endpoint}", # service.method.endpoint
f"{service}.*.{endpoint}", # service.*.endpoint
f"{service}.{method}.*", # service.method.*
]
return any(rule in self.options.ignore_endpoints for rule in filter_rules)

def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
"""
Expand Down
26 changes: 17 additions & 9 deletions src/instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from typing import Any, Dict

from instana.log import logger
from instana.util.config import parse_ignored_endpoints
from instana.util.config import (
parse_ignored_endpoints,
parse_ignored_endpoints_from_yaml,
)
from instana.util.runtime import determine_service_name
from instana.configurator import config

Expand All @@ -44,18 +47,23 @@ def __init__(self, **kwds: Dict[str, Any]) -> None:
str(os.environ["INSTANA_EXTRA_HTTP_HEADERS"]).lower().split(";")
)

if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints(
os.environ["INSTANA_IGNORE_ENDPOINTS"]
if "INSTANA_IGNORE_ENDPOINTS_PATH" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints_from_yaml(
os.environ["INSTANA_IGNORE_ENDPOINTS_PATH"]
)
else:
if (
isinstance(config.get("tracing"), dict)
and "ignore_endpoints" in config["tracing"]
):
if "INSTANA_IGNORE_ENDPOINTS" in os.environ:
self.ignore_endpoints = parse_ignored_endpoints(
config["tracing"]["ignore_endpoints"],
os.environ["INSTANA_IGNORE_ENDPOINTS"]
)
else:
if (
isinstance(config.get("tracing"), dict)
and "ignore_endpoints" in config["tracing"]
):
self.ignore_endpoints = parse_ignored_endpoints(
config["tracing"]["ignore_endpoints"],
)

if os.environ.get("INSTANA_ALLOW_EXIT_AS_ROOT", None) == "1":
self.allow_exit_as_root = True
Expand Down
91 changes: 72 additions & 19 deletions src/instana/util/config.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import itertools
from typing import Any, Dict, List, Union

from instana.log import logger
from instana.util.config_reader import ConfigReader


def parse_service_pair(pair: str) -> List[str]:
"""
Parses a pair string to prepare a list of ignored endpoints.

@param pair: String format:
- "service1:endpoint1,endpoint2" or "service1:endpoint1" or "service1"
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- "service1:method1,method2" or "service1:method1" or "service1"
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
pair_list = []
if ":" in pair:
service, endpoints = pair.split(":", 1)
service, methods = pair.split(":", 1)
service = service.strip()
endpoint_list = [ep.strip() for ep in endpoints.split(",") if ep.strip()]
method_list = [ep.strip() for ep in methods.split(",") if ep.strip()]

for endpoint in endpoint_list:
pair_list.append(f"{service}.{endpoint}")
for method in method_list:
pair_list.append(f"{service}.{method}")
else:
pair_list.append(pair)
pair_list.append(f"{pair}.*")
return pair_list


Expand All @@ -28,8 +31,8 @@ def parse_ignored_endpoints_string(params: str) -> List[str]:
Parses a string to prepare a list of ignored endpoints.

@param params: String format:
- "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- "service1:method1,method2;service2:method3" or "service1;service2"
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
ignore_endpoints = []
if params:
Expand All @@ -46,18 +49,45 @@ def parse_ignored_endpoints_dict(params: Dict[str, Any]) -> List[str]:
Parses a dictionary to prepare a list of ignored endpoints.

@param params: Dict format:
- {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- {"service1": ["method1", "method2"], "service2": ["method3"]}
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
ignore_endpoints = []

for service, endpoints in params.items():
if not endpoints: # filtering all service
ignore_endpoints.append(service.lower())
for service, methods in params.items():
if not methods: # filtering all service
ignore_endpoints.append(f"{service.lower()}.*")
else: # filtering specific endpoints
for endpoint in endpoints:
ignore_endpoints.append(f"{service.lower()}.{endpoint.lower()}")
ignore_endpoints = parse_endpoints_of_service(
ignore_endpoints, service, methods
)

return ignore_endpoints


def parse_endpoints_of_service(
    ignore_endpoints: List[str],
    service: str,
    methods: Union[str, List[str]],
) -> List[str]:
    """
    Parses endpoints of each service.

    Every generated rule is lower-cased and appended to ``ignore_endpoints``
    in place; the same list object is returned.

    For the ``kafka`` service an entry of ``methods`` may be a mapping with the
    keys ``"methods"`` and ``"endpoints"``; it expands to the cartesian product
    ``"kafka.<method>.<endpoint>"``. Any other entry is a plain method name and
    produces ``"<service>.<method>"``.

    @param ignore_endpoints: A list of rules for endpoints to be filtered.
    @param service: The name of the service to be filtered.
    @param methods: A list of specific endpoints of the service to be filtered.
        A single string is accepted as a one-element list.
    @return: ``ignore_endpoints`` with the new rules appended.
    @raise KeyError: if a kafka mapping rule lacks "methods" or "endpoints".
    """
    # Normalise once so the kafka check and the emitted rules agree on casing.
    service = service.lower()

    # A bare string is a single method name; iterating it directly would
    # produce one bogus rule per character.
    if isinstance(methods, str):
        methods = [methods]

    for rule in methods:
        if service == "kafka" and isinstance(rule, dict):
            rule_methods = rule["methods"]
            rule_endpoints = rule["endpoints"]
            # Scalars are tolerated for the same reason as above.
            if isinstance(rule_methods, str):
                rule_methods = [rule_methods]
            if isinstance(rule_endpoints, str):
                rule_endpoints = [rule_endpoints]
            for method, endpoint in itertools.product(rule_methods, rule_endpoints):
                ignore_endpoints.append(
                    f"{service}.{method.lower()}.{endpoint.lower()}"
                )
        else:
            # Plain method name (also valid inside a kafka list).
            ignore_endpoints.append(f"{service}.{rule.lower()}")
    return ignore_endpoints


Expand All @@ -66,9 +96,9 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
Parses input to prepare a list for ignored endpoints.

@param params: Can be either:
- String: "service1:endpoint1,endpoint2;service2:endpoint3" or "service1;service2"
- Dict: {"service1": ["endpoint1", "endpoint2"], "service2": ["endpoint3"]}
@return: List of strings in format ["service1.endpoint1", "service1.endpoint2", "service2"]
- String: "service1:method1,method2;service2:method3" or "service1;service2"
- Dict: {"service1": ["method1", "method2"], "service2": ["method3"]}
@return: List of strings in format ["service1.method1", "service1.method2", "service2.*"]
"""
try:
if isinstance(params, str):
Expand All @@ -80,3 +110,26 @@ def parse_ignored_endpoints(params: Union[Dict[str, Any], str]) -> List[str]:
except Exception as e:
logger.debug("Error parsing ignored endpoints: %s", str(e))
return []


def parse_ignored_endpoints_from_yaml(file_path: str) -> List[str]:
    """
    Parses configuration yaml file and prepares a list of ignored endpoints.

    The rules are read from the "ignore-endpoints" entry of the top-level
    "tracing" section. If there is no "tracing" key, the legacy
    "com.instana.tracing" section is used instead and a debug hint is logged.
    An empty list is returned when the file could not be loaded, when the
    section is missing or is not a mapping, or when it holds no rules.

    @param file_path: Path of the file as a string
    @return: List of strings in format ["service1.method1", "service1.method2", "service2.*", "kafka.method.topic", "kafka.*.topic", "kafka.method.*"]
    """
    config_reader = ConfigReader(file_path)
    data = config_reader.data
    # The reader leaves ``data`` as None when loading fails or the document is
    # empty; a membership test on None would raise TypeError.
    if not isinstance(data, dict):
        return []

    tracing_section = None
    if "tracing" in data:
        tracing_section = data["tracing"]
    elif "com.instana.tracing" in data:
        logger.debug('Please use "tracing" instead of "com.instana.tracing"')
        tracing_section = data["com.instana.tracing"]

    # ``tracing:`` with no value parses to None, which has no ``.get``.
    if not isinstance(tracing_section, dict):
        return []

    ignore_endpoints_dict = tracing_section.get("ignore-endpoints")
    if not ignore_endpoints_dict:
        return []
    return parse_ignored_endpoints(ignore_endpoints_dict)
19 changes: 19 additions & 0 deletions src/instana/util/config_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from instana.log import logger
import yaml


class ConfigReader:
    """
    Reads a YAML configuration file into ``self.data``.

    ``data`` is always a dictionary. It stays empty when the file is missing,
    unreadable, malformed, empty, or when its top-level node is not a mapping,
    so callers can run membership tests and lookups without guarding for None.
    """

    def __init__(self, file_path: str) -> None:
        self.file_path = file_path
        # Start from an empty mapping rather than None: consumers test
        # `"key" in reader.data`, which raises TypeError on None.
        self.data = {}
        self.load_file()

    def load_file(self) -> None:
        """Loads and parses the YAML file; on failure logs and keeps ``data`` unchanged."""
        try:
            # YAML is read as UTF-8 explicitly instead of the platform default.
            with open(self.file_path, "r", encoding="utf-8") as file:
                loaded = yaml.safe_load(file)
        except FileNotFoundError:
            logger.error(f"Configuration file has not found: {self.file_path}")
            return
        except (OSError, UnicodeDecodeError) as e:
            # Permission problems, a directory path, or undecodable bytes.
            logger.error(f"Error reading configuration file {self.file_path}: {e}")
            return
        except yaml.YAMLError as e:
            logger.error(f"Error parsing YAML file: {e}")
            return

        if isinstance(loaded, dict):
            self.data = loaded
        elif loaded is not None:
            # safe_load returns scalars or lists for non-mapping documents.
            logger.error(
                f"Configuration file is not a YAML mapping: {self.file_path}"
            )
16 changes: 10 additions & 6 deletions src/instana/util/span_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
# (c) Copyright IBM Corp. 2025

from typing import Optional
from typing import Tuple


def get_operation_specifier(span_name: str) -> Optional[str]:
def get_operation_specifiers(span_name: str) -> Tuple[str, str]:
"""Get the specific operation specifier for the given span."""
operation_specifier = ""
operation_specifier_key = ""
service_specifier_key = ""
if span_name == "redis":
operation_specifier = "command"
operation_specifier_key = "command"
elif span_name == "dynamodb":
operation_specifier = "op"
return operation_specifier
operation_specifier_key = "op"
elif span_name == "kafka":
operation_specifier_key = "access"
service_specifier_key = "service"
return operation_specifier_key, service_specifier_key
30 changes: 10 additions & 20 deletions tests/agent/test_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,31 +692,21 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None:
assert "should_send_snapshot_data: True" in caplog.messages

def test_is_service_or_endpoint_ignored(self) -> None:
self.agent.options.ignore_endpoints.append("service1")
self.agent.options.ignore_endpoints.append("service2.endpoint1")
self.agent.options.ignore_endpoints.append("service1.*")
self.agent.options.ignore_endpoints.append("service2.method1")

# ignore all endpoints of service1
assert self.agent._HostAgent__is_service_or_endpoint_ignored("service1")
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service1", "endpoint1"
)
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service1", "endpoint2"
)
assert self.agent._HostAgent__is_endpoint_ignored("service1")
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method1")
assert self.agent._HostAgent__is_endpoint_ignored("service1", "method2")

# case-insensitive
assert self.agent._HostAgent__is_service_or_endpoint_ignored("SERVICE1")
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service1", "ENDPOINT1"
)
assert self.agent._HostAgent__is_endpoint_ignored("SERVICE1")
assert self.agent._HostAgent__is_endpoint_ignored("service1", "METHOD1")

# ignore only endpoint1 of service2
assert self.agent._HostAgent__is_service_or_endpoint_ignored(
"service2", "endpoint1"
)
assert not self.agent._HostAgent__is_service_or_endpoint_ignored(
"service2", "endpoint2"
)
assert self.agent._HostAgent__is_endpoint_ignored("service2", "method1")
assert not self.agent._HostAgent__is_endpoint_ignored("service2", "method2")

# don't ignore other services
assert not self.agent._HostAgent__is_service_or_endpoint_ignored("service3")
assert not self.agent._HostAgent__is_endpoint_ignored("service3")
2 changes: 1 addition & 1 deletion tests/clients/boto3/test_boto3_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_ignore_dynamodb(self) -> None:
assert dynamodb_span not in filtered_spans

def test_ignore_create_table(self) -> None:
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb.createtable"
os.environ["INSTANA_IGNORE_ENDPOINTS"] = "dynamodb:createtable"
agent.options = StandardOptions()

with tracer.start_as_current_span("test"):
Expand Down
Loading
Loading