Monotonic Intervals (#284)
* first version

* changed preprocessing structure

* flake8 stuff

* mk_average edits

* minor bug

* allmost green

* it is now painted black

* minor docstring edit
koaning authored Feb 12, 2020
1 parent c14bc20 commit 6c8ffdd
Showing 13 changed files with 1,538 additions and 931 deletions.
136 changes: 110 additions & 26 deletions doc/preprocessing.ipynb

Large diffs are not rendered by default.

403 changes: 403 additions & 0 deletions monotonic.ipynb

Large diffs are not rendered by default.

904 changes: 0 additions & 904 deletions sklego/preprocessing.py

This file was deleted.

7 changes: 7 additions & 0 deletions sklego/preprocessing/__init__.py
@@ -0,0 +1,7 @@
from .intervalencoder import IntervalEncoder
from .randomadder import RandomAdder
from .patsytransformer import PatsyTransformer
from .pandastransformers import ColumnSelector, PandasTypeSelector, ColumnDropper
from .projections import InformationFilter, OrthogonalTransformer
from .repeatingbasis import RepeatingBasisFunction
from .columncapper import ColumnCapper
205 changes: 205 additions & 0 deletions sklego/preprocessing/columncapper.py
@@ -0,0 +1,205 @@
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import check_array
from sklearn.utils.validation import FLOAT_DTYPES, check_is_fitted


class ColumnCapper(TransformerMixin, BaseEstimator):
"""
Caps the values of columns according to the given quantile thresholds.
:type quantile_range: tuple or list, optional, default=(5.0, 95.0)
:param quantile_range: The quantile thresholds used for capping. Their values must
be in the interval [0; 100].
:type interpolation: str, optional, default='linear'
:param interpolation: The interpolation method to compute the quantiles when the
desired quantile lies between two data points `i` and `j`. The available values
are:
* ``'linear'``: `i + (j - i) * fraction`, where `fraction` is the fractional part of\
the index surrounded by `i` and `j`.
* ``'lower'``: `i`.
* ``'higher'``: `j`.
* ``'nearest'``: `i` or `j` whichever is nearest.
* ``'midpoint'``: (`i` + `j`) / 2.
:type discard_infs: bool, optional, default=False
:param discard_infs: Whether to discard ``-np.inf`` and ``np.inf`` values or not. If
``False``, such values will be capped. If ``True``, they will be replaced by
``np.nan``.
.. note::
Setting ``discard_infs=True`` is important if the `inf` values are the result
of divisions by 0, which ``pandas`` represents as ``-np.inf`` or
``np.inf`` depending on the sign of the numerator.
:type copy: bool, optional, default=True
:param copy: If False, try to avoid a copy and do inplace capping instead. This is not
guaranteed to always work inplace; e.g. if the data is not a NumPy array or scipy.sparse
CSR matrix, a copy may still be returned.
:raises:
``TypeError``, ``ValueError``
:Example:
>>> import pandas as pd
>>> import numpy as np
>>> from sklego.preprocessing import ColumnCapper
>>> df = pd.DataFrame({'a':[2, 4.5, 7, 9], 'b':[11, 12, np.inf, 14]})
>>> df
a b
0 2.0 11.0
1 4.5 12.0
2 7.0 inf
3 9.0 14.0
>>> capper = ColumnCapper()
>>> capper.fit_transform(df)
array([[ 2.375, 11.1 ],
[ 4.5 , 12. ],
[ 7. , 13.8 ],
[ 8.7 , 13.8 ]])
>>> capper = ColumnCapper(discard_infs=True) # Discarding infs
>>> df[['a', 'b']] = capper.fit_transform(df)
>>> df
a b
0 2.375 11.1
1 4.500 12.0
2 7.000 NaN
3 8.700 13.8
"""

def __init__(
self,
quantile_range=(5.0, 95.0),
interpolation="linear",
discard_infs=False,
copy=True,
):

self._check_quantile_range(quantile_range)
self._check_interpolation(interpolation)

self.quantile_range = quantile_range
self.interpolation = interpolation
self.discard_infs = discard_infs
self.copy = copy

def fit(self, X, y=None):
"""
Computes the quantiles for each column of ``X``.
:type X: pandas.DataFrame or numpy.ndarray
:param X: The column(s) from which the capping limit(s) will be computed.
:param y: Ignored.
:rtype: sklego.preprocessing.ColumnCapper
:returns: The fitted object.
:raises:
``ValueError`` if ``X`` contains non-numeric columns
"""
X = check_array(
X, copy=True, force_all_finite=False, dtype=FLOAT_DTYPES, estimator=self
)

# If X contains infs, we need to replace them by nans before computing quantiles
np.putmask(X, (X == np.inf) | (X == -np.inf), np.nan)

# There should be no column containing only nan cells at this point. If that's not the case,
# it means that the user asked ColumnCapper to fit some column containing only nan or inf cells.
nans_mask = np.isnan(X)
invalid_columns_mask = (
nans_mask.sum(axis=0) == X.shape[0]
) # Contains as many nans as rows
if invalid_columns_mask.any():
raise ValueError(
"ColumnCapper cannot fit columns containing only inf/nan values"
)

q = [quantile_limit / 100 for quantile_limit in self.quantile_range]
self.quantiles_ = np.nanquantile(
a=X, q=q, axis=0, overwrite_input=True, interpolation=self.interpolation
)

# Saving the number of columns to ensure coherence between fit and transform inputs
self.n_columns_ = X.shape[1]

return self

def transform(self, X):
"""
Performs the capping on the column(s) of ``X``.
:type X: pandas.DataFrame or numpy.ndarray
:param X: The column(s) for which the capping limit(s) will be applied.
:rtype: numpy.ndarray
:returns: ``X`` values with capped limits.
:raises:
``ValueError`` if the number of columns from ``X`` differs from the
number of columns when fitting
"""
check_is_fitted(self, "quantiles_")
X = check_array(
X,
copy=self.copy,
force_all_finite=False,
dtype=FLOAT_DTYPES,
estimator=self,
)

if X.shape[1] != self.n_columns_:
raise ValueError(
"X must have the same number of columns in fit and transform"
)

if self.discard_infs:
np.putmask(X, (X == np.inf) | (X == -np.inf), np.nan)

# Actually capping
X = np.minimum(X, self.quantiles_[1, :])
X = np.maximum(X, self.quantiles_[0, :])

return X

@staticmethod
def _check_quantile_range(quantile_range):
"""
Checks for the validity of quantile_range.
"""
if not isinstance(quantile_range, tuple) and not isinstance(
quantile_range, list
):
raise TypeError("quantile_range must be a tuple or a list")
if len(quantile_range) != 2:
raise ValueError(
"quantile_range must contain 2 elements: min_quantile and max_quantile"
)

min_quantile, max_quantile = quantile_range

for quantile in min_quantile, max_quantile:
if not isinstance(quantile, float) and not isinstance(quantile, int):
raise TypeError("min_quantile and max_quantile must be numbers")
if quantile < 0 or 100 < quantile:
raise ValueError("min_quantile and max_quantile must be in [0; 100]")

if min_quantile > max_quantile:
raise ValueError("min_quantile must be less than or equal to max_quantile")

@staticmethod
def _check_interpolation(interpolation):
"""
Checks for the validity of interpolation.
"""
allowed_interpolations = ("linear", "lower", "higher", "midpoint", "nearest")
if interpolation not in allowed_interpolations:
raise ValueError(
"Available interpolation methods: {}".format(
", ".join(allowed_interpolations)
)
)
146 changes: 146 additions & 0 deletions sklego/preprocessing/intervalencoder.py
@@ -0,0 +1,146 @@
import numpy as np
import cvxpy as cp
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils import check_array, check_X_y
from sklearn.utils.validation import check_is_fitted


def _mk_monotonic_average(xs, ys, intervals, method="increasing", **kwargs):
"""
Creates smoothed averages of `ys` at the intervals given by `intervals`.
:param xs: all the datapoints of a feature (represents the x-axis)
:param ys: all the datapoints that we'd like to predict (represents the y-axis)
:param intervals: the intervals at which we'd like to get a good average value
:param method: the method that is used for smoothing, can be either `increasing` or `decreasing`.
:return:
An array as long as `intervals` that represents the average `y`-values at those intervals,
keeping the constraint in mind.
"""
# `np.float` is deprecated in recent NumPy releases; plain `float` is equivalent
x_internal = np.array([xs >= i for i in intervals]).T.astype(float)
betas = cp.Variable(x_internal.shape[1])
# use `@` for the matrix-vector product; cvxpy deprecates `*` for matmul
objective = cp.Minimize(cp.sum_squares(x_internal @ betas - ys))
if method == "increasing":
constraints = [betas[i + 1] >= 0 for i in range(betas.shape[0] - 1)]
elif method == "decreasing":
constraints = [betas[i + 1] <= 0 for i in range(betas.shape[0] - 1)]
else:
raise ValueError(
f"method must be either `increasing` or `decreasing`, got: {method}"
)
prob = cp.Problem(objective, constraints)
prob.solve()
return betas.value.cumsum()
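# Illustrative sketch of what the function above computes (assumes cvxpy is
# installed): each column of `x_internal` is a step indicator, so `betas` are
# step heights; constraining betas[1:] >= 0 ("increasing") and taking the
# cumulative sum yields interval averages that can only go up:
#
#   xs = np.linspace(0, 1, 100)
#   ys = xs + np.random.normal(0, 0.1, size=100)
#   avgs = _mk_monotonic_average(xs, ys, np.linspace(0, 1, 10))
#   assert np.all(np.diff(avgs) >= -1e-8)  # monotone up to solver tolerance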


def _mk_average(xs, ys, intervals, method="average", span=1, **kwargs):
"""
Creates smoothed averages of `ys` at the intervals given by `intervals`.
:param xs: all the datapoints of a feature (represents the x-axis)
:param ys: all the datapoints that we'd like to predict (represents the y-axis)
:param intervals: the intervals at which we'd like to get a good average value
:param method: the method that is used for smoothing, can be either `average` or `normal`.
:param span: if the method is `average` then this is the span around the interval
that is used to determine the average `y`-value, if the method is `normal` the span
becomes the value of sigma that is used for weighted averaging
:return:
An array as long as `intervals` that represents the average `y`-values at those intervals.
"""
results = np.zeros(intervals.shape)
for idx, interval in enumerate(intervals):
if method == "average":
distances = 1 / (0.01 + np.abs(xs - interval))
# keep only the points within `span` of the interval
predicate = (xs < (interval + span)) & (xs > (interval - span))
elif method == "normal":
distances = np.exp(-((xs - interval) ** 2) / span)
predicate = np.ones_like(xs, dtype=bool)  # all points contribute under Gaussian weighting
else:
raise ValueError("method needs to be either `average` or `normal`")
subset = ys[predicate]
dist_subset = distances[predicate]
results[idx] = np.average(subset, weights=dist_subset)
return results
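# Illustrative sketch of the two weighting schemes above: `average` takes an
# inverse-distance-weighted mean over a window of half-width `span` around
# each interval, while `normal` takes a Gaussian-weighted mean over all points:
#
#   xs = np.linspace(0, 1, 200)
#   ys = np.sin(3 * xs)
#   _mk_average(xs, ys, np.linspace(0, 1, 5), method="average", span=0.2)
#   _mk_average(xs, ys, np.linspace(0, 1, 5), method="normal", span=0.1)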


class IntervalEncoder(TransformerMixin, BaseEstimator):
"""
The interval encoder bends features in `X` with regard to `y`.
We take each column in `X` separately and smooth it towards `y` using
the strategy that is defined in `method`.
Note that this allows us to make certain features strictly monotonic
in your machine learning model if you follow this encoder with an
appropriate model.
:param n_chunks: the number of cuts that define the intervals
:param method: the interpolation method used, must be in
["average", "normal", "increasing", "decreasing"], default: "normal"
:param span: a hyperparameter for the interpolation method; if the
method is `normal` it resembles the width of the radial basis
function used to weigh the points, and if the method is `average` it
is the half-width of the window around each interval. It is ignored
if the method is "increasing" or "decreasing".
"""

def __init__(self, n_chunks=10, span=1, method="normal"):
self.span = span
self.method = method
self.n_chunks = n_chunks

def fit(self, X, y):
"""Fits the estimator"""
allowed_methods = ["average", "normal", "increasing", "decreasing"]
if self.method not in allowed_methods:
raise ValueError(
f"`method` must be in {allowed_methods}, got `{self.method}`"
)
if self.n_chunks <= 0:
raise ValueError(f"`n_chunks` must be >= 1, received {self.n_chunks}")
if not 0.0 <= self.span <= 1.0:
raise ValueError(
f"Error, we expect 0 <= span <= 1, received span={self.span}"
)

X, y = check_X_y(X, y, estimator=self)
# these two matrices will have shape (columns, quantiles)
# quantiles indicate where the interval split occurs
self.quantiles_ = np.zeros((X.shape[1], self.n_chunks))
# heights indicate what heights these intervals will have
self.heights_ = np.zeros((X.shape[1], self.n_chunks))
self.num_cols_ = X.shape[1]

average_func = (
_mk_average
if self.method in ["average", "normal"]
else _mk_monotonic_average
)

for col in range(X.shape[1]):
self.quantiles_[col, :] = np.quantile(
X[:, col], q=np.linspace(0, 1, self.n_chunks)
)
self.heights_[col, :] = average_func(
X[:, col],
y,
self.quantiles_[col, :],
span=self.span,
method=self.method,
)
return self

def transform(self, X):
"""
Transforms each column such that it bends smoothly towards ``y``.
"""
check_is_fitted(self, ["quantiles_", "heights_", "num_cols_"])
X = check_array(X, estimator=self)
if X.shape[1] != self.num_cols_:
raise ValueError(
f"fitted on {self.num_cols_} features but received {X.shape[1]}"
)
transformed = np.zeros(X.shape)
for col in range(transformed.shape[1]):
transformed[:, col] = np.interp(
X[:, col], self.quantiles_[col, :], self.heights_[col, :]
)
return transformed
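A minimal usage sketch of the new encoder (illustrative data, assuming cvxpy is installed for the monotonic methods):

import numpy as np
from sklego.preprocessing import IntervalEncoder

X = np.random.uniform(0, 1, size=(100, 1))
y = X[:, 0] + np.random.normal(0, 0.1, size=100)

# force the encoded feature to be monotone increasing in the raw feature
encoder = IntervalEncoder(n_chunks=10, method="increasing")
X_tf = encoder.fit_transform(X, y)
# transform interpolates piecewise-linearly between the fitted
# quantile/height pairs, so downstream models see a smoothed,
# monotone version of the column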
