Conversation

Contributor

@apoorvmittal10 apoorvmittal10 commented Aug 21, 2025

The PR fixes the batch alignment issue when partitions are re-assigned.
During the initial read of state the batches can be broken arbitrarily.
Say the start offset is 10 and the cache contains the batch [15-18] after
initialization. When a fetch happens at offset 10 and the fetched batch
contains 10 records, i.e. [10-19], then correct batches will be created
as long as maxFetchRecords is greater than 10. But if maxFetchRecords is
less than 10, the last offset of the batch is determined, which will be 19.
Hence the acquire method will incorrectly create a batch of [10-19] while
[15-18] already exists. The check below is required to resolve the issue:

if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
    lastAcquiredOffset = lastOffset;
}
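
For illustration, here is a minimal, self-contained sketch of the scenario and the clamp, assuming a simplified model of the acquisition path; the class name and the concrete value of lastOffset are illustrative and not taken from the actual SharePartition code.

// Minimal sketch of the scenario above (illustrative only, not the actual
// SharePartition.acquire code). The cached state already holds the batch
// [15-18] from the initial persister read, the fetch at offset 10 returns
// the batch [10-19], and maxFetchRecords is smaller than the batch size.
public class InitialReadGapClampSketch {

    public static void main(String[] args) {
        long batchLastOffset = 19;   // last offset of the fetched batch [10-19]
        long lastOffset = 14;        // assumed last offset for acquisition, stopping
                                     // just before the cached [15-18] batch
        boolean initialReadGapOffsetWindowActive = true;

        // Without the clamp the acquisition falls back to the batch's last offset,
        // creating an acquired batch [10-19] that overlaps the cached [15-18] batch.
        long lastAcquiredOffset = batchLastOffset;

        // The fix: while the initial read gap window is active, never acquire past
        // the last offset computed for this acquisition.
        if (initialReadGapOffsetWindowActive && lastAcquiredOffset > lastOffset) {
            lastAcquiredOffset = lastOffset;
        }

        System.out.println("Acquired batch: [10-" + lastAcquiredOffset + "]"); // prints [10-14]
    }
}

With the clamp in place the acquired batch ends at 14 and the cached [15-18] batch is left untouched.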

While testing other cases, further issues were found with updating the
gap offset, acquiring records prior to the share partition's end offset,
and determining the next fetch offset with compacted topics. All of these
issues can arise mainly during the initial read window after a partition
re-assignment.

Reviewers: Andrew Schofield aschofield@confluent.io, Abhinav Dixit adixit@confluent.io, Chirag Wadhwa cwadhwa@confluent.io

@github-actions bot added the triage (PRs from the community), core (Kafka Broker) and KIP-932 (Queues for Kafka) labels Aug 21, 2025
@apoorvmittal10 added the ci-approved label and removed the triage (PRs from the community) label Aug 21, 2025
Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. I've done an initial review and the testing looks comprehensive. I want to take another pass on the SharePartition code.


// Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
// The records in the batch are from 10 to 49.
MemoryRecords records = memoryRecords(40, 10);
Member

nit: One thing that makes this a bit harder than necessary to review is the inconsistency in the conventions about the offset ranges. For example, this could read memoryRecords(10,49) which would align with the firstOffset, lastOffset convention used in the persister. Not something that needs to be fixed on this PR, but potentially something to refactor later on.

Contributor Author

Sure, I'll refactor in a subsequent PR.

assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
Contributor

Maybe I'm missing something, but we have filled the existing gaps, whatever is left is not actually a gap, right? So ideally the code should have been done in a way that this is null, wdyt?

Collaborator

The initialReadGapOffset is not intelligent enough to detect all individual gaps in the cachedState. Instead, it remains active until the cached state has been checked for gaps up to the offset initialReadGapOffset.endOffset. Additionally, its purpose is only to track the gaps introduced by the persister.readState request. So, once we acquire new batches past initialReadGapOffset.endOffset, which makes initialReadGapOffset null, new gaps may still be introduced to the cached state as per the natural gaps in the underlying partition, but initialReadGapOffset is not responsible for them.

So as per the example above, in the subsequent fetch requests, when the remaining records 29 and 30 are acquired again, the code will have reached initialReadGapOffset.endOffset (which is 30 in this case), and initialReadGapOffset will then be set to null.
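
To make this life-cycle concrete, here is a minimal sketch of the behaviour described above, assuming a simplified GapWindow-style holder; the field and method names are illustrative and differ from the real SharePartition code (see the rename to persisterReadResultGapWindow/GapWindow discussed later in this thread).

// Illustrative model of the initial read gap window's life-cycle (not the
// actual SharePartition code). The window only tracks the range that still
// has to be checked for gaps left by the initial persister.readState response.
public class GapWindowLifecycleSketch {

    // Hypothetical holder: gapStartOffset is the next offset to check, endOffset
    // is the last offset covered by the persister read at initialization time.
    static class GapWindow {
        long gapStartOffset;
        final long endOffset;

        GapWindow(long gapStartOffset, long endOffset) {
            this.gapStartOffset = gapStartOffset;
            this.endOffset = endOffset;
        }
    }

    private GapWindow initialReadGapWindow = new GapWindow(10, 30);

    // Called after records up to lastAcquiredOffset have been acquired and added
    // to the cached state. Once endOffset has been checked the window is dropped;
    // later gaps are natural gaps in the partition and are not tracked here.
    void maybeUpdateGapWindow(long lastAcquiredOffset) {
        if (initialReadGapWindow == null) {
            return;
        }
        if (lastAcquiredOffset >= initialReadGapWindow.endOffset) {
            initialReadGapWindow = null;  // window fully checked, stop tracking
        } else {
            initialReadGapWindow.gapStartOffset = lastAcquiredOffset + 1;
        }
    }
}

In this model, acquiring the remaining records 29 and 30 moves the acquisition to endOffset (30), at which point the window is set to null, matching the behaviour described above.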

Contributor Author

Yeah, the endOffset is 30 but the gapOffset is still 28. Also, it will only happen when the endOffset is past the gap's endOffset. So when anything further is newly acquired, the gap tracking will become null.

Contributor

@adixitconfluent adixitconfluent Aug 26, 2025

Perhaps the variable initialGapOffsetReadWindow is a better variable name, if it makes sense @apoorvmittal10

Contributor

As discussed offline, we are going to make the variable name change in a different PR.

Contributor Author

As discussed, I'll open the next PR with the following changes:

`persisterReadResultGapWindow` as the variable name and `GapWindow` as the class name.

Collaborator

@chirag-wadhwa5 chirag-wadhwa5 left a comment

Thanks for the PR. Left some minor comments.

// If the initial read gap offset window is active then it's not guaranteed that the
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
// last offset is greater than the last offset for acquisition, else there could be
// a situation where the batch overlaps with the initial read gap offset window batch.
Collaborator

Thanks for the PR. I think a small example here would be better, like the one provided above for the gapStartOffset update change in nextFetchOffset.

Contributor Author

Good suggestion, done.

MEMBER_ID,
BATCH_SIZE,
1,
DEFAULT_FETCH_OFFSET,
Collaborator

nit: I believe it should not be DEFAULT_FETCH_OFFSET, but 10, since that is the nextFetchOffset in this case.

Contributor Author

Yeah, good to set it to the offset where the fetch happened, done.

MEMBER_ID,
BATCH_SIZE,
10,
DEFAULT_FETCH_OFFSET,
Collaborator

Likewise, here it should be 15 as per the new nextFetchOffset value.

Contributor Author

Done.

MEMBER_ID,
BATCH_SIZE,
1,
DEFAULT_FETCH_OFFSET,
Collaborator

And here it should be 31.

Collaborator

@chirag-wadhwa5 chirag-wadhwa5 left a comment

LGTM

@apoorvmittal10 apoorvmittal10 merged commit 49ee1fb into apache:trunk Aug 26, 2025
25 checks passed
@apoorvmittal10 apoorvmittal10 deleted the KAFKA-19632 branch August 26, 2025 12:51
apoorvmittal10 added a commit that referenced this pull request Aug 27, 2025
As per the suggestion by @adixitconfluent and @chirag-wadhwa5 [here](#20395 (comment)), I have refactored the code with updated variable and method names.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chirag Wadhwa <cwadhwa@confluent.io>