Commit

update wasserstein and linting

nikml committed May 30, 2024
1 parent b8b237f commit b5a7009
Showing 3 changed files with 62 additions and 45 deletions.
4 changes: 1 addition & 3 deletions nannyml/drift/univariate/calculator.py
@@ -111,7 +111,6 @@ def __init__(
chunker : Chunker
The `Chunker` used to split the data sets into lists of chunks.
thresholds: dict
Defaults to::
{
@@ -136,8 +135,7 @@ def __init__(
The `chi2` method does not support custom thresholds for now. Additional research is required to determine
how to transition from its current p-value based implementation.
-computation_params : dict
+computation_params: dict
Defaults to::
{
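For orientation, both documented parameters are passed per method name when constructing the calculator. A minimal sketch of assumed usage follows; the threshold value and the exact `computation_params` nesting are illustrative, not taken from this diff:

    import nannyml as nml
    from nannyml.thresholds import ConstantThreshold

    calc = nml.UnivariateDriftCalculator(
        column_names=['feature_1'],  # hypothetical column name
        continuous_methods=['wasserstein', 'kolmogorov_smirnov'],
        # Custom thresholds are keyed by method name; per the docstring above,
        # `chi2` ignores custom thresholds because its alerts are p-value based.
        thresholds={'wasserstein': ConstantThreshold(upper=0.1)},  # assumed value
        # Assumed nesting: one dict of parameters per method name.
        computation_params={'kolmogorov_smirnov': {'calculation_method': 'estimated'}},
    )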
92 changes: 54 additions & 38 deletions nannyml/drift/univariate/methods.py
@@ -2,7 +2,7 @@
#
# License: Apache Software License 2.0

""" This module contains the different drift detection method implementations.
"""This module contains the different drift detection method implementations.
The :class:`~nannyml.drift.univariate.methods.MethodFactory` will convert the drift detection method names
into an instance of the base :class:`~nannyml.drift.univariate.methods.Method` class.
@@ -62,10 +62,8 @@ def __init__(
computation_params : dict, default=None
A dictionary specifying parameter names and values to be used in the computation of the
drift method.
-upper_threshold : float, default=None
-    An optional upper threshold for the data quality metric.
-lower_threshold : float, default=None
-    An optional lower threshold for the data quality metric.
+threshold : Threshold
+    Threshold class defining threshold strategy.
upper_threshold_limit : float, default=None
An optional upper threshold limit for the data quality metric.
lower_threshold_limit : float, default=0
@@ -257,6 +255,7 @@ class JensenShannonDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Jensen-Shannon method."""
super().__init__(
display_name='Jensen-Shannon distance',
column_name='jensen_shannon',
@@ -339,6 +338,7 @@ class KolmogorovSmirnovStatistic(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Kolmogorov-Smirnov method."""
super().__init__(
display_name='Kolmogorov-Smirnov statistic',
column_name='kolmogorov_smirnov',
@@ -405,7 +405,7 @@ def _calculate(self, data: pd.Series):
chunk_rel_freqs = chunk_proba_in_qts / len(data)
rel_freq_lower_than_edges = len(data[data < self._qts[0]]) / len(data)
chunk_rel_freqs = rel_freq_lower_than_edges + np.cumsum(chunk_rel_freqs)
-stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs))
+stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs))  # type: ignore
else:
stat, _ = ks_2samp(self._reference_data, data)
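The estimated branch above evaluates both ECDFs at the cached reference quantile edges (`self._qts`) and takes the largest gap. A self-contained sketch of that computation, mirroring the diff but simplifying the library internals:

    import numpy as np

    def estimated_ks_stat(qts: np.ndarray, ref_rel_freqs: np.ndarray, data: np.ndarray) -> float:
        # Chunk ECDF evaluated at the reference quantile edges.
        chunk_proba_in_qts, _ = np.histogram(data, bins=qts)
        chunk_rel_freqs = chunk_proba_in_qts / len(data)
        # Account for chunk mass that falls left of the first edge.
        rel_freq_lower_than_edges = (data < qts[0]).sum() / len(data)
        chunk_rel_freqs = rel_freq_lower_than_edges + np.cumsum(chunk_rel_freqs)
        # KS statistic: the largest absolute ECDF difference.
        return float(np.max(np.abs(ref_rel_freqs - chunk_rel_freqs)))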

@@ -420,6 +420,7 @@ class Chi2Statistic(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Chi2-contingency method."""
super().__init__(
display_name='Chi2 statistic',
column_name='chi2',
@@ -444,6 +445,16 @@ def __init__(self, **kwargs) -> None:
self._fitted = False

def fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self:
"""Fits Chi2 Method on reference data.
Parameters
----------
reference_data: pd.DataFrame
The reference data used for fitting a Method. Must have target data available.
timestamps: Optional[pd.Series], default=None
A series containing the reference data Timestamps
"""
super().fit(reference_data, timestamps)

# Thresholding is based on p-values. Ignore all custom thresholding and disable threshold plotting.
@@ -470,6 +481,16 @@ def _calculate(self, data: pd.Series):
return stat

def alert(self, value: float):
"""Evaluates if an alert has occurred for Chi2 on the current chunk data.
For Chi2 alerts are based on p-values rather than the actual method values like
in all other Univariate drift methods.
Parameters
----------
value: float
The method value for a given chunk
"""
return self._p_value < 0.05
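Because the alert hinges on the p-value rather than the returned statistic, `_calc_chi2` amounts to a chi-square contingency test over per-category counts. A minimal sketch of that idea (the real implementation may assemble the table differently):

    import pandas as pd
    from scipy.stats import chi2_contingency

    def chi2_stat_and_p_value(reference: pd.Series, chunk: pd.Series):
        # Contingency table: category counts in reference vs. chunk.
        table = pd.concat([reference.value_counts(), chunk.value_counts()], axis=1).fillna(0)
        stat, p_value, _, _ = chi2_contingency(table)
        return stat, p_value  # alert fires when p_value < 0.05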

def _calc_chi2(self, data: pd.Series):
@@ -491,6 +512,7 @@ class LInfinityDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize L-Infinity Distance method."""
super().__init__(
display_name='L-Infinity distance',
column_name='l_infinity',
@@ -537,6 +559,7 @@ class WassersteinDistance(Method):
"""

def __init__(self, **kwargs) -> None:
"""Initialize Wasserstein Distance method."""
super().__init__(
display_name='Wasserstein distance',
column_name='wasserstein',
@@ -579,6 +602,9 @@ def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None
reference_proba_in_bins, self._bin_edges = np.histogram(reference_data, bins=self.n_bins)
self._ref_rel_freqs = reference_proba_in_bins / len(reference_data)
self._bin_width = self._bin_edges[1] - self._bin_edges[0]
+self._ref_min = self._bin_edges[0]
+self._ref_max = self._bin_edges[-1]
+self._ref_cdf = np.cumsum(self._ref_rel_freqs)

self._fitted = True
self._reference_size = len(reference_data)
@@ -596,42 +622,31 @@ def _calculate(self, data: pd.Series):
if (
self.calculation_method == 'auto' and self._reference_size >= 10_000
) or self.calculation_method == 'estimated':
-    min_chunk = np.min(data)
-
-    if min_chunk < self._bin_edges[0]:
-        extra_bins_left = (min_chunk - self._bin_edges[0]) / self._bin_width
-        extra_bins_left = np.ceil(extra_bins_left)
-    else:
-        extra_bins_left = 0
-
-    max_chunk = np.max(data)
-
-    if max_chunk > self._bin_edges[-1]:
-        extra_bins_right = (max_chunk - self._bin_edges[-1]) / self._bin_width
-        extra_bins_right = np.ceil(extra_bins_right)
-    else:
-        extra_bins_right = 0
-
-    left_edges_to_prepand = np.arange(
-        min_chunk - self._bin_width, self._bin_edges[0] - self._bin_width, self._bin_width
-    )
-    right_edges_to_append = np.arange(
-        self._bin_edges[-1] + self._bin_width, max_chunk + self._bin_width, self._bin_width
-    )
-
-    updated_edges = np.concatenate([left_edges_to_prepand, self._bin_edges, right_edges_to_append])
-    updated_ref_binned_pdf = np.concatenate(
-        [np.zeros(len(left_edges_to_prepand)), self._ref_rel_freqs, np.zeros(len(right_edges_to_append))]
-    )
-
-    chunk_histogram, _ = np.histogram(data, bins=updated_edges)
-    chunk_binned_pdf = chunk_histogram / len(data)
-
-    ref_binned_cdf = np.cumsum(updated_ref_binned_pdf)
-    chunk_binned_cdf = np.cumsum(chunk_binned_pdf)
-
-    distance = np.sum(np.abs(ref_binned_cdf - chunk_binned_cdf) * self._bin_width)
+    data_histogram, _ = np.histogram(data, bins=self._bin_edges)
+    data_histogram = data_histogram / len(data)
+    data_smaller = data[data < self._ref_min]
+    data_bigger = data[data > self._ref_max]
+    sample_size = len(data)
+    del data
+
+    if len(data_smaller) > 0:
+        amount_smaller = len(data_smaller) / sample_size
+        term_smaller = wasserstein_distance(data_smaller, np.full(len(data_smaller), self._ref_min))
+        term_smaller = term_smaller * amount_smaller
+    else:
+        term_smaller, amount_smaller = 0, 0
+
+    if len(data_bigger) > 0:
+        amount_bigger = len(data_bigger) / sample_size
+        term_bigger = wasserstein_distance(data_bigger, np.full(len(data_bigger), self._ref_max))
+        term_bigger = term_bigger * amount_bigger
+    else:
+        term_bigger, amount_bigger = 0, 0
+
+    data_cdf = np.cumsum(data_histogram)
+    data_cdf = data_cdf + amount_smaller  # if there's some data on the left-hand side
+    term_within = np.sum(np.abs(self._ref_cdf - data_cdf) * self._bin_width)
+    distance = term_within + term_smaller + term_bigger
else:
distance = wasserstein_distance(self._reference_data, data)
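Taken together, the new estimated path above splits the distance into a within-range term (the L1 gap between binned CDFs, using the `_ref_cdf` cached in `_fit`) plus two tail corrections for chunk values outside the reference histogram's range. A standalone sketch of the same decomposition (the `n_bins` default is arbitrary here; the library derives it from configuration):

    import numpy as np
    from scipy.stats import wasserstein_distance

    def estimated_wasserstein(reference: np.ndarray, chunk: np.ndarray, n_bins: int = 1000) -> float:
        # "Fit": bin the reference once, cache its CDF, range and bin width.
        ref_freqs, bin_edges = np.histogram(reference, bins=n_bins)
        ref_cdf = np.cumsum(ref_freqs / len(reference))
        bin_width = bin_edges[1] - bin_edges[0]
        ref_min, ref_max = bin_edges[0], bin_edges[-1]

        # Tail terms: chunk mass outside the reference range, weighted by its
        # distance to the nearest reference edge.
        smaller = chunk[chunk < ref_min]
        bigger = chunk[chunk > ref_max]
        term_smaller = (
            wasserstein_distance(smaller, np.full(len(smaller), ref_min)) * len(smaller) / len(chunk)
            if len(smaller) > 0 else 0.0
        )
        term_bigger = (
            wasserstein_distance(bigger, np.full(len(bigger), ref_max)) * len(bigger) / len(chunk)
            if len(bigger) > 0 else 0.0
        )

        # Within-range term: L1 distance between the binned CDFs; shifting the
        # chunk CDF up by the left-tail mass keeps the two CDFs comparable.
        chunk_freqs, _ = np.histogram(chunk, bins=bin_edges)
        chunk_cdf = np.cumsum(chunk_freqs / len(chunk)) + len(smaller) / len(chunk)
        term_within = np.sum(np.abs(ref_cdf - chunk_cdf) * bin_width)

        return float(term_within + term_smaller + term_bigger)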

@@ -644,6 +659,7 @@ class HellingerDistance(Method):
"""Calculates the Hellinger Distance between two distributions."""

def __init__(self, **kwargs) -> None:
"""Initialize Hellinger Distance method."""
super().__init__(
display_name='Hellinger distance',
column_name='hellinger',
11 changes: 7 additions & 4 deletions nannyml/drift/univariate/result.py
@@ -44,7 +44,8 @@ def __init__(
analysis_data: pd.DataFrame = None,
reference_data: pd.DataFrame = None,
):
"""
"""Initialize resuts class.
Parameters
----------
results_data: pd.DataFrame
@@ -112,6 +113,7 @@ def __init__(

@property
def methods(self) -> List[Method]:
"""Methods used during calculation."""
return cast(List[Method], self.metrics)

def _filter(
@@ -167,9 +169,9 @@ def _get_result_property(self, property_name: str) -> List[pd.Series]:
return continuous_values + categorical_values

def keys(self) -> List[Key]:
"""
Creates a list of keys for continuos and categorial columns where each Key is a `namedtuple('Key',
'properties display_names')`
"""Creates a list of keys for continuos and categorial columns.
Each Key is a `namedtuple('Key', 'properties display_names')`
"""
continuous_keys = [
Key(properties=(column, method.column_name), display_names=(column, method.display_name))
@@ -204,6 +206,7 @@ def plot(
- 'distribution'
plots feature distribution per :class:`~nannyml.chunk.Chunk`.
Joyplot for continuous features, stacked bar charts for categorical features.
Returns
-------
fig: :class:`plotly.graph_objs._figure.Figure`
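For context, a typical end-to-end run that produces and plots these results (standard NannyML usage; dataframe and column names are placeholders):

    import nannyml as nml

    calc = nml.UnivariateDriftCalculator(
        column_names=['feature_1', 'feature_2'],  # placeholder columns
        continuous_methods=['wasserstein'],
        categorical_methods=['chi2'],
    )
    calc.fit(reference_df)                  # placeholder reference dataframe
    results = calc.calculate(analysis_df)   # placeholder analysis dataframe
    results.plot(kind='drift').show()
    results.plot(kind='distribution').show()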
