diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 28086e8281887..862613a22560c 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." "new-consumer", "Use the new consumer implementation." - "consumer.config", "Consumer config properties file." + "command-config", "Config properties file." """ # Root directory for persistent output @@ -83,10 +83,14 @@ def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANC def args(self, version): """Dictionary of arguments used to start the Consumer Performance script.""" args = { - 'topic': self.topic, - 'messages': self.messages + 'topic': self.topic } + if version.supports_command_config(): + args['num-records'] = self.messages + else: + args['messages'] = self.messages + if version < V_2_5_0: args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) else: @@ -115,7 +119,10 @@ def start_cmd(self, node): for key, value in self.args(node.version).items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE + if node.version.supports_command_config(): + cmd += " --command-config %s" % ConsumerPerformanceService.CONFIG_FILE + else: + cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) @@ -133,11 +140,14 @@ def _worker(self, idx, node): self.security_config.setup_node(node) cmd = self.start_cmd(node) - self.logger.debug("Consumer performance %d command: %s", idx, cmd) + self.logger.error("Consumer performance %d command: %s", idx, cmd) last = None for line in node.account.ssh_capture(cmd): + self.logger.error("Consumer performance %d: %s", idx, line) last = line + self.logger.error("Consumer performance %d last line: %s", idx, last) + # Parse and save the last line's information if last is not None: parts = last.split(',') diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py b/tests/kafkatest/services/performance/share_consumer_performance.py index ccb0952458009..1e68e583705d0 100644 --- a/tests/kafkatest/services/performance/share_consumer_performance.py +++ b/tests/kafkatest/services/performance/share_consumer_performance.py @@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService): "socket-buffer-size", "The size of the tcp RECV size." - "consumer.config", "Consumer config properties file." + "command-config", "Config properties file." """ # Root directory for persistent output @@ -100,7 +100,7 @@ def start_cmd(self, node): for key, value in self.args().items(): cmd += " --%s %s" % (key, value) - cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE + cmd += " --command-config %s" % ShareConsumerPerformanceService.CONFIG_FILE for key, value in self.settings.items(): cmd += " %s=%s" % (str(key), str(value)) diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 62def15d32478..0334af83aa132 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -48,6 +48,7 @@ import joptsimple.OptionSpec; import static joptsimple.util.RegexMatcher.regex; +import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs; public class ConsumerPerformance { private static final Logger LOG = LoggerFactory.getLogger(ConsumerPerformance.class); @@ -61,7 +62,7 @@ static void run(String[] args, Function> co try { LOG.info("Starting consumer..."); ConsumerPerfOptions options = new ConsumerPerfOptions(args); - AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalRecordsRead = new AtomicLong(0); AtomicLong totalBytesRead = new AtomicLong(0); AtomicLong joinTimeMs = new AtomicLong(0); AtomicLong joinTimeMsInSingleRound = new AtomicLong(0); @@ -71,14 +72,14 @@ static void run(String[] args, Function> co try (Consumer consumer = consumerCreator.apply(options.props())) { long bytesRead = 0L; - long messagesRead = 0L; + long recordsRead = 0L; long lastBytesRead = 0L; - long lastMessagesRead = 0L; + long lastRecordsRead = 0L; long currentTimeMs = System.currentTimeMillis(); long joinStartMs = currentTimeMs; long startMs = currentTimeMs; - consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, - bytesRead, messagesRead, lastBytesRead, lastMessagesRead, + consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs, + bytesRead, recordsRead, lastBytesRead, lastRecordsRead, joinStartMs, joinTimeMsInSingleRound); long endMs = System.currentTimeMillis(); @@ -92,12 +93,12 @@ static void run(String[] args, Function> co options.dateFormat().format(endMs), totalMbRead, totalMbRead / elapsedSec, - totalMessagesRead.get(), - totalMessagesRead.get() / elapsedSec, + totalRecordsRead.get(), + totalRecordsRead.get() / elapsedSec, joinTimeMs.get(), fetchTimeInMs, totalMbRead / (fetchTimeInMs / 1000.0), - totalMessagesRead.get() / (fetchTimeInMs / 1000.0) + totalRecordsRead.get() / (fetchTimeInMs / 1000.0) ); } @@ -122,16 +123,16 @@ protected static void printHeader(boolean showDetailedStats) { private static void consume(Consumer consumer, ConsumerPerfOptions options, - AtomicLong totalMessagesRead, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, AtomicLong joinTimeMs, long bytesRead, - long messagesRead, + long recordsRead, long lastBytesRead, - long lastMessagesRead, + long lastRecordsRead, long joinStartMs, AtomicLong joinTimeMsInSingleRound) { - long numMessages = options.numMessages(); + long numRecords = options.numRecords(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); long reportingIntervalMs = options.reportingIntervalMs(); boolean showDetailedStats = options.showDetailedStats(); @@ -149,55 +150,55 @@ private static void consume(Consumer consumer, long lastReportTimeMs = currentTimeMs; long lastConsumedTimeMs = currentTimeMs; - while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { + while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); currentTimeMs = System.currentTimeMillis(); if (!records.isEmpty()) lastConsumedTimeMs = currentTimeMs; for (ConsumerRecord record : records) { - messagesRead += 1; + recordsRead += 1; if (record.key() != null) bytesRead += record.key().length; if (record.value() != null) bytesRead += record.value().length; if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) { if (showDetailedStats) - printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, + printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get()); joinTimeMsInSingleRound = new AtomicLong(0); lastReportTimeMs = currentTimeMs; - lastMessagesRead = messagesRead; + lastRecordsRead = recordsRead; lastBytesRead = bytesRead; } } } - if (messagesRead < numMessages) - System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + if (recordsRead < numRecords) + System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); - totalMessagesRead.set(messagesRead); + totalRecordsRead.set(recordsRead); totalBytesRead.set(bytesRead); } protected static void printConsumerProgress(int id, long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, SimpleDateFormat dateFormat, long joinTimeMsInSingleRound) { - printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat); - printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound); + printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat); + printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound); System.out.println(); } private static void printBasicProgress(int id, long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, SimpleDateFormat dateFormat) { @@ -205,25 +206,25 @@ private static void printBasicProgress(int id, double totalMbRead = (bytesRead * 1.0) / (1024 * 1024); double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs; - double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0; + double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0; System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id, - totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec); + totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec); } private static void printExtendedProgress(long bytesRead, long lastBytesRead, - long messagesRead, - long lastMessagesRead, + long recordsRead, + long lastRecordsRead, long startMs, long endMs, long joinTimeMsInSingleRound) { long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound; double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); - long intervalMessagesRead = messagesRead - lastMessagesRead; + long intervalRecordsRead = recordsRead - lastRecordsRead; double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs; - double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs; + double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalRecordsRead / fetchTimeMs; System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound, - fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec); + fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec); } public static class ConsumerPerfRebListener implements ConsumerRebalanceListener { @@ -256,13 +257,18 @@ protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec includeOpt; private final OptionSpec groupIdOpt; private final OptionSpec fetchSizeOpt; + private final OptionSpec commandPropertiesOpt; private final OptionSpec resetBeginningOffsetOpt; private final OptionSpec socketBufferSizeOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec consumerConfigOpt; + private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec numMessagesOpt; + private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; private final OptionSpec hideHeaderOpt; @@ -291,26 +297,41 @@ public ConsumerPerfOptions(String[] args) { .describedAs("size") .ofType(Integer.class) .defaultsTo(1024 * 1024); + commandPropertiesOpt = parser.accepts("command-property", "Kafka consumer related configuration properties like client.id. " + + "These configs take precedence over those passed via --command-config or --consumer.config.") + .withRequiredArg() + .describedAs("prop1=val1") + .ofType(String.class); resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message."); + "offset to consume from, start with the latest record present in the log rather than the earliest record."); socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") .withRequiredArg() .describedAs("size") .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); - consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " + + "This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Config properties file.") .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval"); + "interval as configured by reporting-interval."); recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") .withOptionalArg() .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.") + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + + "This option will be removed in a future version. Use --num-records instead.") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -326,7 +347,7 @@ public ConsumerPerfOptions(String[] args) { .describedAs("date format") .ofType(String.class) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); - hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats."); try { options = parser.parse(args); } catch (OptionException e) { @@ -335,8 +356,19 @@ public ConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); + + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + if (options.has(numMessagesOpt)) { + System.out.println("Warning: --messages is deprecated. Use --num-records instead."); + } + + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + } } } @@ -348,10 +380,23 @@ public String brokerHostsAndPorts() { return options.valueOf(bootstrapServerOpt); } + private Properties readProps(List commandProperties, String commandConfigFile) throws IOException { + Properties props = commandConfigFile != null + ? Utils.loadProps(commandConfigFile) + : new Properties(); + props.putAll(parseKeyValueArgs(commandProperties)); + return props; + } + public Properties props() throws IOException { - Properties props = (options.has(consumerConfigOpt)) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) - : new Properties(); + List commandProperties = options.valuesOf(commandPropertiesOpt); + String commandConfigFile; + if (options.has(consumerConfigOpt)) { + commandConfigFile = options.valueOf(consumerConfigOpt); + } else { + commandConfigFile = options.valueOf(commandConfigOpt); + } + Properties props = readProps(commandProperties, commandConfigFile); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString()); @@ -378,8 +423,10 @@ public Optional include() { : Optional.empty(); } - public long numMessages() { - return options.valueOf(numMessagesOpt); + public long numRecords() { + return options.has(numMessagesOpt) + ? options.valueOf(numMessagesOpt) + : options.valueOf(numRecordsOpt); } public long reportingIntervalMs() { diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java index 5d6179efaeb3b..51c667046683c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -55,6 +55,7 @@ import joptsimple.OptionSpec; import static joptsimple.util.RegexMatcher.regex; +import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs; public class ShareConsumerPerformance { private static final Logger LOG = LoggerFactory.getLogger(ShareConsumerPerformance.class); @@ -67,7 +68,7 @@ static void run(String[] args, Function> shareConsumersMetrics = new ArrayList<>(); @@ -93,7 +94,7 @@ static void run(String[] args, Function> shareConsumers, ShareConsumerPerfOptions options, - AtomicLong totalMessagesRead, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, long startMs) throws ExecutionException, InterruptedException { - long numMessages = options.numMessages(); + long numRecords = options.numRecords(); long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic())); // Now start the benchmark. - AtomicLong messagesRead = new AtomicLong(0); + AtomicLong recordsRead = new AtomicLong(0); AtomicLong bytesRead = new AtomicLong(0); List shareConsumersConsumptionDetails = new ArrayList<>(); @@ -133,7 +134,7 @@ private static void consume(List> shareConsumers, ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0); futures.add(executorService.submit(() -> { try { - consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options, + consumeRecordsForSingleShareConsumer(shareConsumers.get(index), recordsRead, bytesRead, options, shareConsumerConsumption, index + 1); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -171,22 +172,22 @@ private static void consume(List> shareConsumers, // Print stats for share consumer. double elapsedSec = (endMs - startMs) / 1_000.0; long fetchTimeInMs = endMs - startMs; - long messagesReadByConsumer = shareConsumersConsumptionDetails.get(index).messagesConsumed(); + long recordsReadByConsumer = shareConsumersConsumptionDetails.get(index).recordsConsumed(); long bytesReadByConsumer = shareConsumersConsumptionDetails.get(index).bytesConsumed(); - printStats(bytesReadByConsumer, messagesReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); + printStats(bytesReadByConsumer, recordsReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); } } - if (messagesRead.get() < numMessages) { - System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + if (recordsRead.get() < numRecords) { + System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); } - totalMessagesRead.set(messagesRead.get()); + totalRecordsRead.set(recordsRead.get()); totalBytesRead.set(bytesRead.get()); } - private static void consumeMessagesForSingleShareConsumer(ShareConsumer shareConsumer, - AtomicLong totalMessagesRead, + private static void consumeRecordsForSingleShareConsumer(ShareConsumer shareConsumer, + AtomicLong totalRecordsRead, AtomicLong totalBytesRead, ShareConsumerPerfOptions options, ShareConsumerConsumption shareConsumerConsumption, @@ -197,17 +198,17 @@ private static void consumeMessagesForSingleShareConsumer(ShareConsumer records = shareConsumer.poll(Duration.ofMillis(100)); currentTimeMs = System.currentTimeMillis(); if (!records.isEmpty()) lastConsumedTimeMs = currentTimeMs; for (ConsumerRecord record : records) { - messagesReadByConsumer += 1; - totalMessagesRead.addAndGet(1); + recordsReadByConsumer += 1; + totalRecordsRead.addAndGet(1); if (record.key() != null) { bytesReadByConsumer += record.key().length; totalBytesRead.addAndGet(record.key().length); @@ -218,13 +219,13 @@ private static void consumeMessagesForSingleShareConsumer(ShareConsumer= options.reportingIntervalMs()) { if (options.showDetailedStats()) - printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, messagesReadByConsumer, lastMessagesRead, + printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, recordsReadByConsumer, lastRecordsRead, lastReportTimeMs, currentTimeMs, dateFormat, index); lastReportTimeMs = currentTimeMs; - lastMessagesRead = messagesReadByConsumer; + lastRecordsRead = recordsReadByConsumer; lastBytesRead = bytesReadByConsumer; } - shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer); + shareConsumerConsumption.updateRecordsConsumed(recordsReadByConsumer); shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer); } } @@ -232,8 +233,8 @@ private static void consumeMessagesForSingleShareConsumer(ShareConsumer= 1. private static void printStats(long bytesRead, - long messagesRead, + long recordsRead, double elapsedSec, long fetchTimeInMs, long startMs, @@ -268,8 +269,8 @@ private static void printStats(long bytesRead, dateFormat.format(endMs), totalMbRead, totalMbRead / elapsedSec, - messagesRead / elapsedSec, - messagesRead, + recordsRead / elapsedSec, + recordsRead, fetchTimeInMs ); return; @@ -279,8 +280,8 @@ private static void printStats(long bytesRead, dateFormat.format(endMs), totalMbRead, totalMbRead / elapsedSec, - messagesRead / elapsedSec, - messagesRead, + recordsRead / elapsedSec, + recordsRead, fetchTimeInMs ); } @@ -290,12 +291,17 @@ protected static class ShareConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec topicOpt; private final OptionSpec groupIdOpt; private final OptionSpec fetchSizeOpt; + private final OptionSpec commandPropertiesOpt; private final OptionSpec socketBufferSizeOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec consumerConfigOpt; + private final OptionSpec commandConfigOpt; private final OptionSpec printMetricsOpt; private final OptionSpec showDetailedStatsOpt; private final OptionSpec recordFetchTimeoutOpt; + @Deprecated(since = "4.2", forRemoval = true) private final OptionSpec numMessagesOpt; + private final OptionSpec numRecordsOpt; private final OptionSpec reportingIntervalOpt; private final OptionSpec dateFormatOpt; private final OptionSpec hideHeaderOpt; @@ -322,24 +328,39 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("size") .ofType(Integer.class) .defaultsTo(1024 * 1024); + commandPropertiesOpt = parser.accepts("command-property", "Kafka share consumer related configuration properties like client.id. " + + "These configs take precedence over those passed via --command-config or --consumer.config.") + .withRequiredArg() + .describedAs("prop1=val1") + .ofType(String.class); socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") .withRequiredArg() .describedAs("size") .ofType(Integer.class) .defaultsTo(2 * 1024 * 1024); - consumerConfigOpt = parser.accepts("consumer.config", "Share consumer config properties file.") + consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Share consumer config properties file. " + + "This option will be removed in a future version. Use --command-config instead.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Config properties file.") .withRequiredArg() .describedAs("config file") .ofType(String.class); printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval"); + "interval as configured by reporting-interval."); recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") .withOptionalArg() .describedAs("milliseconds") .ofType(Long.class) .defaultsTo(10_000L); - numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.") + numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " + + "This option will be removed in a future version. Use --num-records instead.") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.") .withRequiredArg() .describedAs("count") .ofType(Long.class); @@ -355,7 +376,7 @@ public ShareConsumerPerfOptions(String[] args) { .describedAs("date format") .ofType(String.class) .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); - hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats."); numThreadsOpt = parser.accepts("threads", "The number of share consumers to use for sharing the load.") .withRequiredArg() .describedAs("count") @@ -371,7 +392,18 @@ public ShareConsumerPerfOptions(String[] args) { } if (options != null) { CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, bootstrapServerOpt); + + CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt); + + if (options.has(numMessagesOpt)) { + System.out.println("Warning: --messages is deprecated. Use --num-records instead."); + } + + if (options.has(consumerConfigOpt)) { + System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead."); + } } } @@ -383,10 +415,23 @@ public String brokerHostsAndPorts() { return options.valueOf(bootstrapServerOpt); } - public Properties props() throws IOException { - Properties props = (options.has(consumerConfigOpt)) - ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + private Properties readProps(List commandProperties, String commandConfigFile) throws IOException { + Properties props = commandConfigFile != null + ? Utils.loadProps(commandConfigFile) : new Properties(); + props.putAll(parseKeyValueArgs(commandProperties)); + return props; + } + + public Properties props() throws IOException { + List commandProperties = options.valuesOf(commandPropertiesOpt); + String commandConfigFile; + if (options.has(consumerConfigOpt)) { + commandConfigFile = options.valueOf(consumerConfigOpt); + } else { + commandConfigFile = options.valueOf(commandConfigOpt); + } + Properties props = readProps(commandProperties, commandConfigFile); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString()); @@ -403,8 +448,10 @@ public Set topic() { return Set.of(options.valueOf(topicOpt)); } - public long numMessages() { - return options.valueOf(numMessagesOpt); + public long numRecords() { + return options.has(numMessagesOpt) + ? options.valueOf(numMessagesOpt) + : options.valueOf(numRecordsOpt); } public int threads() { @@ -439,26 +486,26 @@ public long recordFetchTimeoutMs() { } } - // Helper class to know the final messages and bytes consumer by share consumer at the end of consumption. + // Helper class to know the final records and bytes consumed by share consumer at the end of consumption. private static class ShareConsumerConsumption { - private long messagesConsumed; + private long recordsConsumed; private long bytesConsumed; - public ShareConsumerConsumption(long messagesConsumed, long bytesConsumed) { - this.messagesConsumed = messagesConsumed; + public ShareConsumerConsumption(long recordsConsumed, long bytesConsumed) { + this.recordsConsumed = recordsConsumed; this.bytesConsumed = bytesConsumed; } - public long messagesConsumed() { - return messagesConsumed; + public long recordsConsumed() { + return recordsConsumed; } public long bytesConsumed() { return bytesConsumed; } - public void updateMessagesConsumed(long messagesConsumed) { - this.messagesConsumed = messagesConsumed; + public void updateRecordsConsumed(long recordsConsumed) { + this.recordsConsumed = recordsConsumed; } public void updateBytesConsumed(long bytesConsumed) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index df6c3a93966ab..497deb7808ddf 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -74,7 +74,7 @@ public void testConfigBootStrapServer() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--print-metrics" }; @@ -82,15 +82,64 @@ public void testConfigBootStrapServer() { assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.topic().get().contains("test")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test - public void testConfigWithUnrecognizedOption() { + public void testBootstrapServerNotPresent() { + String[] args = new String[]{ + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Missing required argument \"[bootstrap-server]\"")); + } + + @Test + public void testNumOfRecordsNotPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required:")); + } + + @Test + public void testMessagesDeprecated() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + assertEquals(10, config.numRecords()); + } + + @Test + public void testNumOfRecordsWithMessagesPresent() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", "--messages", "10", + "--num-records", "20" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required")); + } + + @Test + public void testConfigWithUnrecognizedOption() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--new-consumer" }; @@ -104,14 +153,14 @@ public void testConfigWithInclude() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--include", "test.*", - "--messages", "10" + "--num-records", "10" }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.include().get().toString().contains("test.*")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test @@ -120,7 +169,7 @@ public void testConfigWithTopicAndInclude() { "--bootstrap-server", "localhost:9092", "--topic", "test", "--include", "test.*", - "--messages", "10" + "--num-records", "10" }; String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); @@ -132,7 +181,7 @@ public void testConfigWithTopicAndInclude() { public void testConfigWithoutTopicAndInclude() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", - "--messages", "10" + "--num-records", "10" }; String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); @@ -140,9 +189,36 @@ public void testConfigWithoutTopicAndInclude() { assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]")); } + @Test + public void testCommandProperty() throws IOException { + Path configPath = tempDir.resolve("test_command_property_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--command-property", "client.id=consumer-2", + "--command-config", tempFile.getAbsolutePath(), + "--command-property", "prop=val" + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + + assertEquals("consumer-2", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + assertEquals("val", config.props().getProperty("prop")); + } + @Test public void testClientIdOverride() throws IOException { - File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile(); + Path configPath = tempDir.resolve("test_client_id_override_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { output.println("client.id=consumer-1"); output.flush(); @@ -151,7 +227,29 @@ public void testClientIdOverride() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", + "--command-config", tempFile.getAbsolutePath() + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + + assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testConsumerConfigDeprecated() throws IOException { + Path configPath = tempDir.resolve("test_consumer_config_deprecated_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--consumer.config", tempFile.getAbsolutePath() }; @@ -160,12 +258,28 @@ public void testClientIdOverride() throws IOException { assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); } + @Test + public void testCommandConfigWithConsumerConfigPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", "some-path", + "--command-config", "some-path" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ConsumerPerformance.ConsumerPerfOptions(args)); + assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"", + "[consumer.config]", "[command-config]"))); + } + @Test public void testDefaultClientId() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10" + "--num-records", "10" }; ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); @@ -178,7 +292,7 @@ public void testMetricsRetrievedBeforeConsumerClosed() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "0", + "--num-records", "0", "--print-metrics" }; diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java index a22a97f8211bc..6cffde627bb4a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -67,7 +67,7 @@ public void testConfigBootStrapServer() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", + "--num-records", "10", "--print-metrics" }; @@ -75,15 +75,65 @@ public void testConfigBootStrapServer() { assertEquals("localhost:9092", config.brokerHostsAndPorts()); assertTrue(config.topic().contains("test")); - assertEquals(10, config.numMessages()); + assertEquals(10, config.numRecords()); } @Test - public void testConfigWithUnrecognizedOption() { + public void testBootstrapServerNotPresent() { + String[] args = new String[]{ + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Missing required argument \"[bootstrap-server]\"")); + } + + @Test + public void testNumOfRecordsNotPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required:")); + } + + @Test + public void testMessagesDeprecated() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = + new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + assertEquals(10, config.numRecords()); + } + + @Test + public void testNumOfRecordsWithMessagesPresent() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", "--messages", "10", + "--num-records", "20" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains("Exactly one of the following arguments is required")); + } + + @Test + public void testConfigWithUnrecognizedOption() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", "--new-share-consumer" }; @@ -92,9 +142,36 @@ public void testConfigWithUnrecognizedOption() { assertTrue(err.contains("new-share-consumer is not a recognized option")); } + @Test + public void testCommandProperty() throws IOException { + Path configPath = tempDir.resolve("test_command_property_share_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--command-property", "client.id=consumer-2", + "--command-config", tempFile.getAbsolutePath(), + "--command-property", "prop=val" + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("consumer-2", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + assertEquals("val", config.props().getProperty("prop")); + } + @Test public void testClientIdOverride() throws IOException { - File tempFile = Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile(); + Path configPath = tempDir.resolve("test_client_id_override_share_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { output.println("client.id=share-consumer-1"); output.flush(); @@ -103,8 +180,8 @@ public void testClientIdOverride() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10", - "--consumer.config", tempFile.getAbsolutePath() + "--num-records", "10", + "--command-config", tempFile.getAbsolutePath() }; ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); @@ -112,12 +189,51 @@ public void testClientIdOverride() throws IOException { assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); } + @Test + public void testConsumerConfigDeprecated() throws IOException { + Path configPath = tempDir.resolve("test_consumer_config_deprecated_share_consumer_perf.conf"); + Files.deleteIfExists(configPath); + File tempFile = Files.createFile(configPath).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=share-consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", tempFile.getAbsolutePath() + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = + new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testCommandConfigWithConsumerConfigPresent() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--num-records", "10", + "--consumer.config", "some-path", + "--command-config", "some-path" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> + new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"", + "[consumer.config]", "[command-config]"))); + } + @Test public void testDefaultClientId() throws IOException { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "10" + "--num-records", "10" }; ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); @@ -130,7 +246,7 @@ public void testMetricsRetrievedBeforeConsumerClosed() { String[] args = new String[]{ "--bootstrap-server", "localhost:9092", "--topic", "test", - "--messages", "0", + "--num-records", "0", "--print-metrics" };