KAFKA-19632: Handle overlap batch on partition re-assignment #20395
Conversation
Thanks for the PR. I've done an initial review and the testing looks comprehensive. I want to take another pass on the SharePartition code.
// Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
// The records in the batch are from 10 to 49.
MemoryRecords records = memoryRecords(40, 10);
nit: One thing that makes this a bit harder than necessary to review is the inconsistency in the conventions about the offset ranges. For example, this could read `memoryRecords(10, 49)`, which would align with the `firstOffset, lastOffset` convention used in the persister. Not something that needs to be fixed on this PR, but potentially something to refactor later on.
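For illustration, a minimal sketch of such a helper, assuming the existing `memoryRecords(numRecords, startOffset)` test helper and using a hypothetical name so it does not clash with it:

```java
// Hypothetical test helper (sketch only): builds records using the
// (firstOffset, lastOffset) convention of the persister, delegating to the
// existing memoryRecords(numRecords, startOffset) helper.
private MemoryRecords memoryRecordsRange(long firstOffset, long lastOffset) {
    return memoryRecords((int) (lastOffset - firstOffset + 1), firstOffset);
}
```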
Sure, I'll refactor in a subsequent PR.
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state());
assertEquals(30L, sharePartition.endOffset());
assertNotNull(sharePartition.initialReadGapOffset());
Maybe I'm missing something, but we have filled the existing gaps, whatever is left is not actually a gap, right? So ideally the code should have been done in a way that this is null, wdyt?
The `initialReadGapOffset` is not intelligent enough to detect all individual gaps in the cachedState. Instead, it remains active until the cached state has been checked for gaps up to the offset `initialReadGapOffset.endOffset`. Additionally, its purpose is only to track the gaps introduced by the persister.readState request. So, if we acquire new batches past `initialReadGapOffset.endOffset`, which makes `initialReadGapOffset` null, new gaps may still be introduced to the cached state after that point as per the natural gaps in the underlying partition, but `initialReadGapOffset` is not responsible for them.

So, as per the example above, in the subsequent fetch requests, when the remaining records 29 and 30 are acquired again, the code will have reached `initialReadGapOffset.endOffset` (which is 30 in this case), and `initialReadGapOffset` will then be set to null.
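To illustrate the behaviour described here, a minimal sketch with hypothetical names (not the actual `SharePartition` implementation):

```java
// Minimal sketch (hypothetical names, not the actual SharePartition code) of the
// behaviour described above: the read-gap window stays active until acquisition has
// passed its end offset, after which it is dropped and any later gaps are treated
// as natural gaps in the underlying partition.
final class ReadGapWindowSketch {
    private long gapStartOffset;  // next offset that may still contain an unchecked gap
    private final long endOffset; // last offset covered by the persister read state

    ReadGapWindowSketch(long gapStartOffset, long endOffset) {
        this.gapStartOffset = gapStartOffset;
        this.endOffset = endOffset;
    }

    // Called after records up to lastAcquiredOffset have been acquired; returns null
    // once the whole window has been checked for gaps (e.g. endOffset 30 is reached).
    ReadGapWindowSketch maybeAdvance(long lastAcquiredOffset) {
        if (lastAcquiredOffset >= endOffset) {
            return null;
        }
        gapStartOffset = Math.max(gapStartOffset, lastAcquiredOffset + 1);
        return this;
    }
}
```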
Yeah, the endOffset is 30 but the gapOffset is still 28. Also, it will happen when the endOffset moves past the gap's endOffset. So when anything further is newly acquired, the gap tracking will become null.
Perhaps the variable `initialGapOffsetReadWindow` is a better variable name, if it makes sense @apoorvmittal10.
As discussed offline, we are going to make the variable name change in a different PR.
As discussed, I'll open the next PR with the following changes: `persisterReadResultGapWindow` as the variable name and `GapWindow` as the class name.
Thanks for the PR. Left some minor comments.
// If the initial read gap offset window is active then it's not guaranteed that the
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
// last offset is greater than the last offset for acquisition, else there could be
// a situation where the batch overlaps with the initial read gap offset window batch.
Thanks for the PR. I think a small example here would be better, like the one that was provided above for gapStartOffset update change in nextFetchOffset.
Good suggestion, done.
MEMBER_ID,
BATCH_SIZE,
1,
DEFAULT_FETCH_OFFSET,
nit: I believe it should not be DEFAULT_FETCH_OFFSET, but 10, since that is the nextFetchOffset in this case.
Yeah, good to set it to the offset where the fetch happened, done.
MEMBER_ID,
BATCH_SIZE,
10,
DEFAULT_FETCH_OFFSET,
Likewise, here it should be 15 as per the new nextFetchOffset value.
Done.
MEMBER_ID,
BATCH_SIZE,
1,
DEFAULT_FETCH_OFFSET,
And, here it should be 31
LGTM
As per the suggestion by @adixitconfluent and @chirag-wadhwa5, [here](#20395 (comment)), I have refactored the code with the updated variable and method names.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chirag Wadhwa <cwadhwa@confluent.io>
The PR fixes the batch alignment issue when partitions are re-assigned.
During the initial read of state, the batches can be broken arbitrarily. Say
the start offset is 10 and the cache contains the [15-18] batch during
initialization. When a fetch happens at offset 10 and the fetched
batch contains 10 records, i.e. [10-19], then correct batches will be
created if maxFetchRecords is greater than 10. But if maxFetchRecords is
less than 10, then the last offset of the batch is determined, which will be 19.
Hence the acquire method will incorrectly create a batch of [10-19] while
[15-18] already exists. The check below is required to resolve the issue:
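(The actual check lives in the PR diff; the following is only a minimal sketch, with hypothetical names such as `lastOffsetForAcquisition`, of the kind of guard described in the acquire-path comment above.)

```java
// Sketch only (hypothetical names, not the actual SharePartition code): while the
// initial read gap window is active, cap the acquired batch at the last offset
// allowed for acquisition so it cannot overlap an existing cached batch.
static long lastOffsetToAcquire(boolean readGapWindowActive,
                                long fetchedBatchLastOffset,
                                long lastOffsetForAcquisition) {
    if (readGapWindowActive && fetchedBatchLastOffset > lastOffsetForAcquisition) {
        // e.g. the fetched batch is [10-19] but the cached state already holds [15-18]
        return lastOffsetForAcquisition;
    }
    return fetchedBatchLastOffset;
}
```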
While testing other cases, further issues were found with updating the
gap offset, acquiring records prior to the share partition's end offset,
and determining the next fetch offset with compacted topics. All these
issues can arise mainly during the initial read window after partition
re-assignment.
Reviewers: Andrew Schofield aschofield@confluent.io, Abhinav Dixit
adixit@confluent.io, Chirag Wadhwa cwadhwa@confluent.io