From a772b81080bfa367c8625da43761ca3462bdb42b Mon Sep 17 00:00:00 2001 From: Nikolaos Perrakis Date: Thu, 27 Jun 2024 19:20:47 +0300 Subject: [PATCH 1/5] remove drop_duplicates step from DC --- .../domain_classifier/calculator.py | 55 +++++++++++-------- tests/drift/test_multiv_dc.py | 23 ++++++++ 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/nannyml/drift/multivariate/domain_classifier/calculator.py b/nannyml/drift/multivariate/domain_classifier/calculator.py index 39dbbc01..657c74f5 100644 --- a/nannyml/drift/multivariate/domain_classifier/calculator.py +++ b/nannyml/drift/multivariate/domain_classifier/calculator.py @@ -15,7 +15,7 @@ """ import warnings -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Union import numpy as np import pandas as pd @@ -27,7 +27,7 @@ from sklearn.preprocessing import OrdinalEncoder from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type -from nannyml.chunk import Chunker +from nannyml.chunk import Chunker, Chunk from nannyml.drift.multivariate.domain_classifier.result import Result from nannyml.exceptions import InvalidArgumentsException @@ -200,6 +200,7 @@ def __init__( # # sampling error # self._sampling_error_components: Tuple = () self.result: Optional[Result] = None + self._am_fitted: bool = False @log_usage(UsageEvent.DC_CALC_FIT) def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): @@ -225,11 +226,24 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): if column_name not in self.categorical_column_names: self.categorical_column_names.append(column_name) - self._reference_X = reference_data[self.feature_column_names] + # get timestamp column from chunker incase the calculator is initialized with a chunker without directly + # been provided the timestamp column name + if self.chunker.timestamp_column_name: + if self.chunker.timestamp_column_name not in list(reference_data.columns): + raise InvalidArgumentsException( + f"timestamp column '{self.chunker.timestamp_column_name}' not in columns: {list(reference_data.columns)}." # noqa: E501 + ) + self._reference_X = reference_data.sort_values( + by=[self.chunker.timestamp_column_name] + ).reset_index(drop=True)[self.feature_column_names] + else: + self._reference_X = reference_data[self.feature_column_names] self.result = self._calculate(data=reference_data) self.result.data[('chunk', 'period')] = 'reference' + self._am_fitted = True + return self @log_usage(UsageEvent.DC_CALC_RUN) @@ -252,7 +266,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: 'end_date': chunk.end_datetime, 'period': 'analysis', # 'sampling_error': sampling_error(self._sampling_error_components, chunk.data), - 'classifier_auroc_value': self._calculate_chunk(data=chunk.data), + 'classifier_auroc_value': self._calculate_chunk(chunk=chunk), } for chunk in chunks ] @@ -262,7 +276,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: res.columns = multilevel_index res = res.reset_index(drop=True) - if self.result is None: + if not self._am_fitted: self._set_metric_thresholds(res) res = self._populate_alert_thresholds(res) self.result = Result( @@ -278,16 +292,20 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: self.result.data = pd.concat([self.result.data, res], ignore_index=True) return self.result - def _calculate_chunk(self, data: pd.DataFrame): - - chunk_X = data[self.feature_column_names] - reference_X = self._reference_X - chunk_y = np.ones(len(chunk_X)) - reference_y = np.zeros(len(reference_X)) - X = pd.concat([reference_X, chunk_X], ignore_index=True) - y = np.concatenate([reference_y, chunk_y]) + def _calculate_chunk(self, chunk: Chunk): - X, y = drop_matching_duplicate_rows(X, y, self.feature_column_names) + if self._am_fitted: + chunk_X = chunk.data[self.feature_column_names] + reference_X = self._reference_X + chunk_y = np.ones(len(chunk_X)) + reference_y = np.zeros(len(reference_X)) + X = pd.concat([reference_X, chunk_X], ignore_index=True) + y = np.concatenate([reference_y, chunk_y]) + else: + # Use information from chunk indices to identify reference chunk's location + X = self._reference_X + y = np.zeros(len(X)) + y[chunk.start_index : chunk.end_index + 1] = 1 df_X_transformed = preprocess_categorical_features( X, self.continuous_column_names, self.categorical_column_names @@ -366,15 +384,6 @@ def tune_hyperparams(self, X: pd.DataFrame, y: np.ndarray): self.hyperparameters = {**automl.model.estimator.get_params()} -def drop_matching_duplicate_rows(X: pd.DataFrame, y: np.ndarray, subset: List[str]) -> Tuple[pd.DataFrame, np.ndarray]: - X['__target__'] = y - X = X.drop_duplicates(subset=subset, keep='last').reset_index(drop=True) - y = X['__target__'] - X.drop('__target__', axis=1, inplace=True) - - return X, y - - def preprocess_categorical_features( X: pd.DataFrame, continuous_column_names: List[str], categorical_column_names: List[str] ) -> pd.DataFrame: diff --git a/tests/drift/test_multiv_dc.py b/tests/drift/test_multiv_dc.py index 42bb0e57..70e948fb 100644 --- a/tests/drift/test_multiv_dc.py +++ b/tests/drift/test_multiv_dc.py @@ -49,3 +49,26 @@ def test_default_cdd_run(binary_classification_data): 0.9136, ] assert list(results.to_df().loc[:, ("domain_classifier_auroc", "alert")]) == [False, False, False, True, True] + + +def test_cdd_run_w_timestamp(binary_classification_data): + """Test a default run of DC.""" + ( + reference, + analysis, + ) = binary_classification_data + calc = DomainClassifierCalculator( + feature_column_names=column_names1, + chunk_size=5_000, + timestamp_column_name='timestamp' + ) + calc.fit(reference) + results = calc.calculate(analysis) + assert list(results.to_df().loc[:, ("domain_classifier_auroc", "value")].round(4)) == [ + 0.5020, + 0.5002, + 0.5174, + 0.9108, + 0.9136, + ] + assert list(results.to_df().loc[:, ("domain_classifier_auroc", "alert")]) == [False, False, False, True, True] From 53b6bd2d4dcac8ecf896f8bae51c78b72e9765d8 Mon Sep 17 00:00:00 2001 From: Nikolaos Perrakis Date: Thu, 27 Jun 2024 19:39:31 +0300 Subject: [PATCH 2/5] linting issues --- nannyml/drift/multivariate/domain_classifier/calculator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nannyml/drift/multivariate/domain_classifier/calculator.py b/nannyml/drift/multivariate/domain_classifier/calculator.py index 657c74f5..34fcac46 100644 --- a/nannyml/drift/multivariate/domain_classifier/calculator.py +++ b/nannyml/drift/multivariate/domain_classifier/calculator.py @@ -288,7 +288,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: ) else: res = self._populate_alert_thresholds(res) - self.result = self.result.filter(period='reference') + self.result = self.result.filter(period='reference') # type: ignore self.result.data = pd.concat([self.result.data, res], ignore_index=True) return self.result @@ -369,6 +369,7 @@ def _populate_alert_thresholds(self, result_data: pd.DataFrame) -> pd.DataFrame: return result_data def tune_hyperparams(self, X: pd.DataFrame, y: np.ndarray): + """Train an LGBM model while also performing hyperparameter tuning.""" with warnings.catch_warnings(): # Ingore lightgbm's UserWarning: Using categorical_feature in Dataset. # We explicitly use that feature, don't spam the user @@ -387,6 +388,7 @@ def tune_hyperparams(self, X: pd.DataFrame, y: np.ndarray): def preprocess_categorical_features( X: pd.DataFrame, continuous_column_names: List[str], categorical_column_names: List[str] ) -> pd.DataFrame: + """Preprodess categorical features.""" X_cont = X[continuous_column_names] enc = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1) From 9543e017d0682943f2cf04b5dbb5b0bb77739937 Mon Sep 17 00:00:00 2001 From: Nikolaos Perrakis Date: Fri, 28 Jun 2024 13:01:23 +0300 Subject: [PATCH 3/5] make extra dc test better --- tests/drift/test_multiv_dc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/drift/test_multiv_dc.py b/tests/drift/test_multiv_dc.py index 70e948fb..b9c59084 100644 --- a/tests/drift/test_multiv_dc.py +++ b/tests/drift/test_multiv_dc.py @@ -62,7 +62,7 @@ def test_cdd_run_w_timestamp(binary_classification_data): chunk_size=5_000, timestamp_column_name='timestamp' ) - calc.fit(reference) + calc.fit(reference.sample(frac=1).reset_index(drop=True)) results = calc.calculate(analysis) assert list(results.to_df().loc[:, ("domain_classifier_auroc", "value")].round(4)) == [ 0.5020, From a2520f95738de6cd7c1fbd4e8906c505c484f5ab Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Mon, 1 Jul 2024 10:41:00 +0100 Subject: [PATCH 4/5] Rename `_am_fitted` to `_is_fitted` --- .../drift/multivariate/domain_classifier/calculator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nannyml/drift/multivariate/domain_classifier/calculator.py b/nannyml/drift/multivariate/domain_classifier/calculator.py index 34fcac46..b4470eba 100644 --- a/nannyml/drift/multivariate/domain_classifier/calculator.py +++ b/nannyml/drift/multivariate/domain_classifier/calculator.py @@ -200,7 +200,7 @@ def __init__( # # sampling error # self._sampling_error_components: Tuple = () self.result: Optional[Result] = None - self._am_fitted: bool = False + self._is_fitted: bool = False @log_usage(UsageEvent.DC_CALC_FIT) def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): @@ -242,7 +242,7 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): self.result = self._calculate(data=reference_data) self.result.data[('chunk', 'period')] = 'reference' - self._am_fitted = True + self._is_fitted = True return self @@ -276,7 +276,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: res.columns = multilevel_index res = res.reset_index(drop=True) - if not self._am_fitted: + if not self._is_fitted: self._set_metric_thresholds(res) res = self._populate_alert_thresholds(res) self.result = Result( @@ -294,7 +294,7 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: def _calculate_chunk(self, chunk: Chunk): - if self._am_fitted: + if self._is_fitted: chunk_X = chunk.data[self.feature_column_names] reference_X = self._reference_X chunk_y = np.ones(len(chunk_X)) From 3aaed3de1e90311a8066efb9a2d07d3999640d4a Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Mon, 1 Jul 2024 10:47:44 +0100 Subject: [PATCH 5/5] Added some comments on the workings --- .../multivariate/domain_classifier/calculator.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/nannyml/drift/multivariate/domain_classifier/calculator.py b/nannyml/drift/multivariate/domain_classifier/calculator.py index b4470eba..dbd04ebb 100644 --- a/nannyml/drift/multivariate/domain_classifier/calculator.py +++ b/nannyml/drift/multivariate/domain_classifier/calculator.py @@ -226,8 +226,12 @@ def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): if column_name not in self.categorical_column_names: self.categorical_column_names.append(column_name) - # get timestamp column from chunker incase the calculator is initialized with a chunker without directly - # been provided the timestamp column name + # Get timestamp column from chunker incase the calculator is initialized with a chunker without directly + # been provided the timestamp column name. + # + # The reference data will be sorted according to the timestamp column (when available) to mimic + # Chunker behavior. This means the reference data will be "aligned" with chunked reference data. + # This way we can use chunk indices on the internal reference data copy. if self.chunker.timestamp_column_name: if self.chunker.timestamp_column_name not in list(reference_data.columns): raise InvalidArgumentsException( @@ -293,7 +297,6 @@ def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: return self.result def _calculate_chunk(self, chunk: Chunk): - if self._is_fitted: chunk_X = chunk.data[self.feature_column_names] reference_X = self._reference_X @@ -302,7 +305,10 @@ def _calculate_chunk(self, chunk: Chunk): X = pd.concat([reference_X, chunk_X], ignore_index=True) y = np.concatenate([reference_y, chunk_y]) else: - # Use information from chunk indices to identify reference chunk's location + # Use information from chunk indices to identify reference chunk's location. This is possible because + # both the internal reference data copy and the chunk data were sorted by timestamp, so these + # indices align. This way we eliminate the need to combine these two data frames and drop duplicate rows, + # which is a costly operation. X = self._reference_X y = np.zeros(len(X)) y[chunk.start_index : chunk.end_index + 1] = 1