Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to make routing-key part of RabbitMQ transaction/span names #3636

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -38,6 +38,10 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
===== Bug fixes
* Restore compatibility with Java 7 - {pull}3657[#3657]

[float]
===== Features
* Added option to make routing-key part of RabbitMQ transaction/span names - {pull}3636[#3636]

[[release-notes-1.50.0]]
==== 1.50.0 - 2024/05/28

Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.tracer.ElasticContext;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -137,10 +138,9 @@ public static Object[] onBasicPublish(@Advice.This Channel channel,
if (exitSpan != null) {

exchange = normalizeExchangeName(exchange);

String transactionNameSuffix = normalizeExchangeName(resolveTransactionNameSuffix(exchange, routingKey));
exitSpan.withAction("send")
.withName("RabbitMQ SEND to ").appendToName(exchange);

.withName("RabbitMQ SEND to ").appendToName(transactionNameSuffix);
}

properties = propagateTraceContext(tracer.currentContext(), properties);
@@ -174,6 +174,15 @@ private static AMQP.BasicProperties propagateTraceContext(ElasticContext<?> toPr
return properties.builder().headers(headersWithContext).build();
}

private static String resolveTransactionNameSuffix(String exchange, String routingKey) {

if (MessagingConfiguration.RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return routingKey;
} else {
return exchange;
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void afterBasicPublish(@Advice.Enter @Nullable Object[] enterArray,
@Advice.Thrown @Nullable Throwable throwable) {
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@

import co.elastic.apm.agent.tracer.Transaction;
import co.elastic.apm.agent.rabbitmq.header.RabbitMQTextHeaderGetter;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
import co.elastic.apm.agent.tracer.metadata.Message;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import com.rabbitmq.client.AMQP;
@@ -106,8 +107,9 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,
return null;
}

String transactionNameSuffix = getExchangeOrRoutingKey(envelope);
transaction.withType("messaging")
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(exchange));
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(transactionNameSuffix));

transaction.setFrameworkName("RabbitMQ");

@@ -129,5 +131,17 @@ public static void afterHandleDelivery(@Advice.Enter @Nullable final Object tran
.end();
}
}

private static String getExchangeOrRoutingKey(Envelope envelope) {
if (null == envelope) {
return null;
}

if (RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return envelope.getRoutingKey();
} else {
return envelope.getExchange();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.testutils.TestContainersUtils;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -98,7 +99,7 @@ public class RabbitMQIT extends AbstractInstrumentationTest {
private static final String IMAGE = "rabbitmq:3.7-management-alpine";
private static final RabbitMQContainer container = new RabbitMQContainer(IMAGE);

private static final String ROUTING_KEY = "test.key";
private static final String TEST_ROUTING_KEY = "test.key";

private static final byte[] MSG = "Testing APM!".getBytes();

@@ -203,7 +204,8 @@ void headersCaptureSanitize() throws IOException, InterruptedException {
), true);
}

private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders, boolean expectTracingHeaders) throws IOException, InterruptedException {
private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders,
boolean expectTracingHeaders) throws IOException, InterruptedException {
performTest(
propertiesMap(headersMap),
false,
@@ -228,6 +230,15 @@ void ignoreExchangeName() throws IOException, InterruptedException {
});
}

@Test
void routingKeyInTransactionName() throws IOException, InterruptedException {
MessagingConfiguration messagingConfiguration = config.getConfig(MessagingConfiguration.class);
doReturn(RabbitMQNamingMode.ROUTING_KEY).when(messagingConfiguration).getRabbitMQNamingMode();

performTest(emptyProperties(), false, randString("exchange"), "different-routing-key",
(mt, ms) -> {});
}

private void performTest(@Nullable AMQP.BasicProperties properties) throws IOException, InterruptedException {
performTest(properties, false, randString("exchange"), (mt, ms) -> {
});
@@ -238,17 +249,27 @@ private void performTest(@Nullable AMQP.BasicProperties properties,
String channelName,
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {

performTest(properties, shouldIgnore, channelName, TEST_ROUTING_KEY, messageCheck);
}

private void performTest(@Nullable AMQP.BasicProperties properties,
boolean shouldIgnore,
String channelName,
String routingKey,
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {

Channel channel = connection.createChannel();
String exchange = createExchange(channel, channelName);
String queue = createQueue(channel, exchange);
String queue = createQueue(channel, exchange, routingKey);

CountDownLatch messageReceived = new CountDownLatch(1);

channel.basicConsume(queue, new DefaultConsumer(channel) {
// using an anonymous class to ensure class matching is properly applied

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
assertThat(properties).isNotNull();
Map<String, Object> headers = properties.getHeaders();

@@ -264,7 +285,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp

Transaction rootTransaction = startTestRootTransaction("Rabbit-Test Root Transaction");

channel.basicPublish(exchange, ROUTING_KEY, properties, MSG);
channel.basicPublish(exchange, routingKey, properties, MSG);

endRootTransaction(rootTransaction);

@@ -285,17 +306,17 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
return;
}


// 2 transactions, 1 span expected
getReporter().awaitTransactionCount(2);
getReporter().awaitSpanCount(1);

Transaction childTransaction = getNonRootTransaction(rootTransaction, getReporter().getTransactions());

checkTransaction(childTransaction, exchange);
String transactionNameSuffix = !routingKey.equals(TEST_ROUTING_KEY) ? routingKey: exchange;
checkTransaction(childTransaction, exchange, transactionNameSuffix, "RabbitMQ");

Span span = getReporter().getSpans().get(0);
checkSendSpan(span, exchange);
checkSendSpan(span, exchange, transactionNameSuffix);

// span should be child of the first transaction
checkParentChild(rootTransaction, span);
@@ -306,12 +327,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
Message spanMessage = span.getContext().getMessage();
Message transactionMessage = childTransaction.getContext().getMessage();


// test-specific assertions on captured message
messageCheck.accept(transactionMessage, spanMessage);

}


@Test
void testPollingWithinTransactionNoMessage() throws IOException {
Channel channel = connection.createChannel();
@@ -395,7 +415,7 @@ void testPollingIgnoreExchangeName() throws IOException {
private String declareAndBindQueue(String queue, String exchange, Channel channel) {
try {
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, ROUTING_KEY);
channel.queueBind(queue, exchange, TEST_ROUTING_KEY);
return queue;
} catch (IOException e) {
throw new IllegalStateException(e);
@@ -413,7 +433,7 @@ private void pollingTest(boolean withinTransaction, boolean withResult, Supplier
}

if (withResult) {
channel.basicPublish(exchange, ROUTING_KEY, emptyProperties(), MSG);
channel.basicPublish(exchange, TEST_ROUTING_KEY, emptyProperties(), MSG);
}
channel.basicGet(queue, true);

@@ -573,9 +593,9 @@ static Transaction getNonRootTransaction(Transaction rootTransaction, List<Trans
return childTransaction;
}

private String createQueue(Channel channel, String exchange) throws IOException {
private String createQueue(Channel channel, String exchange, String routingKey) throws IOException {
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, ROUTING_KEY);
channel.queueBind(queueName, exchange, routingKey);
return queueName;
}

@@ -618,9 +638,13 @@ private static void checkTransaction(Transaction transaction, String exchange) {
}

static void checkTransaction(Transaction transaction, String exchange, String frameworkName) {
checkTransaction(transaction, exchange, exchange, frameworkName);
}

static void checkTransaction(Transaction transaction, String exchange, String transactionNameSuffix, String frameworkName) {
assertThat(transaction.getType()).isEqualTo("messaging");
assertThat(transaction.getNameAsString())
.isEqualTo("RabbitMQ RECEIVE from %s", exchange.isEmpty() ? "<default>" : exchange);
.isEqualTo("RabbitMQ RECEIVE from %s", transactionNameSuffix.isEmpty() ? "<default>" : transactionNameSuffix);
assertThat(transaction.getFrameworkName()).isEqualTo(frameworkName);

assertThat(transaction.getOutcome()).isEqualTo(Outcome.SUCCESS);
@@ -682,14 +706,23 @@ private static HashMap<String, String> getHeadersMap(Message message) {
}

private static void checkSendSpan(Span span, String exchange) {
checkSendSpan(span, exchange, connection.getAddress().getHostAddress(), connection.getPort());
checkSendSpan(span, exchange, exchange, connection.getAddress().getHostAddress(), connection.getPort());
}

private static void checkSendSpan(Span span, String exchange, String spanNameSuffix) {
checkSendSpan(span, exchange, spanNameSuffix, connection.getAddress().getHostAddress(), connection.getPort());
}

static void checkSendSpan(Span span, String exchange, String host, int port) {
checkSendSpan(span, exchange, exchange, host, port);
}

static void checkSendSpan(Span span, String exchange, String spanNameSuffix, String host, int port) {
String exchangeName = exchange.isEmpty() ? "<default>" : exchange;
String spanName = spanNameSuffix.isEmpty() ? "<default>" : spanNameSuffix;
checkSpanCommon(span,
"send",
String.format("RabbitMQ SEND to %s", exchangeName),
String.format("RabbitMQ SEND to %s", spanName),
exchangeName,
true
);
Original file line number Diff line number Diff line change
@@ -111,8 +111,8 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
private final ConfigurationOption<RabbitMQNamingMode> rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class)
.key("rabbitmq_naming_mode")
.configurationCategory(MESSAGING_CATEGORY)
.description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.\n" +
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp."
.description("Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.\n" +
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client."
)
.dynamic(true)
.tags("added[1.46.0]")
@@ -187,5 +187,9 @@ public enum RabbitMQNamingMode {
* Use queue in transaction names
*/
QUEUE,
/**
* Use routing key in transaction names
*/
ROUTING_KEY
}
}
12 changes: 6 additions & 6 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
@@ -2513,12 +2513,12 @@ Starting from version 1.43.0, the classes that are part of the 'application_pack
[[config-rabbitmq-naming-mode]]
==== `rabbitmq_naming_mode` (added[1.46.0])

Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.

<<configuration-dynamic, image:./images/dynamic-config.svg[] >>

Valid options: `EXCHANGE`, `QUEUE`
Valid options: `EXCHANGE`, `QUEUE`, `ROUTING_KEY`

[options="header"]
|============
@@ -4703,10 +4703,10 @@ Example: `5ms`.
#
# jms_listener_packages=

# Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
# Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.
#
# Valid options: EXCHANGE, QUEUE
# Valid options: EXCHANGE, QUEUE, ROUTING_KEY
# This setting can be changed at runtime
# Type: RabbitMQNamingMode
# Default value: EXCHANGE