-
Notifications
You must be signed in to change notification settings - Fork 597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support es sink partial update #20048
base: main
Are you sure you want to change the base?
Conversation
@@ -114,6 +117,7 @@ impl ElasticSearchOpenSearchFormatter { | |||
} | |||
|
|||
pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<Vec<BuildBulkPara>> { | |||
let mut update_map: HashMap<String, RowRef<'_>> = HashMap::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this map is unnecessary because UpdateDelete
and UpdateInsert
must be adjacent to each other so we only need to cache one row for the check.
&& let Some(delete_column) = delete_column | ||
&& insert_column == delete_column | ||
{ | ||
col_indices.retain(|&x| x != index); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is more efficient to populate the new col_indices
proactively instead of calling retain
on the full list for each column. Example:
if let Some(insert_column) = insert_column
&& let Some(delete_column) = delete_column
&& insert_column == delete_column {
// do nothing
} else {
modified_col_indices.push_back(index);
}
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
We will send all the columns every time we update, which will cause the load of es to become high, after this pr, es sink will judge the columns of update, and only send the modified columns
Checklist
Documentation
Release note