"""LassoFeatureSelection Submodule
This module provides LASSO-based feature selection, specifically designed for use with Generalized Linear Models (GLM).
The Lasso Regularized GLM introduces an L1 regularization penalty (Lasso regularization),
encouraging some coefficients to become exactly zero during the model fitting process.
This regularization effectively removes irrelevant features from the model, making it a
powerful tool for feature selection, particularly in datasets with numerous variables.
Module Structure:
-----------------
- `EnetGLM`: class serves as a scikit-learn wrapper for the regularized statsmodels GLM, providing seamless integration with scikit-learn's ecosystem.
- `weighted_cross_val_score`: function allows users to pass weights to the model and define a custom scoring metric.
- `grid_search_cv`: function performs a weighted LASSO grid search to find the best Lasso parameter for the model.
- `LassoFeatureSelection`: class is the core feature selection class, estimating the Lasso parameter through
the grid search process, enabling efficient and effective feature selection.
With this submodule, users can easily leverage Lasso Regularized GLMs and conduct feature selection,
improving model performance and interpretability in various datasets.
"""
import pandas as pd
import numpy as np
import statsmodels.api as sm
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin
from sklearn.base import clone
from sklearn.utils import check_array
from sklearn.model_selection import StratifiedKFold, KFold
from joblib import Parallel, delayed
from typing import Union, Optional
def _map_family_link(family: str = "gaussian", link: Optional[str] = None):
family_mapping = {
"gaussian": sm.families.Gaussian,
"binomial": sm.families.Binomial,
"poisson": sm.families.Poisson,
"gamma": sm.families.Gamma,
"negativebinomial": sm.families.NegativeBinomial,
"tweedie": sm.families.Tweedie,
}
link_mapping = {
"identity": sm.genmod.families.links.Identity(),
"log": sm.genmod.families.links.Log(),
"logit": sm.genmod.families.links.Logit(),
"probit": sm.genmod.families.links.Probit(),
"cloglog": sm.genmod.families.links.CLogLog(),
"inverse_squared": sm.genmod.families.links.InverseSquared(),
}
if link is not None:
objective = family_mapping[family](link_mapping[link])
else:
objective = family_mapping[family]()
return objective
[docs]class EnetGLM(BaseEstimator, RegressorMixin):
"""
Elastic Net Generalized Linear Model.
Parameters
----------
family : str, (default="gaussian")
The distributional assumption of the model. It can be any of the statsmodels distribution:
"gaussian", "binomial", "poisson", "gamma", "negativebinomial", "tweedie"
link : str, optional
the GLM link function. It can be any of the: "identity", "log", "logit", "probit", "cloglog", "inverse_squared"
alpha : float, optional (default=0.0)
The elastic net mixing parameter. 0 <= alpha <= 1.
alpha = 0 is equivalent to ridge regression, alpha = 1 is equivalent to lasso regression.
L1_wt : float, optional (default=0.0)
The weight of the L1 penalty term. 0 <= L1_wt <= 1.
The `L1_wt` parameter represents the weight of the L1 penalty term in the model and
should be within the range 0 to 1. A value of 0 corresponds to ridge regression,
while a value of 1 corresponds to lasso regression. However, for obtaining statistics,
`L1_wt` should be set to a value greater than 0. If it is set to 0.0, statsmodels returns
a ridge regularized wrapper without refitting the model, making the statistics unavailable
and breaking the class. Nevertheless, you can set `L1_wt` to a very small value, such as 1e-9,
to obtain close-to-ridge behavior while still obtaining the necessary statistics.
fit_intercept : bool, optional (default=True)
Whether to fit an intercept term in the model.
"""
[docs] def __init__(
self,
family: str = "gaussian",
link: Optional[str] = None,
alpha: float = 0.0,
L1_wt: float = 1e-6,
fit_intercept: bool = True,
):
"""
Initialize self.
Parameters
----------
family :
The distributional assumption of the model.
link:
the GLM link function
alpha :
The penalty weight. If a scalar, the same penalty weight applies to all variables in the model.
If a vector, it must have the same length as params, and contains a penalty weight for each coefficient.
L1_wt :
The `L1_wt` parameter represents the weight of the L1 penalty term in the model and
should be within the range 0 to 1. A value of 0 corresponds to ridge regression,
while a value of 1 corresponds to lasso regression. However, for obtaining statistics,
`L1_wt` should be set to a value greater than 0. If it is set to 0.0, statsmodels returns
a ridge regularized wrapper without refitting the model, making the statistics unavailable
and breaking the class. Nevertheless, you can set `L1_wt` to a very small value, such as 1e-9,
to obtain close-to-ridge behavior while still obtaining the necessary statistics.
fit_intercept :
Whether to fit an intercept term in the model.
"""
self.family = family
self.link = link
self.alpha = alpha
self.L1_wt = L1_wt
self.model = None
self.result = None
self.fit_intercept = fit_intercept
self.objective = _map_family_link(family=family, link=link)
[docs] def fit(
self,
X: pd.DataFrame,
y: Union[np.ndarray, pd.Series],
sample_weight: Optional[Union[np.ndarray, pd.Series]] = None,
):
"""
Fit the model to the data.
Notes
-----
In statsmodels and GLMs in general, you can use either an offset or a weight to account for
differences in exposure between observations. However, if you choose to use an offset,
you need to pass the number of cases (ncl) instead of the frequency and set the offset to
the logarithm of the exposure due to the log link function. It is recommended to use the frequency
and the weights instead of the offset because this ensures that all models have the same inputs.
To use the frequency and the weights, you can fit the model using the following code:
```python
self.model = sm.GLM(endog=y, exog=X, var_weights=sample_weight, family=self.family)
```
This is equivalent to using the exposure and the log of the exposure internally, which can be done using the following code:
```python
self.model = sm.GLM(endog=y, exog=sm.add_constant(X), exposure=sample_weight, family=sm.families.Poisson())
self.result = self.model.fit()
```
Parameters
----------
X :
array-like, shape (n_samples, n_features)
The input data.
y :
array-like, shape (n_samples,)
The target values.
sample_weight : array-like, shape (n_samples,), optional (default=None)
Sample weights.
Returns
-------
self : object
Returns self.
"""
# see the if kwargs.get("L1_wt", 1) == 0 condition in
# https://www.statsmodels.org/dev/_modules/statsmodels/genmod/generalized_linear_model.html#GLM.fit_regularized
# workaround to get the statistics
if self.alpha == 0.0:
self.alpha = 1e-9
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
X.columns = [f"pred_{i}" for i in range(X.shape[1])]
if self.fit_intercept:
X = sm.add_constant(X)
X = X.rename(columns={"const": "Intercept"})
else:
X = drop_existing_sm_constant_from_df(X)
self.n_features_in_ = X.shape[1]
self.model = sm.GLM(
endog=y,
exog=X,
var_weights=sample_weight,
family=self.objective,
)
self.result = self.model.fit_regularized(
method="elastic_net", alpha=self.alpha, L1_wt=self.L1_wt, refit=True
)
self.coef_ = self.result.params
self.bse_ = self.result.bse
self.deviance_ = self.result.deviance
self.pseudo_rsquared_ = self.result.pseudo_rsquared()
self.aic_ = self.result.aic
self.bic_ = self.result.bic_llf
self.pvalues_ = self.result.pvalues
self.tvalues_ = self.result.tvalues
self.pearson_chi2_ = self.result.pearson_chi2
[docs] def predict(self, X):
"""
Predict using the fitted model.
Parameters
----------
X :
array-like, shape (n_samples, n_features)
The input data.
Returns
-------
y : array-like, shape (n_samples,)
The predicted target values.
Raises
------
ValueError
If the model has not been fit.
"""
if self.model is None:
raise ValueError("Fit the model first.")
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
X.columns = [f"pred_{i}" for i in range(X.shape[1])]
if self.fit_intercept:
X = sm.add_constant(X)
X = X.rename(columns={"const": "Intercept"})
return self.result.predict(X)
[docs] def get_coef(self):
"""
Get the estimated coefficients of the fitted model.
Returns
-------
coef_ : array-like, shape (n_features,)
The estimated coefficients of the fitted model.
"""
return self.coef_
[docs] def score(
self,
X: pd.DataFrame,
y: pd.Series,
sample_weight: Optional[Union[np.ndarray, pd.Series]] = None,
):
"""
Return the deviance of the fitted model.
Parameters
----------
X :
array-like, shape (n_samples, n_features)
The input data.
sample_weight : array-like, shape (n_samples,), optional (default=None)
Sample weights.
Returns
-------
deviance : float
The deviance of the fitted model.
"""
mu = self.objective.link.inverse(self.predict(X))
var_weights = sample_weight if sample_weight is not None else 1.0
return self.objective.deviance(endog=y, mu=mu, var_weights=var_weights)
[docs] def summary(self):
"""
Print a summary of the fitted model.
Returns
-------
summary : str
The summary of the fitted model.
"""
return self.result.summary()
[docs]def weighted_cross_val_score(estimator, X, y, sample_weight=None, cv=5, n_jobs=-1):
"""
Perform cross-validation for a scikit-learn estimator with a score function that requires sample_weight.
Parameters
----------
estimator : estimator
The scikit-learn estimator object.
X : array-like of shape (n_samples, n_features)
The input features.
y : array-like of shape (n_samples,)
The target variable.
sample_weight : array-like of shape (n_samples,), optional
The sample weights for each data point.
cv : int, default=5
The number of cross-validation folds.
n_jobs:
the number of processes
Returns
-------
scores : array of shape (cv,)
The list of scores for each fold.
average_score : float
The average score across all folds.
"""
# logging.info("Starting cross-validation...")
splitter = (
KFold(n_splits=cv) if len(np.unique(y)) > 2 else StratifiedKFold(n_splits=cv)
)
if not hasattr(estimator, "score") or not callable(getattr(estimator, "score")):
raise ValueError(
"The estimator does not have a score method that takes a sample_weight argument."
)
with Parallel(n_jobs=n_jobs) as parallel:
scores = parallel(
delayed(_fit_and_score)(
estimator, X, y, train_index, test_index, sample_weight
)
for train_index, test_index in splitter.split(X)
)
# logging.info("Finished cross-validation.")
return scores
[docs]def _fit_and_score(
estimator: BaseEstimator,
X: Union[pd.DataFrame, np.ndarray],
y: Union[pd.Series, np.ndarray],
train_index: np.ndarray,
test_index: np.ndarray,
sample_weight: Optional[Union[pd.Series, np.ndarray]] = None,
) -> float:
"""
Fit and score an estimator on a specified train-test split.
Parameters
----------
estimator : BaseEstimator
The estimator object implementing the scikit-learn estimator interface.
X : Union[pd.DataFrame, np.ndarray]
The input features, can be either a pandas DataFrame or a numpy array.
y : Union[pd.Series, np.ndarray]
The target values, can be either a pandas Series or a numpy array.
train_index : np.ndarray
Array of indices representing the training data.
test_index : np.ndarray
Array of indices representing the test data.
sample_weight : Optional[Union[pd.Series, np.ndarray]], default=None
Sample weights to be used during training. Can be either a pandas Series or a numpy array.
Returns
-------
float
The score of the estimator on the test data.
Raises
------
ValueError
If the input data is not of the correct format.
"""
# X = X.values if isinstance(X, pd.DataFrame) else X
y = y.values if isinstance(y, pd.Series) else y
X_train, X_test = X.iloc[train_index, :], X.iloc[test_index, :]
y_train, y_test = y[train_index], y[test_index]
if sample_weight is not None:
sample_weight = (
sample_weight.values
if isinstance(sample_weight, pd.Series)
else sample_weight
)
sample_weight_train = sample_weight[train_index]
sample_weight_test = sample_weight[test_index]
estimator.fit(X_train, y_train, sample_weight=sample_weight_train)
score = estimator.score(X_test, y_test, sample_weight=sample_weight_test)
else:
estimator.fit(X_train, y_train)
score = estimator.score(X_test, y_test)
return score
[docs]def grid_search_cv(
X: Union[pd.DataFrame, np.ndarray],
y: Union[pd.Series, np.ndarray],
sample_weight: Optional[Union[pd.Series, np.ndarray]] = None,
n_iterations: int = 10,
family: str = "gaussian",
link: Optional[str] = None,
score: str = "bic",
fit_intercept: bool = True,
n_jobs: int = -1
) -> EnetGLM:
"""
Perform grid search cross-validation for an Elastic Net Generalized Linear Model (EnetGLM).
Parameters
----------
X : Union[pd.DataFrame, np.ndarray]
The input features, can be either a pandas DataFrame or a numpy array.
y : Union[pd.Series, np.ndarray]
The target values, can be either a pandas Series or a numpy array.
sample_weight : Optional[Union[pd.Series, np.ndarray]], default=None
Sample weights to be used during training. Can be either a pandas Series or a numpy array.
n_iterations : int, default=10
Number of iterations for the grid search.
family : str, default="gaussian"
The family of the GLM. Options: "gaussian", "poisson", "gamma", "negativebinomial", "binomial", "tweedie".
link : str, optional
the GLM link function. It can be any of the: "identity", "log", "logit", "probit", "cloglog", "inverse_squared"
score : str, default="bic"
The score to use for model selection. Options: "bic" (Bayesian Information Criterion) or "mean_cv" (mean cross-validation score).
n_jobs:
the number of processes
Returns
-------
EnetGLM
The best estimator found after grid search cross-validation.
Raises
------
ValueError
If the input data is not of the correct format or if an invalid family or score value is provided.
"""
estimator = EnetGLM(family=family, link=link, L1_wt=1.0, fit_intercept=fit_intercept)
# Check if X and y are pandas DataFrames/Series and convert them to numpy arrays if necessary
# X = check_array(X, accept_sparse=True, force_all_finite=False)
y = check_array(y, ensure_2d=False, force_all_finite=False)
if score not in ["bic", "deviance"]:
raise ValueError("Invalid score value. Options are: 'bic' or 'deviance'.")
grid = np.logspace(-3, 3, n_iterations)
param_score = []
for param in grid:
estimator = clone(estimator)
estimator.set_params(
**{
"alpha": param,
"L1_wt": 1.0,
"fit_intercept": fit_intercept,
"family": family,
}
)
if score == "bic":
estimator.fit(X=X, y=y, sample_weight=sample_weight)
param_score.append(estimator.bic_)
else:
scores = weighted_cross_val_score(
estimator, X, y, sample_weight=sample_weight, cv=5, n_jobs=n_jobs
)
param_score.append(np.mean(scores))
# min deviance or min BIC
best_alpha_value = grid[np.argmin(param_score)]
best_estimator = clone(estimator)
best_estimator.set_params(
**{
"alpha": best_alpha_value,
"L1_wt": 1.0,
"fit_intercept": fit_intercept,
"family": family,
}
)
best_estimator.fit(X, y, sample_weight=sample_weight)
return best_estimator
[docs]class LassoFeatureSelection(BaseEstimator, TransformerMixin):
"""
LassoFeatureSelection performs feature selection using GLM Lasso regularization.
Parameters
----------
family : str, (default="gaussian")
The distributional assumption of the model. It can be any of the statsmodels distribution:
"gaussian", "binomial", "poisson", "gamma", "negativebinomial", "tweedie"
link : str, optional
the GLM link function. It can be any of the: "identity", "log", "logit", "probit", "cloglog", "inverse_squared"
n_iterations : int, default=10
Number of iterations for the grid search.
score : str, default="bic"
The score to use for model selection. Options: "bic" (Bayesian Information Criterion) or "mean_cv" (mean cross-validation score).
n_jobs: int, default=-1
the number of processes. -1 means all the processes
Attributes
----------
family : str
The family of the GLM.
n_iterations : int
Number of iterations for the grid search.
best_estimator_ : EnetGLM
The best estimator found after grid search cross-validation.
selected_features_ : ndarray
The selected feature names.
support_ : ndarray
The support of selected features (True for selected, False otherwise).
feature_names_in_ : ndarray
The input feature names.
score : str
The score used for model selection.
n_jobs: int
the number of processes. -1 means all the processes
Methods
-------
fit(X, y=None, sample_weight=None)
Fit the LassoFeatureSelection model and select the best features.
transform(X)
Transform the input data to keep only the selected features.
get_feature_names_out()
Get the names of the selected features.
"""
def __init__(
self,
family: str = "gaussian",
link: Optional[str] = None,
n_iterations: int = 10,
score: str = "bic",
fit_intercept: bool = True,
n_jobs: int = -1
):
self.family = family
self.link = link
self.n_iterations = n_iterations
self.best_estimator_ = None
self.selected_features_ = None
self.support_ = None
self.feature_names_in_ = None
self.score = score
self.fit_intercept = fit_intercept
self.n_jobs = n_jobs
[docs] def fit(
self,
X: Union[pd.DataFrame, np.ndarray],
y: Optional[Union[pd.Series, np.ndarray]] = None,
sample_weight: Optional[Union[pd.Series, np.ndarray]] = None,
):
"""
Fit the LassoFeatureSelection model and select the best features.
Parameters
----------
X : Union[pd.DataFrame, np.ndarray]
The input features, can be either a pandas DataFrame or a numpy array.
y : Optional[Union[pd.Series, np.ndarray]], default=None
The target values, can be either a pandas Series or a numpy array.
sample_weight : Optional[Union[pd.Series, np.ndarray]], default=None
Sample weights to be used during training. Can be either a pandas Series or a numpy array.
Returns
-------
LassoFeatureSelection
The fitted LassoFeatureSelection model.
"""
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
X.columns = [f"pred_{i}" for i in range(X.shape[1])]
if not self.fit_intercept:
X = drop_existing_sm_constant_from_df(X)
self.feature_names_in_ = (
X.columns.insert(0, "Intercept")
if self.fit_intercept and "Intercept" not in X.columns
else X.columns
)
self.best_estimator_ = grid_search_cv(
family=self.family,
link=self.link,
X=X,
y=y,
sample_weight=sample_weight,
n_iterations=self.n_iterations,
score=self.score,
fit_intercept=self.fit_intercept,
n_jobs=self.n_jobs,
)
self.support_ = self.best_estimator_.coef_ != 0
self.selected_features_ = self.feature_names_in_[self.support_]
return self
[docs] def get_feature_names_out(self) -> np.ndarray:
"""
Get the names of the selected features.
Returns
-------
np.ndarray
The names of the selected features.
"""
return self.feature_names_in_[self.support_]
[docs]def drop_existing_sm_constant_from_df(X):
X = X.drop(columns=["Intercept"]) if "Intercept" in X.columns else X
X = X.drop(columns=["const"]) if "const" in X.columns else X
X = X.drop(columns=["intercept"]) if "intercept" in X.columns else X
return X