[SPARK-51690][SS] Change the protocol of ListState.put()/get()/appendList() from Arrow to simple custom protocol #50488
base: master
cc. @anishshri-db @bogao007 for review, thanks!
@@ -143,7 +131,7 @@ def append_list(self, state_name: str, values: List[Tuple]) -> None:

         self._stateful_processor_api_client._send_proto_message(message.SerializeToString())

-        self._stateful_processor_api_client._send_arrow_state(self.schema, values)
+        self._stateful_processor_api_client._send_list_state(self.schema, values)
nit: should we add a TODO for the other places where we might change this in the future?
I can confirm with benchmarking first and then address the other parts. I ran the benchmark for this part; any further change also has to be backed by numbers.
+1, we use the same framework for MapState as well
LGTM overall, just some nits.
@@ -41,6 +41,7 @@ def __init__(
         # A dictionary to store the mapping between list state name and a tuple of pandas DataFrame
         # and the index of the last row that was read.
         self.pandas_df_dict: Dict[str, Tuple["PandasDataFrameLike", int]] = {}
Is pandas_df_dict still being used anywhere?
Ah, I missed that this is the "list" state client. My bad.
@HyukjinKwon Could you please take a look? Thanks!
What changes were proposed in this pull request?
This PR proposes to stop using Arrow for sending multiple elements of ListState and to replace it with a simple custom protocol.
The custom protocol we are proposing is very simple and is already widely used.
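The description here does not spell out the wire format, so the following is only a minimal sketch of one common shape such a protocol could take: each element is written as a 4-byte big-endian length followed by its bytes, and a sentinel length of -1 marks the end of the list. The function names, the sentinel value, and the framing itself are assumptions for illustration, not the code in this PR.

```python
import io
import struct
from typing import BinaryIO, Iterator, List

# Hypothetical sentinel: a length of -1 signals that no more elements follow.
END_OF_LIST = -1


def write_list(out: BinaryIO, elements: List[bytes]) -> None:
    """Write each element as <int32 length><payload>, then the end marker."""
    for payload in elements:
        out.write(struct.pack(">i", len(payload)))
        out.write(payload)
    out.write(struct.pack(">i", END_OF_LIST))


def _read_exact(inp: BinaryIO, n: int) -> bytes:
    """Read exactly n bytes or raise if the stream ends early."""
    data = inp.read(n)
    if len(data) != n:
        raise EOFError(f"expected {n} bytes, got {len(data)}")
    return data


def read_list(inp: BinaryIO) -> Iterator[bytes]:
    """Yield payloads until the end marker is seen."""
    while True:
        (length,) = struct.unpack(">i", _read_exact(inp, 4))
        if length == END_OF_LIST:
            return
        yield _read_exact(inp, length)


if __name__ == "__main__":
    buf = io.BytesIO()
    write_list(buf, [b"a", b"", b"xyz"])
    buf.seek(0)
    print(list(read_list(buf)))
```

With this kind of framing there is no per-batch schema or buffer setup, which is one plausible reason it can beat Arrow when only a handful of elements are exchanged.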
Note that this PR only changes ListState. We are aware that Arrow is also used in other state types and in other functionality (timers). We want to improve those over time by benchmarking each one and addressing it where the benchmark shows a latency impact.
Why are the changes needed?
For a small number of elements, Arrow does not perform very well compared to the custom protocol. In the benchmark, we have three elements to exchange between the Python worker and the JVM, and replacing Arrow with the custom protocol could cut the elapsed time on state interaction by 1/3.
Given the inherent performance difference between the Scala version of transformWithState and the PySpark version, transformWithStateInPandas, I think users must use the Scala version to handle any noticeable volume of workload. We can position the PySpark version for more lightweight workloads, and we can revisit this if we see demand to the contrary.
Does this PR introduce any user-facing change?
No, it's an internal change.
How was this patch tested?
Existing unit tests, with the mock expectations modified.
Was this patch authored or co-authored using generative AI tooling?
No.