Skip to content

Commit f833a7b

Browse files
committed
Use numpy methods to accelerate
1 parent 2c6e530 commit f833a7b

File tree

3 files changed

+57
-33
lines changed

3 files changed

+57
-33
lines changed

mars/dataframe/groupby/aggregation.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
ReductionCompiler,
5252
ReductionSteps,
5353
ReductionAggStep,
54+
CustomReduction,
5455
)
5556
from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs
5657
from ..utils import parse_index, build_concatenated_rows_frame, is_cudf
@@ -695,7 +696,7 @@ def _pack_inputs(agg_funcs: List[ReductionAggStep], in_data):
695696
return out_dict
696697

697698
@staticmethod
698-
def _do_custom_agg_single(op, custom_reduction, input_obj):
699+
def _do_custom_agg_single(op, custom_reduction: CustomReduction, input_obj):
699700
if op.stage == OperandStage.map:
700701
if custom_reduction.pre_with_agg:
701702
apply_fun = custom_reduction.pre
@@ -705,9 +706,12 @@ def apply_fun(obj):
705706
return custom_reduction.agg(custom_reduction.pre(obj))
706707

707708
elif op.stage == OperandStage.agg:
709+
if custom_reduction.post_with_agg:
710+
apply_fun = custom_reduction.post
711+
else:
708712

709-
def apply_fun(obj):
710-
return custom_reduction.post(custom_reduction.agg(obj))
713+
def apply_fun(obj):
714+
return custom_reduction.post(custom_reduction.agg(obj))
711715

712716
else:
713717
apply_fun = custom_reduction.agg
@@ -716,7 +720,7 @@ def apply_fun(obj):
716720
return (res,)
717721

718722
@staticmethod
719-
def _do_custom_agg_multiple(op, custom_reduction, *input_objs):
723+
def _do_custom_agg_multiple(op, custom_reduction: CustomReduction, *input_objs):
720724
xdf = cudf if op.gpu else pd
721725
results = []
722726
out = op.outputs[0]

mars/dataframe/reduction/core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,8 @@ class CustomReduction:
655655

656656
# set to True when pre() already performs aggregation
657657
pre_with_agg = False
658+
# set to True when post() already performs aggregation
659+
post_with_agg = False
658660

659661
def __init__(self, name=None, is_gpu=None):
660662
self.name = name or "<custom>"

mars/dataframe/reduction/nunique.py

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import numpy as np
1516
import pandas as pd
1617

1718
try:
@@ -27,82 +28,99 @@
2728
from ..arrays import ArrowListArray, ArrowListDtype
2829
from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction
2930

31+
cp = lazy_import("cupy", globals=globals(), rename="cp")
3032
cudf = lazy_import("cudf", globals=globals())
3133

3234

3335
class NuniqueReduction(CustomReduction):
3436
pre_with_agg = True
37+
post_with_agg = True
3538

3639
def __init__(
37-
self, name="unique", axis=0, dropna=True, use_arrow_dtype=False, is_gpu=False
40+
self, name="nunique", axis=0, dropna=True, use_arrow_dtype=False, is_gpu=False
3841
):
3942
super().__init__(name, is_gpu=is_gpu)
4043
self._axis = axis
4144
self._dropna = dropna
4245
self._use_arrow_dtype = use_arrow_dtype
4346

44-
def _drop_duplicates(self, xdf, value, explode=False):
47+
def _get_modules(self):
48+
if not self.is_gpu():
49+
return np, pd
50+
else: # pragma: no cover
51+
return cp, cudf
52+
53+
def _drop_duplicates(self, value, explode=False, agg=False):
54+
xp, xdf = self._get_modules()
55+
if self._use_arrow_dtype and xp is not cp and hasattr(value, "to_numpy"):
56+
value = value.to_numpy()
57+
else:
58+
value = value.values
59+
4560
if explode:
46-
value = value.explode()
61+
value = xp.concatenate(value)
4762

48-
if not self._use_arrow_dtype or xdf is cudf:
49-
return [value.drop_duplicates().to_numpy()]
63+
value = xdf.unique(value)
64+
65+
if not agg:
66+
if not self._use_arrow_dtype or xp is cp:
67+
return [value]
68+
else:
69+
try:
70+
return ArrowListArray([value])
71+
except pa.ArrowInvalid:
72+
# fallback due to diverse dtypes
73+
return [value]
5074
else:
51-
try:
52-
return ArrowListArray([value.drop_duplicates().to_numpy()])
53-
except pa.ArrowInvalid:
54-
# fallback due to diverse dtypes
55-
return [value.drop_duplicates().to_numpy()]
75+
if self._dropna:
76+
return xp.sum(xdf.notna(value))
77+
return len(value)
5678

5779
def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
58-
xdf = cudf if self.is_gpu() else pd
80+
xp, xdf = self._get_modules()
5981
if isinstance(in_data, xdf.Series):
60-
unique_values = self._drop_duplicates(xdf, in_data)
61-
return xdf.Series(unique_values, name=in_data.name)
82+
unique_values = self._drop_duplicates(in_data)
83+
return xdf.Series(unique_values, name=in_data.name, dtype=object)
6284
else:
6385
if self._axis == 0:
6486
data = dict()
6587
for d, v in in_data.iteritems():
66-
data[d] = self._drop_duplicates(xdf, v)
67-
df = xdf.DataFrame(data)
88+
data[d] = self._drop_duplicates(v)
89+
df = xdf.DataFrame(data, copy=False, dtype=object)
6890
else:
6991
df = xdf.DataFrame(columns=[0])
7092
for d, v in in_data.iterrows():
71-
df.loc[d] = self._drop_duplicates(xdf, v)
93+
df.loc[d] = self._drop_duplicates(v)
7294
return df
7395

7496
def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
75-
xdf = cudf if self.is_gpu() else pd
97+
xp, xdf = self._get_modules()
7698
if isinstance(in_data, xdf.Series):
77-
unique_values = self._drop_duplicates(xdf, in_data, explode=True)
78-
return xdf.Series(unique_values, name=in_data.name)
99+
unique_values = self._drop_duplicates(in_data, explode=True)
100+
return xdf.Series(unique_values, name=in_data.name, dtype=object)
79101
else:
80102
if self._axis == 0:
81103
data = dict()
82104
for d, v in in_data.iteritems():
83-
if self._use_arrow_dtype and xdf is not cudf:
84-
v = pd.Series(v.to_numpy())
85-
data[d] = self._drop_duplicates(xdf, v, explode=True)
86-
df = xdf.DataFrame(data)
105+
data[d] = self._drop_duplicates(v, explode=True)
106+
df = xdf.DataFrame(data, copy=False, dtype=object)
87107
else:
88108
df = xdf.DataFrame(columns=[0])
89109
for d, v in in_data.iterrows():
90-
df.loc[d] = self._drop_duplicates(xdf, v, explode=True)
110+
df.loc[d] = self._drop_duplicates(v, explode=True)
91111
return df
92112

93113
def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
94-
xdf = cudf if self.is_gpu() else pd
114+
xp, xdf = self._get_modules()
95115
if isinstance(in_data, xdf.Series):
96-
return in_data.explode().nunique(dropna=self._dropna)
116+
return self._drop_duplicates(in_data, explode=True, agg=True)
97117
else:
98118
in_data_iter = (
99119
in_data.iteritems() if self._axis == 0 else in_data.iterrows()
100120
)
101121
data = dict()
102122
for d, v in in_data_iter:
103-
if isinstance(v.dtype, ArrowListDtype):
104-
v = xdf.Series(v.to_numpy())
105-
data[d] = v.explode().nunique(dropna=self._dropna)
123+
data[d] = self._drop_duplicates(v, explode=True, agg=True)
106124
return xdf.Series(data)
107125

108126

0 commit comments

Comments
 (0)