Skip to content

Commit 32b9a99

Browse files
awellesspvillard31
authored andcommitted
NIFI-14543 Add record counter in ConsumeBoxEnterpriseEvents
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #9920.
1 parent 5aaf365 commit 32b9a99

File tree

2 files changed

+6
-0
lines changed

2 files changed

+6
-0
lines changed

nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class ConsumeBoxEnterpriseEvents extends AbstractProcessor {
7474

7575
private static final int LIMIT = 500;
7676

77+
static final String COUNTER_RECORDS_PROCESSED = "Records Processed";
78+
7779
static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder()
7880
.name("Event Types")
7981
.description("A comma separated list of Enterprise Events to consume. If not set, all Events are consumed." +
@@ -220,6 +222,7 @@ private void writeLogAsRecords(final EventLog eventLog, final ProcessSession ses
220222
throw new ProcessException("Failed to write Box Event into a FlowFile", e);
221223
}
222224

225+
session.adjustCounter(COUNTER_RECORDS_PROCESSED, eventLog.getSize(), false);
223226
session.putAttribute(flowFile, "record.count", String.valueOf(eventLog.getSize()));
224227
session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
225228
session.transfer(flowFile, REL_SUCCESS);

nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ void testConsumeEvents(
9292
.toList();
9393

9494
assertEquals(expectedEventIds, eventIds);
95+
96+
assertEquals(eventIds.size(), testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
9597
}
9698

9799
static List<Arguments> dataFor_testConsumeEvents() {
@@ -129,6 +131,7 @@ void testGracefulTermination() throws InterruptedException {
129131
assertDoesNotThrow(() -> runFuture.get(5, TimeUnit.SECONDS), "Processor did not stop gracefully");
130132

131133
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS, consumedEvents.get());
134+
assertEquals(consumedEvents.get(), testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
132135
} finally {
133136
// We can't use try with resources, as Executors use a shutdown method
134137
// which indefinitely waits for submitted tasks.

0 commit comments

Comments
 (0)