Skip to content

Commit a978074

Browse files
committed
Revert "KAFKA-13722: remove usage of old ProcessorContext (#18292)"
This reverts commit f13a22a.
1 parent 2dbed66 commit a978074

34 files changed: +94 / −108 lines

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public void close() {
213213
private ValueAndTimestamp<KeyValue<? extends K1, ? extends V1>> mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
214214
return ValueAndTimestamp.make(
215215
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
216-
valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp()
216+
valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
217217
);
218218
}
219219
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
8484

8585
@Override
8686
public <KIn, VIn> void forward(final KIn key, final VIn value) {
87-
forward(new Record<>(key, value, recordContext().timestamp(), headers()));
87+
forward(new Record<>(key, value, timestamp(), headers()));
8888
}
8989

9090
/**

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
120120
final Record<Object, Object> toProcess = new Record<>(
121121
deserialized.key(),
122122
deserialized.value(),
123-
processorContext.recordContext().timestamp(),
124-
processorContext.recordContext().headers()
123+
processorContext.timestamp(),
124+
processorContext.headers()
125125
);
126126
((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
127127
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public <K, V> void forward(final K key,
190190
final Record<K, V> toForward = new Record<>(
191191
key,
192192
value,
193-
recordContext.timestamp(),
193+
timestamp(),
194194
headers()
195195
);
196196
forward(toForward);
@@ -204,7 +204,7 @@ public <K, V> void forward(final K key,
204204
final Record<K, V> toForward = new Record<>(
205205
key,
206206
value,
207-
toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(),
207+
toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
208208
headers()
209209
);
210210
forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
250250
// old API processors wouldn't see the timestamps or headers of upstream
251251
// new API processors. But then again, from the perspective of those old-API
252252
// processors, even consulting the timestamp or headers when the record context
253-
// is undefined is itself not well-defined. Plus, I don't think we need to worry
253+
// is undefined is itself not well defined. Plus, I don't think we need to worry
254254
// too much about heterogeneous applications, in which the upstream processor is
255255
// implementing the new API and the downstream one is implementing the old API.
256256
// So, this seems like a fine compromise for now.
257-
if (recordContext != null && (record.timestamp() != recordContext.timestamp() || record.headers() != recordContext.headers())) {
257+
if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
258258
recordContext = new ProcessorRecordContext(
259259
record.timestamp(),
260260
recordContext.offset(),

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ public final class ProcessorContextUtils {
3434

3535
private ProcessorContextUtils() {}
3636

37+
/**
38+
* Should be removed as part of KAFKA-10217
39+
*/
40+
public static StreamsMetricsImpl metricsImpl(final ProcessorContext context) {
41+
return (StreamsMetricsImpl) context.metrics();
42+
}
43+
3744
/**
3845
* Should be removed as part of KAFKA-10217
3946
*/
@@ -68,7 +75,7 @@ public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(
6875
final ProcessorContext<K, V> context
6976
) {
7077
if (context instanceof InternalProcessorContext) {
71-
return (InternalProcessorContext<K, V>) context;
78+
return (InternalProcessorContext) context;
7279
} else {
7380
throw new IllegalArgumentException(
7481
"This component requires internal features of Kafka Streams and must be disabled for unit tests."

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,13 @@ public void process(final Record<KIn, VIn> record) {
209209
// (instead of `RuntimeException`) to work well with those languages
210210
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
211211
null, // only required to pass for DeserializationExceptionHandler
212-
internalProcessorContext.recordContext().topic(),
213-
internalProcessorContext.recordContext().partition(),
214-
internalProcessorContext.recordContext().offset(),
215-
internalProcessorContext.recordContext().headers(),
212+
internalProcessorContext.topic(),
213+
internalProcessorContext.partition(),
214+
internalProcessorContext.offset(),
215+
internalProcessorContext.headers(),
216216
internalProcessorContext.currentNode().name(),
217217
internalProcessorContext.taskId(),
218-
internalProcessorContext.recordContext().timestamp(),
218+
internalProcessorContext.timestamp());
219219
internalProcessorContext.recordContext().sourceRawKey(),
220220
internalProcessorContext.recordContext().sourceRawValue()
221221
);

streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ public void process(final Record<KIn, VIn> record) {
8585
final ProcessorRecordContext contextForExtraction =
8686
new ProcessorRecordContext(
8787
timestamp,
88-
context.recordContext().offset(),
89-
context.recordContext().partition(),
90-
context.recordContext().topic(),
88+
context.offset(),
89+
context.partition(),
90+
context.topic(),
9191
record.headers()
9292
);
9393

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -866,8 +866,8 @@ private void doProcess(final long wallClockTime) {
866866
final Record<Object, Object> toProcess = new Record<>(
867867
record.key(),
868868
record.value(),
869-
processorContext.recordContext().timestamp(),
870-
processorContext.recordContext().headers()
869+
processorContext.timestamp(),
870+
processorContext.headers()
871871
);
872872
maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);
873873

streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
5555
protected final Optional<KeySchema> indexKeySchema;
5656
private final long retentionPeriod;
5757

58-
protected InternalProcessorContext<?, ?> internalProcessorContext;
58+
protected InternalProcessorContext internalProcessorContext;
5959
private Sensor expiredRecordSensor;
6060
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
6161
protected boolean consistencyEnabled = false;

streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,12 @@ private void putInternal(final Bytes key,
272272
key,
273273
new LRUCacheEntry(
274274
value,
275-
internalContext.recordContext().headers(),
275+
internalContext.headers(),
276276
true,
277-
internalContext.recordContext().offset(),
278-
internalContext.recordContext().timestamp(),
279-
internalContext.recordContext().partition(),
280-
internalContext.recordContext().topic(),
277+
internalContext.offset(),
278+
internalContext.timestamp(),
279+
internalContext.partition(),
280+
internalContext.topic(),
281281
internalContext.recordContext().sourceRawKey(),
282282
internalContext.recordContext().sourceRawValue()
283283
)

0 commit comments

Comments (0)