"""Utility and validation functions
"""
from __future__ import print_function, division
import lightgbm as lgb
import matplotlib as mpl
import numpy as np
import pandas as pd
from pkg_resources import resource_filename
from matplotlib import pyplot as plt
from sklearn.impute import SimpleImputer
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.datasets import fetch_openml
from sklearn.datasets import load_breast_cancer
from sklearn.utils import Bunch
import joblib
# Qualitative palette of ten hex colour codes, used by the plotting helpers
# of this module as the source of the colour cycle.
qualitative_colors = [
    "#7F3C8D", "#11A579", "#3969AC", "#F2B701", "#E73F74",
    "#80BA5A", "#E68310", "#008695", "#CF1C90", "#F97B72",
]
#####################
# #
# Utilities #
# #
#####################
[docs]def concat_or_group(col, x, max_length=25):
"""
Concatenate unique values from a column or return a group value.
Parameters
----------
col : str
The name of the column to process.
x : pd.DataFrame
The DataFrame containing the data.
max_length : int, optional
The maximum length for concatenated strings, beyond which grouping is performed,
by default 40.
Returns
-------
str
A concatenated string of unique values if the length is less than `max_length`,
otherwise, a unique group value from the specified column.
Notes
-----
If the concatenated string length is greater than or equal to `max_length`, this
function returns the unique group value from the column with a "_g" suffix.
Examples
--------
>>> data = {
>>> 'Category_g': [1, 1, 2, 2, 3],
>>> 'Category': ['AAAAAAAAAAAAAAA', 'Bovoh', 'Ccccccccccccccc', 'D', 'E']}
>>> cat_bin_dict = {}
>>> col = 'Category'
>>> cat_bin_dict[col] = (
>>> X[[f"{col}_g", col]]
>>> .groupby(f"{col}_g")
>>> .apply(lambda x: concat_or_group(col, x))
>>> .to_dict()
>>> )
>>> print(cat_bin_dict)
>>> {'Category': {1: 'gr_1', 2: 'gr_2', 3: 'E'}}
"""
unique_values = x[col].unique()
concat_str = " / ".join(map(str, unique_values))
return (
concat_str
if len(concat_str) < max_length
else concat_str[:7] + "/.../" + concat_str[-7:]
)
def reset_plot():
    """Restore the matplotlib rc parameters to the values in ``plt.rcParamsDefault``."""
    default_settings = plt.rcParamsDefault
    mpl.rcParams.update(default_settings)
def set_my_plt_style(height=3, width=5, linewidth=2):
    """Apply the "fivethirtyeight" matplotlib style with a custom palette and background.

    The colour cycle is a re-ordered version of ``qualitative_colors`` and all
    face/edge colours share the same light grey background.

    Parameters
    ----------
    height : int, default=3
        figure height, in inches
    width : int, default=5
        figure width, in inches
    linewidth : int, default=2
        default line width
    """
    plt.style.use("fivethirtyeight")

    # order in which the qualitative palette is cycled through
    cycle_order = (2, 3, 4, 1, 0, 6, 5, 8, 9, 7)
    palette = [qualitative_colors[idx] for idx in cycle_order]

    background = "#f5f5f5"
    # rc keys that all receive the background colour
    background_keys = (
        "axes.facecolor",
        "patch.edgecolor",
        "figure.facecolor",
        "axes.edgecolor",
        "savefig.edgecolor",
        "savefig.facecolor",
    )

    style = {
        "figure.figsize": (width, height),
        "axes.prop_cycle": plt.cycler(color=palette),
    }
    style.update(dict.fromkeys(background_keys, background))
    style["grid.color"] = "#9e9e9e"
    style["lines.linewidth"] = linewidth
    mpl.rcParams.update(style)
def create_dtype_dict(df: pd.DataFrame, dic_keys: str = "col_names") -> dict:
    """Map the columns of a dataframe to coarse data-type labels.

    The labels are ``"cat"`` (object, category and bool columns), ``"num"``
    (numeric columns), ``"num_interval"`` (float/int interval columns),
    ``"time"`` (datetime, timedelta and tz-aware datetime columns) and
    ``"unk"`` (every column not captured by the previous groups). They are
    used for adding suffixes to column names in the plotting utility for the
    association matrix.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe used for computing the association matrix.
    dic_keys : str
        Either "col_names" or "dtypes" for returning either a dictionary
        with column names or dtypes as keys.

    Returns
    -------
    dict
        ``{column name: label}`` if ``dic_keys == "col_names"``,
        ``{label: list of column names}`` if ``dic_keys == "dtypes"``.

    Raises
    ------
    TypeError
        If `df` is not a pandas DataFrame.
    ValueError
        If `dic_keys` is not either "col_names" or "dtypes".
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df should be a pandas DataFrame")

    cat_cols = df.select_dtypes(include=["object", "category", "bool"]).columns
    num_cols = df.select_dtypes(include=np.number).columns
    time_cols = df.select_dtypes(
        include=["datetime", "timedelta", "datetimetz"]
    ).columns
    interval_cols = df.select_dtypes(["Interval[float]", "Interval[int]"]).columns

    # whatever is left once the known groups are removed is flagged as unknown
    unk_cols = df.columns
    for known_cols in (cat_cols, num_cols, time_cols, interval_cols):
        unk_cols = unk_cols.difference(known_cols)

    # (label, columns) pairs; if a column name appears in several groups,
    # the last group in this sequence provides its label
    groups = (
        ("cat", cat_cols),
        ("num", num_cols),
        ("num_interval", interval_cols),
        ("time", time_cols),
        ("unk", unk_cols),
    )

    if dic_keys == "col_names":
        return {column: label for label, columns in groups for column in columns}
    if dic_keys == "dtypes":
        return {label: columns.tolist() for label, columns in groups}
    raise ValueError("dic_keys should be either 'col_names' or 'dtypes'")
def get_pandas_cat_codes(X):
    """
    Replace the categorical, time and unknown-type columns of a DataFrame by integer codes.

    The columns to encode are the "cat", "time" and "unk" groups returned by
    ``create_dtype_dict``. Each of them is cast to string and label encoded.
    The codes are written back into ``X``, so the DataFrame passed by the
    caller is modified.

    Parameters
    ----------
    X : pandas DataFrame
        The input DataFrame containing categorical and/or time features.

    Returns
    -------
    X : pandas DataFrame
        The input DataFrame with the encoded columns replaced by numerical codes.
    obj_feat : list or None
        Names of the columns that were encoded, None if there was nothing to encode.
    cat_idx : list or None
        Positions in ``X.columns`` of the columns in obj_feat, None if there was
        nothing to encode.
    """
    dtype_groups = create_dtype_dict(X, dic_keys="dtypes")
    obj_feat = dtype_groups["cat"] + dtype_groups["time"] + dtype_groups["unk"]

    # nothing to encode: the frame is returned untouched
    if not obj_feat:
        return X, None, None

    for name in obj_feat:
        as_category = X[name].astype("str").astype("category")
        # label encoding: index of each value among the sorted unique strings
        X[name] = np.unique(as_category, return_inverse=True)[1]

    cat_idx = [X.columns.get_loc(name) for name in obj_feat]
    return X, obj_feat, cat_idx
def validate_sample_weight(sample_weight):
    """
    Validate the sample_weight parameter.

    Parameters
    ----------
    sample_weight : pd.Series, np.ndarray or None
        Input sample weights.

    Returns
    -------
    np.ndarray or None
        If sample_weight is a Pandas Series, its values are returned as a
        numpy array. If sample_weight is already a numpy array, it is
        returned unmodified. If sample_weight is None, None is returned.

    Raises
    ------
    ValueError
        If sample_weight is neither a Pandas Series, a numpy array nor None
        (other containers such as lists are rejected).
    """
    # None and numpy arrays are passed through as they are
    if sample_weight is None or isinstance(sample_weight, np.ndarray):
        return sample_weight
    if isinstance(sample_weight, pd.Series):
        return sample_weight.values
    raise ValueError("sample_weight must be an array-like object or None.")
def check_if_tree_based(model):
    """Tell whether an estimator is tree based, judging by its class name.

    The lower-cased class name is searched for a set of keywords
    (lightgbm, xgboost, catboost, forest, boosting, tree, ...).

    Parameters
    ----------
    model : object
        the estimator to check

    Returns
    -------
    condition : boolean
        True if one of the keywords is found in the class name, else False
    """
    tree_keywords = (
        "lightgbm",
        "lgbm",
        "xgboost",
        "xgb",
        "catboost",
        "forest",
        "boosting",
        "tree",
    )
    class_name = model.__class__.__name__.lower()
    for keyword in tree_keywords:
        if keyword in class_name:
            return True
    return False
def is_lightgbm(estimator):
    """Tell whether an estimator is a lightGBM one, judging by its class name.

    Parameters
    ----------
    estimator : object
        the estimator to check

    Returns
    -------
    condition : boolean
        True if "lgbm" is found in the lower-cased class name, else False
    """
    return "lgbm" in estimator.__class__.__name__.lower()
def is_catboost(estimator):
    """Tell whether an estimator is a CatBoost one, judging by its class name.

    Parameters
    ----------
    estimator : object
        the estimator to check

    Returns
    -------
    condition : boolean
        True if "catboost" is found in the lower-cased class name, else False
    """
    return "catboost" in estimator.__class__.__name__.lower()
def is_xgboost(estimator):
    """Tell whether an estimator is an XGBoost one, judging by its class name.

    Parameters
    ----------
    estimator : object
        the estimator to check

    Returns
    -------
    condition : boolean
        True if "xgb" is found in the lower-cased class name, else False
    """
    return "xgb" in estimator.__class__.__name__.lower()
def LightForestRegressor(n_feat, n_estimators=10):
    """Build a lightGBM regressor running in random forest mode.

    Rows are sub-sampled at each iteration (fraction 0.632) and the fraction
    of columns sampled per tree is ``n_feat / (3 * n_feat)``, i.e. one third
    of the predictors, the number of features advised for regression in
    Elements of statistical learning.

    Parameters
    ----------
    n_feat : int
        the number of predictors (nbr of columns of the X matrix), must be non-zero
    n_estimators : int, optional
        the number of trees/estimators, by default 10

    Returns
    -------
    lightgbm regressor
        unfitted ``lgb.LGBMRegressor`` configured with ``boosting_type="rf"``
    """
    rf_params = {
        "verbose": -1,
        "force_col_wise": True,
        "n_estimators": n_estimators,
        "subsample": 0.632,
        # fraction of the columns sampled for each tree
        "colsample_bytree": n_feat / (3 * n_feat),
        "boosting_type": "rf",
        "subsample_freq": 1,
    }
    return lgb.LGBMRegressor(**rf_params)
def LightForestClassifier(n_feat, n_estimators=10):
    """Build a lightGBM classifier running in random forest mode.

    Rows are sub-sampled at each iteration (fraction 0.632) and the fraction
    of columns sampled per tree is ``sqrt(n_feat) / n_feat``, the number of
    features advised for classification in Elements of statistical learning.

    Parameters
    ----------
    n_feat : int
        the number of predictors (nbr of columns of the X matrix)
    n_estimators : int, optional
        the number of trees/estimators, by default 10

    Returns
    -------
    lightgbm classifier
        unfitted ``lgb.LGBMClassifier`` configured with ``boosting_type="rf"``
    """
    rf_params = {
        "verbose": -1,
        "force_col_wise": True,
        "n_estimators": n_estimators,
        "subsample": 0.632,
        # fraction of the columns sampled for each tree
        "colsample_bytree": np.sqrt(n_feat) / n_feat,
        "boosting_type": "rf",
        "subsample_freq": 1,
    }
    return lgb.LGBMClassifier(**rf_params)
def is_list_of_str(str_list):
    """Check if ``str_list`` is a list of strings.

    Parameters
    ----------
    str_list : list or None
        The object to check.

    Returns
    -------
    bool
        True if ``str_list`` is a ``list`` whose items are all strings
        (an empty list qualifies), False otherwise.
    """
    return isinstance(str_list, list) and all(
        isinstance(item, str) for item in str_list
    )
def is_list_of_bool(bool_list):
    """Check if ``bool_list`` is a list of Booleans.

    Parameters
    ----------
    bool_list : list or None
        The object to check.

    Returns
    -------
    bool
        True if ``bool_list`` is a ``list`` whose items are all Booleans
        (an empty list qualifies), False otherwise.
    """
    return isinstance(bool_list, list) and all(
        isinstance(item, bool) for item in bool_list
    )
def is_list_of_int(int_list):
    """Check if ``int_list`` is a list of integers.

    Parameters
    ----------
    int_list : list or None
        The object to check.

    Returns
    -------
    bool
        True if ``int_list`` is a ``list`` whose items are all instances of
        ``int`` (an empty list qualifies; Booleans are instances of ``int``
        and therefore pass the check), False otherwise.
    """
    return isinstance(int_list, list) and all(
        isinstance(item, int) for item in int_list
    )
def _get_titanic_data():
    """Load the Titanic data and add random and engineered predictors, for
    benchmarking purpose. Classification (binary).

    The data is fetched from OpenML. Two random predictors (``random_cat``
    and ``random_num``) and three engineered ones (``family_size``,
    ``is_alone`` and ``title``) are added, missing values are imputed and
    random sample weights in [0, 1) are generated. Every random draw comes
    from the same generator seeded with 42, so that the random columns and
    the weights are identical from one call to the next.

    Returns
    -------
    object
        Bunch sklearn, extension of dictionary, with the keys ``data``
        (pd.DataFrame), ``target``, ``sample_weight`` (np.ndarray) and
        ``categorical`` (names of the categorical columns)
    """
    # Fetch Titanic data and add random cat and numbers
    # Example taken from https://scikit-learn.org/stable/auto_examples/inspection/
    # plot_permutation_importance.html#sphx-glr-auto-examples-inspection-plot-permutation-importance-py
    X, y = fetch_openml(
        "titanic", version=1, as_frame=True, return_X_y=True, parser="auto"
    )
    n_rows = X.shape[0]

    # one seeded generator for all the random draws of this function
    rng = np.random.RandomState(seed=42)
    nice_guys = ["Rick", "Bender", "Cartman", "Morty", "Fry", "Vador", "Thanos"]
    # random_num takes the first draws of the generator; the column order is
    # fixed further down by the explicit column selection
    X["random_num"] = rng.randn(n_rows)
    X["random_cat"] = rng.choice(nice_guys, n_rows)

    X["family_size"] = X["parch"] + X["sibsp"]
    X.drop(["parch", "sibsp"], axis=1, inplace=True)
    # NOTE(review): passengers with exactly one relative aboard are also
    # flagged as alone -- confirm that this threshold is intended
    X["is_alone"] = np.where(X["family_size"] > 1, 0, 1)

    # the title is the text between ", " and the first "." of the name
    X["title"] = (
        X["name"].str.split(", ", expand=True)[1].str.split(".", expand=True)[0]
    )
    X.loc[X["title"] == "Miss", "title"] = "Mrs"
    # titles with fewer than 10 occurrences are pooled into a single level
    title_counts = X["title"].value_counts()
    rare_titles = title_counts[title_counts < 10].index
    X.loc[X["title"].isin(rare_titles), "title"] = "rare"

    categorical_columns = [
        "pclass",
        "sex",
        "embarked",
        "random_cat",
        "is_alone",
        "title",
    ]
    numerical_columns = ["age", "family_size", "fare", "random_num"]
    X = X[categorical_columns + numerical_columns]

    # Preprocessing: constant imputation for the categorical columns,
    # mean imputation for the numerical ones
    categorical_pipe = make_pipeline(
        SimpleImputer(strategy="constant", fill_value="missing")
    )
    numerical_pipe = make_pipeline(SimpleImputer(strategy="mean"))
    preprocessor = make_column_transformer(
        (categorical_pipe, categorical_columns),
        (numerical_pipe, numerical_columns),
    )
    X = preprocessor.fit_transform(X)

    # Back to a DataFrame with explicit dtypes
    X = pd.DataFrame(X, columns=categorical_columns + numerical_columns)
    X[categorical_columns] = X[categorical_columns].astype(str)
    X[numerical_columns] = X[numerical_columns].astype(float)

    # Create sample weights
    sample_weight = rng.uniform(0, 1, len(y))

    return Bunch(
        data=X,
        target=y,
        sample_weight=sample_weight,
        categorical=categorical_columns,
    )
def _get_cancer_data():
    """Load breast cancer data and add dummies (random predictors) and a genuine one,
    for benchmarking purpose. Classification (binary).

    ``random_num1`` (standard normal) and ``random_num2`` (Poisson) are pure
    noise, ``genuine_num`` is a noisy function of the target. Every random
    draw comes from the same generator seeded with 42, so that the added
    columns are identical from one call to the next.

    Returns
    -------
    object
        Bunch sklearn, extension of dictionary, with the keys ``data``
        (pd.DataFrame), ``target`` (pd.Series), ``sample_weight`` (None)
        and ``categorical`` (None)
    """
    # one seeded generator for all the random draws of this function
    rng = np.random.RandomState(seed=42)
    data = load_breast_cancer()
    X = pd.DataFrame(data.data, columns=data.feature_names)
    y = data.target
    n_rows = X.shape[0]

    # random_num1 takes the first draws of the generator
    X["random_num1"] = rng.randn(n_rows)
    X["random_num2"] = rng.poisson(1, n_rows)
    # the target scales a positive noise term, plus an additive noise term
    z = y.astype(int)
    X["genuine_num"] = z * np.abs(rng.normal(0, 0.1, n_rows)) + rng.normal(
        0, 0.1, n_rows
    )

    return Bunch(data=X, target=pd.Series(y), sample_weight=None, categorical=None)
def _load_boston_data():
    """Load the Boston data shipped with the package, for benchmarking purpose.
    Regression (positive domain).

    The object is read with joblib from the package file
    ``dataset/data/boston_bunch.joblib``.

    Returns
    -------
    object
        the object stored in the joblib file (presumably a sklearn Bunch
        holding the Boston data with added random and genuine predictors;
        the content of the file cannot be checked from this module)
    """
    return joblib.load(
        resource_filename(__name__, "dataset/data/boston_bunch.joblib")
    )
def _load_housing(as_frame: bool = False):
    """Load the California housing data shipped with the package. See here
    https://scikit-learn.org/stable/modules/generated/sklearn.datasets.fetch_california_housing.html
    for the downloadable version.

    Parameters
    ----------
    as_frame :
        if True, the raw pandas dataframe read from the data file is returned,
        otherwise a "Bunch" (enhanced dictionary) is returned (default ``False``)

    Returns
    -------
    pd.DataFrame or Bunch
        the dataset; the Bunch has the keys ``data`` and ``target`` (numpy
        arrays), ``feature_names``, ``DESCR`` and ``filename``
    """
    # the description file is read in both modes, as it was before
    with open(resource_filename(__name__, "dataset/descr/housing.rst")) as descr_file:
        descr_text = descr_file.read()

    data_file_name = resource_filename(__name__, "dataset/data/housing.zip")
    data = pd.read_csv(data_file_name)
    if as_frame:
        return data

    feature_names = [
        "MedInc",
        "HouseAge",
        "AveRooms",
        "AveBedrms",
        "Population",
        "AveOccup",
        "Latitude",
        "Longitude",
    ]
    return Bunch(
        data=data[feature_names].values,
        target=data["target"].values,
        feature_names=feature_names,
        DESCR=descr_text,
        filename=data_file_name,
    )
def plot_y_vs_X(X, y, ncols=2, figsize=(10, 10)):
    """Plot target vs relevant and non-relevant predictors

    One scatter plot of ``y`` against each column of ``X`` is drawn on a grid
    of ``ncols`` columns, each panel being titled with the column name. The
    panels of the grid that are left without a predictor are hidden.

    Parameters
    ----------
    X : pd.DataFrame
        The DataFrame of the predictors.
    y : np.array
        The target.
    ncols : int, optional
        The number of columns in the facet plot. Default is 2.
    figsize : tuple, optional
        The figure size. Default is (10, 10).

    Returns
    -------
    plt.figure
        The univariate plots y vs pred_i.
    """
    n_cols_to_plot = X.shape[1]
    n_rows = int(np.ceil(n_cols_to_plot / ncols))

    # squeeze=False always returns a 2-D array of axes, so that grids with a
    # single row or a single column are handled like any other grid
    f, axs = plt.subplots(
        nrows=n_rows, ncols=ncols, figsize=figsize, squeeze=False
    )
    # row-major flattening: panel i sits at (i // ncols, i % ncols)
    panels = axs.ravel()

    # the column name is kept separate from the grid position, so that the
    # data and the title of each panel refer to the predictor itself
    for ax, (col_name, values) in zip(panels, X.items()):
        ax.scatter(values, y, alpha=0.1)
        ax.set_title(col_name)

    # Hide unused subplots
    for ax in panels[n_cols_to_plot:]:
        ax.set_axis_off()

    # Adjust spacing between subplots
    plt.tight_layout()
    return f
def load_data(name="Titanic"):
    """Load some toy data set to test the All Relevant Feature Selection methods.

    Dummies (random) predictors are added and ARFS should be able to filter them out.
    Titanic and cancer are for binary classification, they contain synthetic random
    (dummies) predictors and a noisy but genuine synthetic predictor. Hopefully, a
    good All Relevant FS should be able to detect all the predictors genuinely
    related to the target. Boston and housing are for regression.

    Parameters
    ----------
    name : str, optional
        the name of the data set, one of 'Titanic' (classification with
        sample_weight), 'Boston' (regression), 'cancer' (classification without
        sample weight) or 'housing' (regression, returned as a Bunch),
        by default 'Titanic'

    Returns
    -------
    Bunch
        extension of dictionary, accessible by key

    Raises
    ------
    ValueError
        if the dataset name is invalid
    """
    if name == "Titanic":
        return _get_titanic_data()
    if name == "Boston":
        return _load_boston_data()
    if name == "cancer":
        return _get_cancer_data()
    if name == "housing":
        return _load_housing(as_frame=False)
    # none of the known names matched
    raise ValueError("`name should be in ['Titanic', 'Boston', 'cancer', 'housing']`")
def _make_corr_dataset_regression(size=1000):
    """Generate an artificial dataset for regression tasks with columns that
    are correlated, have no variance, large cardinality, numerical and categorical.

    The global NumPy random generator is seeded with 42 before the target and
    the predictors are drawn, so that they are identical from one call to the
    next. The weights are drawn before the seed is set and are therefore not
    covered by it.

    Parameters
    ----------
    size : int, optional
        number of rows to generate (odd values are supported), by default 1000

    Returns
    -------
    pd.DataFrame, np.ndarray, np.ndarray
        the predictors matrix (columns ``var0`` ... ``var12`` and
        ``nice_guys``), the target and the weights
    """
    # generate weights (drawn before the seed is set)
    w = np.random.beta(a=1, b=0.5, size=size)
    # set seed for reproducibility (this seeds the global NumPy generator)
    np.random.seed(42)

    # generate target variable
    sigma = 0.2
    y = np.random.normal(1, sigma, size)

    # generate correlated features
    z = y - np.random.normal(1, sigma / 5, size) + np.random.normal(1, sigma / 5, size)

    # var11 cycles through 0 .. size // 2 - 1: for an even size this is the
    # sequence 0 .. size / 2 - 1 repeated twice, and the modulo form still
    # provides exactly `size` values when size is odd
    half = max(size // 2, 1)

    # the random draws below are made in the order of the dictionary entries
    X = pd.DataFrame(
        {
            "var0": z,
            "var1": y * np.abs(np.random.normal(0, sigma * 2, size))
            + np.random.normal(0, sigma / 10, size),
            "var2": -y + np.random.normal(0, sigma, size),
            "var3": y**2 + np.random.normal(0, sigma, size),
            "var4": np.sqrt(y) + np.random.gamma(1, 0.2, size),
            # var5 to var9 do not depend on the target
            "var5": np.random.normal(0, 1, size),
            "var6": np.random.poisson(1, size),
            "var7": np.random.binomial(1, 0.3, size),
            "var8": np.random.normal(0, 1, size),
            "var9": np.random.poisson(1, size),
            # zero variance column
            "var10": np.ones(size),
            # large cardinality column
            "var11": np.arange(size) % half,
            "var12": y**3 + np.abs(np.random.normal(0, 1, size)),
        }
    )

    # introduce missing values in half of the rows of var12
    idx_nan = np.random.choice(size, int(round(size / 2)), replace=False)
    X.loc[idx_nan, "var12"] = np.nan

    # set column types
    X["var11"] = X["var11"].astype("category")

    # categorical column of names drawn at random
    X["nice_guys"] = np.random.choice(
        [
            "Rick",
            "Bender",
            "Cartman",
            "Morty",
            "Fry",
            "Vador",
            "Thanos",
            "Bejita",
            "Cell",
            "Tinkywinky",
            "Lecter",
            "Alien",
            "Terminator",
            "Drago",
            "Dracula",
            "Krueger",
            "Geoffrey",
            "Goldfinder",
            "Blackbeard",
            "Excel",
            "SAS",
            "Bias",
            "Variance",
            "Scrum",
            "Human",
            "Garry",
            "Coldplay",
            "Imaginedragons",
            "Platist",
            "Creationist",
            "Gruber",
            "KeyserSoze",
            "Luthor",
            "Klaue",
            "Bane",
            "MarkZ",
        ],
        size,
    )

    return X, y, w
def _make_corr_dataset_classification(size=1000):
    """Generate an artificial dataset for classification tasks. Some columns are
    correlated, have no variance, large cardinality, numerical and categorical.

    The global NumPy random generator is seeded with 42 before the target and
    the predictors are drawn, so that they are identical from one call to the
    next. The weights are drawn before the seed is set and are therefore not
    covered by it.

    Parameters
    ----------
    size : int, optional
        number of rows to generate, by default 1000

    Returns
    -------
    pd.DataFrame, np.ndarray, np.ndarray
        the predictors matrix (columns ``var0`` ... ``var10``, ``dummy``,
        ``var12`` and ``nice_guys``), the binary target and the weights
    """
    # Generate weights (drawn before the seed is set)
    w = np.random.beta(a=1, b=0.5, size=size)

    # Fix the seed (global NumPy generator) and generate the target
    np.random.seed(42)
    y = np.random.binomial(1, 0.5, size)

    # Generate the predictors matrix
    X = np.zeros((size, 13))

    # noisy copy of the target, clipped back to {0, 1}
    z = y - np.random.binomial(1, 0.1, size) + np.random.binomial(1, 0.1, size)
    z[z == -1] = 0
    z[z == 2] = 1

    # Generate 5 relevant features, with positive and negative correlation to the target
    X[:, 0] = z
    X[:, 1] = y * np.abs(np.random.normal(0, 1, size)) + np.random.normal(0, 0.1, size)
    X[:, 2] = -y + np.random.normal(0, 1, size)
    X[:, 3] = y**2 + np.random.normal(0, 1, size)
    X[:, 4] = np.sqrt(y) + np.random.binomial(2, 0.1, size)

    # Generate 5 irrelevant features
    X[:, 5:10] = np.random.normal(0, 1, size=(size, 5))

    # Generate a column with zero variance
    X[:, 10] = np.ones(size)

    # Generate a column with high cardinality (one distinct value per row)
    X[:, 11] = np.arange(start=0, stop=size, step=1)

    # Generate a column with a lot of missing values (half of the rows)
    idx_nan = np.random.choice(size, int(round(size / 2)), replace=False)
    X[:, 12] = y**3 + np.abs(np.random.normal(0, 1, size))
    X[idx_nan, 12] = np.nan

    # Make the predictors matrix a pandas DataFrame
    column_names = ["var" + str(i) for i in range(13)]
    column_names[11] = "dummy"
    X = pd.DataFrame(X, columns=column_names)
    X["dummy"] = X["dummy"].astype("category")

    # Add a column of random values from a list
    nice_guys = [
        "Rick",
        "Bender",
        "Cartman",
        "Morty",
        "Fry",
        "Vador",
        "Thanos",
        "Bejita",
        "Cell",
        "Tinkywinky",
        "Lecter",
        "Alien",
        "Terminator",
        "Drago",
        "Dracula",
        "Krueger",
        "Geoffrey",
        "Goldfinder",
        "Blackbeard",
        "Excel",
        "SAS",
        "Bias",
        "Variance",
        "Scrum",
        "Human",
        "Garry",
        "Coldplay",
        "Imaginedragons",
        "Platist",
        "Creationist",
        "Gruber",
        "KeyserSoze",
        "Luthor",
        "Klaue",
        "Bane",
        "MarkZ",
    ]
    # NOTE(review): the list used to be built without being used; the column
    # is added here as the comment above announces -- confirm that no caller
    # relies on the frame having 13 columns only
    X["nice_guys"] = np.random.choice(nice_guys, size)

    return X, y, w