Skip to content

Commit 68e0ca2

Browse files
Deniza Topalovadeni-topalova
Deniza Topalova
authored andcommitted
adding an option to make routing-key part of RabbitMQ transaction names
address PR comments - add the same logic to the sender side
1 parent 0c45a57 commit 68e0ca2

File tree

6 files changed

+93
-29
lines changed

6 files changed

+93
-29
lines changed

CHANGELOG.asciidoc

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
3838
===== Bug fixes
3939
* Restore compatibility with Java 7 - {pull}3657[#3657]
4040
41+
[float]
42+
===== Features
43+
* Added option to make routing-key part of RabbitMQ transaction/span names - {pull}3636[#3636]
44+
4145
[[release-notes-1.50.0]]
4246
==== 1.50.0 - 2024/05/28
4347

apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ChannelInstrumentation.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
2525
import co.elastic.apm.agent.tracer.ElasticContext;
2626
import co.elastic.apm.agent.tracer.Span;
27+
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration;
2728
import com.rabbitmq.client.AMQP;
2829
import com.rabbitmq.client.Channel;
2930
import com.rabbitmq.client.Connection;
@@ -137,10 +138,9 @@ public static Object[] onBasicPublish(@Advice.This Channel channel,
137138
if (exitSpan != null) {
138139

139140
exchange = normalizeExchangeName(exchange);
140-
141+
String transactionNameSuffix = normalizeExchangeName(resolveTransactionNameSuffix(exchange, routingKey));
141142
exitSpan.withAction("send")
142-
.withName("RabbitMQ SEND to ").appendToName(exchange);
143-
143+
.withName("RabbitMQ SEND to ").appendToName(transactionNameSuffix);
144144
}
145145

146146
properties = propagateTraceContext(tracer.currentContext(), properties);
@@ -174,6 +174,15 @@ private static AMQP.BasicProperties propagateTraceContext(ElasticContext<?> toPr
174174
return properties.builder().headers(headersWithContext).build();
175175
}
176176

177+
private static String resolveTransactionNameSuffix(String exchange, String routingKey) {
178+
179+
if (MessagingConfiguration.RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
180+
return routingKey;
181+
} else {
182+
return exchange;
183+
}
184+
}
185+
177186
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
178187
public static void afterBasicPublish(@Advice.Enter @Nullable Object[] enterArray,
179188
@Advice.Thrown @Nullable Throwable throwable) {

apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/main/java/co/elastic/apm/agent/rabbitmq/ConsumerInstrumentation.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import co.elastic.apm.agent.tracer.Transaction;
2222
import co.elastic.apm.agent.rabbitmq.header.RabbitMQTextHeaderGetter;
23+
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
2324
import co.elastic.apm.agent.tracer.metadata.Message;
2425
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
2526
import com.rabbitmq.client.AMQP;
@@ -106,8 +107,9 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,
106107
return null;
107108
}
108109

110+
String transactionNameSuffix = getExchangeOrRoutingKey(envelope);
109111
transaction.withType("messaging")
110-
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(exchange));
112+
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(transactionNameSuffix));
111113

112114
transaction.setFrameworkName("RabbitMQ");
113115

@@ -129,5 +131,17 @@ public static void afterHandleDelivery(@Advice.Enter @Nullable final Object tran
129131
.end();
130132
}
131133
}
134+
135+
private static String getExchangeOrRoutingKey(Envelope envelope) {
136+
if (null == envelope) {
137+
return null;
138+
}
139+
140+
if (RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
141+
return envelope.getRoutingKey();
142+
} else {
143+
return envelope.getExchange();
144+
}
145+
}
132146
}
133147
}

apm-agent-plugins/apm-rabbitmq/apm-rabbitmq-plugin/src/test/java/co/elastic/apm/agent/rabbitmq/RabbitMQIT.java

+50-17
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import co.elastic.apm.agent.impl.transaction.Transaction;
5757
import co.elastic.apm.agent.common.util.WildcardMatcher;
5858
import co.elastic.apm.agent.testutils.TestContainersUtils;
59+
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
5960
import com.rabbitmq.client.AMQP;
6061
import com.rabbitmq.client.Channel;
6162
import com.rabbitmq.client.Connection;
@@ -98,7 +99,7 @@ public class RabbitMQIT extends AbstractInstrumentationTest {
9899
private static final String IMAGE = "rabbitmq:3.7-management-alpine";
99100
private static final RabbitMQContainer container = new RabbitMQContainer(IMAGE);
100101

101-
private static final String ROUTING_KEY = "test.key";
102+
private static final String TEST_ROUTING_KEY = "test.key";
102103

103104
private static final byte[] MSG = "Testing APM!".getBytes();
104105

@@ -203,7 +204,8 @@ void headersCaptureSanitize() throws IOException, InterruptedException {
203204
), true);
204205
}
205206

206-
private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders, boolean expectTracingHeaders) throws IOException, InterruptedException {
207+
private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders,
208+
boolean expectTracingHeaders) throws IOException, InterruptedException {
207209
performTest(
208210
propertiesMap(headersMap),
209211
false,
@@ -228,6 +230,15 @@ void ignoreExchangeName() throws IOException, InterruptedException {
228230
});
229231
}
230232

233+
@Test
234+
void routingKeyInTransactionName() throws IOException, InterruptedException {
235+
MessagingConfiguration messagingConfiguration = config.getConfig(MessagingConfiguration.class);
236+
doReturn(RabbitMQNamingMode.ROUTING_KEY).when(messagingConfiguration).getRabbitMQNamingMode();
237+
238+
performTest(emptyProperties(), false, randString("exchange"), "different-routing-key",
239+
(mt, ms) -> {});
240+
}
241+
231242
private void performTest(@Nullable AMQP.BasicProperties properties) throws IOException, InterruptedException {
232243
performTest(properties, false, randString("exchange"), (mt, ms) -> {
233244
});
@@ -238,17 +249,27 @@ private void performTest(@Nullable AMQP.BasicProperties properties,
238249
String channelName,
239250
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {
240251

252+
performTest(properties, shouldIgnore, channelName, TEST_ROUTING_KEY, messageCheck);
253+
}
254+
255+
private void performTest(@Nullable AMQP.BasicProperties properties,
256+
boolean shouldIgnore,
257+
String channelName,
258+
String routingKey,
259+
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {
260+
241261
Channel channel = connection.createChannel();
242262
String exchange = createExchange(channel, channelName);
243-
String queue = createQueue(channel, exchange);
263+
String queue = createQueue(channel, exchange, routingKey);
244264

245265
CountDownLatch messageReceived = new CountDownLatch(1);
246266

247267
channel.basicConsume(queue, new DefaultConsumer(channel) {
248268
// using an anonymous class to ensure class matching is properly applied
249269

250270
@Override
251-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
271+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
272+
byte[] body) throws IOException {
252273
assertThat(properties).isNotNull();
253274
Map<String, Object> headers = properties.getHeaders();
254275

@@ -264,7 +285,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
264285

265286
Transaction rootTransaction = startTestRootTransaction("Rabbit-Test Root Transaction");
266287

267-
channel.basicPublish(exchange, ROUTING_KEY, properties, MSG);
288+
channel.basicPublish(exchange, routingKey, properties, MSG);
268289

269290
endRootTransaction(rootTransaction);
270291

@@ -285,17 +306,17 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
285306
return;
286307
}
287308

288-
289309
// 2 transactions, 1 span expected
290310
getReporter().awaitTransactionCount(2);
291311
getReporter().awaitSpanCount(1);
292312

293313
Transaction childTransaction = getNonRootTransaction(rootTransaction, getReporter().getTransactions());
294314

295-
checkTransaction(childTransaction, exchange);
315+
String transactionNameSuffix = !routingKey.equals(TEST_ROUTING_KEY) ? routingKey: exchange;
316+
checkTransaction(childTransaction, exchange, transactionNameSuffix, "RabbitMQ");
296317

297318
Span span = getReporter().getSpans().get(0);
298-
checkSendSpan(span, exchange);
319+
checkSendSpan(span, exchange, transactionNameSuffix);
299320

300321
// span should be child of the first transaction
301322
checkParentChild(rootTransaction, span);
@@ -306,12 +327,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
306327
Message spanMessage = span.getContext().getMessage();
307328
Message transactionMessage = childTransaction.getContext().getMessage();
308329

309-
310330
// test-specific assertions on captured message
311331
messageCheck.accept(transactionMessage, spanMessage);
312-
313332
}
314333

334+
315335
@Test
316336
void testPollingWithinTransactionNoMessage() throws IOException {
317337
Channel channel = connection.createChannel();
@@ -395,7 +415,7 @@ void testPollingIgnoreExchangeName() throws IOException {
395415
private String declareAndBindQueue(String queue, String exchange, Channel channel) {
396416
try {
397417
channel.queueDeclare(queue, false, false, false, null);
398-
channel.queueBind(queue, exchange, ROUTING_KEY);
418+
channel.queueBind(queue, exchange, TEST_ROUTING_KEY);
399419
return queue;
400420
} catch (IOException e) {
401421
throw new IllegalStateException(e);
@@ -413,7 +433,7 @@ private void pollingTest(boolean withinTransaction, boolean withResult, Supplier
413433
}
414434

415435
if (withResult) {
416-
channel.basicPublish(exchange, ROUTING_KEY, emptyProperties(), MSG);
436+
channel.basicPublish(exchange, TEST_ROUTING_KEY, emptyProperties(), MSG);
417437
}
418438
channel.basicGet(queue, true);
419439

@@ -573,9 +593,9 @@ static Transaction getNonRootTransaction(Transaction rootTransaction, List<Trans
573593
return childTransaction;
574594
}
575595

576-
private String createQueue(Channel channel, String exchange) throws IOException {
596+
private String createQueue(Channel channel, String exchange, String routingKey) throws IOException {
577597
String queueName = channel.queueDeclare().getQueue();
578-
channel.queueBind(queueName, exchange, ROUTING_KEY);
598+
channel.queueBind(queueName, exchange, routingKey);
579599
return queueName;
580600
}
581601

@@ -618,9 +638,13 @@ private static void checkTransaction(Transaction transaction, String exchange) {
618638
}
619639

620640
static void checkTransaction(Transaction transaction, String exchange, String frameworkName) {
641+
checkTransaction(transaction, exchange, exchange, frameworkName);
642+
}
643+
644+
static void checkTransaction(Transaction transaction, String exchange, String transactionNameSuffix, String frameworkName) {
621645
assertThat(transaction.getType()).isEqualTo("messaging");
622646
assertThat(transaction.getNameAsString())
623-
.isEqualTo("RabbitMQ RECEIVE from %s", exchange.isEmpty() ? "<default>" : exchange);
647+
.isEqualTo("RabbitMQ RECEIVE from %s", transactionNameSuffix.isEmpty() ? "<default>" : transactionNameSuffix);
624648
assertThat(transaction.getFrameworkName()).isEqualTo(frameworkName);
625649

626650
assertThat(transaction.getOutcome()).isEqualTo(Outcome.SUCCESS);
@@ -682,14 +706,23 @@ private static HashMap<String, String> getHeadersMap(Message message) {
682706
}
683707

684708
private static void checkSendSpan(Span span, String exchange) {
685-
checkSendSpan(span, exchange, connection.getAddress().getHostAddress(), connection.getPort());
709+
checkSendSpan(span, exchange, exchange, connection.getAddress().getHostAddress(), connection.getPort());
710+
}
711+
712+
private static void checkSendSpan(Span span, String exchange, String spanNameSuffix) {
713+
checkSendSpan(span, exchange, spanNameSuffix, connection.getAddress().getHostAddress(), connection.getPort());
686714
}
687715

688716
static void checkSendSpan(Span span, String exchange, String host, int port) {
717+
checkSendSpan(span, exchange, exchange, host, port);
718+
}
719+
720+
static void checkSendSpan(Span span, String exchange, String spanNameSuffix, String host, int port) {
689721
String exchangeName = exchange.isEmpty() ? "<default>" : exchange;
722+
String spanName = spanNameSuffix.isEmpty() ? "<default>" : spanNameSuffix;
690723
checkSpanCommon(span,
691724
"send",
692-
String.format("RabbitMQ SEND to %s", exchangeName),
725+
String.format("RabbitMQ SEND to %s", spanName),
693726
exchangeName,
694727
true
695728
);

apm-agent-tracer/src/main/java/co/elastic/apm/agent/tracer/configuration/MessagingConfiguration.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
111111
private final ConfigurationOption<RabbitMQNamingMode> rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class)
112112
.key("rabbitmq_naming_mode")
113113
.configurationCategory(MESSAGING_CATEGORY)
114-
.description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.\n" +
115-
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp."
114+
.description("Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.\n" +
115+
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client."
116116
)
117117
.dynamic(true)
118118
.tags("added[1.46.0]")
@@ -187,5 +187,9 @@ public enum RabbitMQNamingMode {
187187
* Use queue in transaction names
188188
*/
189189
QUEUE,
190+
/**
191+
* Use routing key in transaction names
192+
*/
193+
ROUTING_KEY
190194
}
191195
}

docs/configuration.asciidoc

+6-6
Original file line numberDiff line numberDiff line change
@@ -2513,12 +2513,12 @@ Starting from version 1.43.0, the classes that are part of the 'application_pack
25132513
[[config-rabbitmq-naming-mode]]
25142514
==== `rabbitmq_naming_mode` (added[1.46.0])
25152515

2516-
Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
2517-
Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
2516+
Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
2517+
Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.
25182518

25192519
<<configuration-dynamic, image:./images/dynamic-config.svg[] >>
25202520

2521-
Valid options: `EXCHANGE`, `QUEUE`
2521+
Valid options: `EXCHANGE`, `QUEUE`, `ROUTING_KEY`
25222522

25232523
[options="header"]
25242524
|============
@@ -4703,10 +4703,10 @@ Example: `5ms`.
47034703
#
47044704
# jms_listener_packages=
47054705
4706-
# Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
4707-
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
4706+
# Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
4707+
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.
47084708
#
4709-
# Valid options: EXCHANGE, QUEUE
4709+
# Valid options: EXCHANGE, QUEUE, ROUTING_KEY
47104710
# This setting can be changed at runtime
47114711
# Type: RabbitMQNamingMode
47124712
# Default value: EXCHANGE

0 commit comments

Comments
 (0)