Skip to content

Commit 63c45fb

Browse files
committed
Use shuffle when nunique is calculated
1 parent f833a7b commit 63c45fb

File tree

3 files changed

+57
-33
lines changed

3 files changed

+57
-33
lines changed

mars/dataframe/groupby/aggregation.py

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from ... import opcodes as OperandDef
2626
from ...config import options
2727
from ...core.custom_log import redirect_custom_log
28-
from ...core import ENTITY_TYPE, OutputType
28+
from ...core import ENTITY_TYPE, OutputType, recursive_tile
2929
from ...core.context import get_context
3030
from ...core.operand import OperandStage
3131
from ...serialization.serializables import (
@@ -64,6 +64,8 @@
6464

6565
_support_get_group_without_as_index = pd_release_version[:2] > (1, 0)
6666

67+
_FUNCS_PREFER_SHUFFLE = {"nunique"}
68+
6769

6870
class SizeRecorder:
6971
def __init__(self):
@@ -163,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin):
163165
method = StringField("method")
164166
use_inf_as_na = BoolField("use_inf_as_na")
165167

168+
map_on_shuffle = AnyField("map_on_shuffle")
169+
166170
# for chunk
167171
combine_size = Int32Field("combine_size")
168172
chunk_store_limit = Int64Field("chunk_store_limit")
@@ -421,10 +425,29 @@ def _tile_with_shuffle(
421425
in_df: TileableType,
422426
out_df: TileableType,
423427
func_infos: ReductionSteps,
428+
agg_chunks: List[ChunkType] = None,
424429
):
425-
# First, perform groupby and aggregation on each chunk.
426-
agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos)
427-
return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
430+
if op.map_on_shuffle is None:
431+
op.map_on_shuffle = all(
432+
agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs
433+
)
434+
435+
if not op.map_on_shuffle:
436+
groupby_params = op.groupby_params.copy()
437+
selection = groupby_params.pop("selection", None)
438+
groupby = in_df.groupby(**groupby_params)
439+
if selection:
440+
groupby = groupby[selection]
441+
result = groupby.transform(
442+
op.raw_func, _call_agg=True, index=out_df.index_value
443+
)
444+
return (yield from recursive_tile(result))
445+
else:
446+
# First, perform groupby and aggregation on each chunk.
447+
agg_chunks = agg_chunks or cls._gen_map_chunks(
448+
op, in_df.chunks, out_df, func_infos
449+
)
450+
return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
428451

429452
@classmethod
430453
def _perform_shuffle(
@@ -624,8 +647,10 @@ def _tile_auto(
624647
else:
625648
# otherwise, use shuffle
626649
logger.debug("Choose shuffle method for groupby operand %s", op)
627-
return cls._perform_shuffle(
628-
op, chunks + left_chunks, in_df, out_df, func_infos
650+
return (
651+
yield from cls._tile_with_shuffle(
652+
op, in_df, out_df, func_infos, chunks + left_chunks
653+
)
629654
)
630655

631656
@classmethod
@@ -638,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"):
638663
func_infos = cls._compile_funcs(op, in_df)
639664

640665
if op.method == "auto":
641-
if len(in_df.chunks) <= op.combine_size:
666+
if set(op.func) & _FUNCS_PREFER_SHUFFLE:
667+
return (
668+
yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)
669+
)
670+
elif len(in_df.chunks) <= op.combine_size:
642671
return cls._tile_with_tree(op, in_df, out_df, func_infos)
643672
else:
644673
return (yield from cls._tile_auto(op, in_df, out_df, func_infos))
645674
if op.method == "shuffle":
646-
return cls._tile_with_shuffle(op, in_df, out_df, func_infos)
675+
return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos))
647676
elif op.method == "tree":
648677
return cls._tile_with_tree(op, in_df, out_df, func_infos)
649678
else: # pragma: no cover
@@ -1075,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"):
10751104
pd.reset_option("mode.use_inf_as_na")
10761105

10771106

1078-
def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
1107+
def agg(
1108+
groupby,
1109+
func=None,
1110+
method="auto",
1111+
combine_size=None,
1112+
map_on_shuffle=None,
1113+
*args,
1114+
**kwargs,
1115+
):
10791116
"""
10801117
Aggregate using one or more operations on grouped data.
10811118
@@ -1091,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
10911128
in distributed mode and use 'tree' in local mode.
10921129
combine_size : int
10931130
The number of chunks to combine when method is 'tree'
1094-
1131+
map_on_shuffle : bool
1132+
When not specified, Mars decides automatically whether to perform
1133+
aggregation on the map stage of shuffle (currently aggregation is skipped
1134+
when any function uses a custom reduction). Otherwise, the given value
1135+
determines whether aggregation is performed on the map stage of shuffle.
10951136
10961137
Returns
10971138
-------
@@ -1138,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
11381179
combine_size=combine_size or options.combine_size,
11391180
chunk_store_limit=options.chunk_store_limit,
11401181
use_inf_as_na=use_inf_as_na,
1182+
map_on_shuffle=map_on_shuffle,
11411183
)
11421184
return agg_op(groupby)

mars/dataframe/groupby/tests/test_groupby.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -476,24 +476,3 @@ def test_groupby_fill():
476476
assert len(r.chunks) == 4
477477
assert r.shape == (len(s1),)
478478
assert r.chunks[0].shape == (np.nan,)
479-
480-
481-
def test_groupby_nunique():
482-
df1 = pd.DataFrame(
483-
[
484-
[1, 1, 10],
485-
[1, 1, np.nan],
486-
[1, 1, np.nan],
487-
[1, 2, np.nan],
488-
[1, 2, 20],
489-
[1, 2, np.nan],
490-
[1, 3, np.nan],
491-
[1, 3, np.nan],
492-
],
493-
columns=["one", "two", "three"],
494-
)
495-
mdf = md.DataFrame(df1, chunk_size=3)
496-
497-
r = tile(mdf.groupby(["one", "two"]).nunique())
498-
assert len(r.chunks) == 1
499-
assert isinstance(r.chunks[0].op, DataFrameGroupByAgg)

mars/dataframe/reduction/nunique.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from ...config import options
2626
from ...serialization.serializables import BoolField
2727
from ...utils import lazy_import
28-
from ..arrays import ArrowListArray, ArrowListDtype
28+
from ..arrays import ArrowListArray
2929
from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction
3030

3131
cp = lazy_import("cupy", globals=globals(), rename="cp")
@@ -58,6 +58,8 @@ def _drop_duplicates(self, value, explode=False, agg=False):
5858
value = value.values
5959

6060
if explode:
61+
if len(value) == 0:
62+
return [xp.array([], dtype=object)]
6163
value = xp.concatenate(value)
6264

6365
value = xdf.unique(value)
@@ -79,7 +81,8 @@ def _drop_duplicates(self, value, explode=False, agg=False):
7981
def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
8082
xp, xdf = self._get_modules()
8183
if isinstance(in_data, xdf.Series):
82-
unique_values = self._drop_duplicates(in_data)
84+
# unique_values = self._drop_duplicates(in_data)
85+
unique_values = [in_data.values]
8386
return xdf.Series(unique_values, name=in_data.name, dtype=object)
8487
else:
8588
if self._axis == 0:

0 commit comments

Comments
 (0)