""" Parallelized Association and Correlation matrix
This module provides parallelized methods for computing associations.
Namely, correlation, correlation ratio, Theil's U, Cramer's V
They are the basis of the MRmr feature selection
"""
import math
import warnings
import matplotlib
import numpy as np
import gc
import pandas as pd
import scipy.stats as ss
import matplotlib.pyplot as plt
import scipy.cluster.hierarchy as sch
import scipy.stats as ss
from mpl_toolkits.axes_grid1 import make_axes_locatable
from sklearn.utils import as_float_array, safe_sqr, safe_mask
from multiprocessing import cpu_count
from itertools import combinations, permutations, product
from pandas.api.types import is_numeric_dtype
from scipy.stats import rankdata
from functools import partial
from .parallel import (
parallel_matrix_entries,
parallel_df,
_compute_series,
_compute_matrix_entries,
)
from .utils import create_dtype_dict
_PRECISION = 1e-13
########################
# Redundancy measures
########################
# For computing the redundancy of all the columns with a given series
# R(y, x_i) for i=1,..., N --> a series (y is a fixed, chosen column)
# the main functions are:
# - the series-series computation (two columns)
# - the closure for applying the latter function to all columns of a dataframe
# - the "series" version, using the closure for computing the redundancy with all the cols of the DF
#
# For computing the redundancy matrix all the cols combinations of columns
# R(x_i, x_j) for i, j=1,..., N --> a data frame (either TRIUL if the measure is symmetric
# or the full matrix if asymmetric)
# - the series-series computation (two columns), same as for series case
# - the function looping over a chunk of combinations
# - the parallelization (sending different chunks to different cores and applying the latter function)
[docs]def symmetric_function(func):
func.is_symmetric = True
return func
[docs]def asymmetric_function(func):
func.is_symmetric = False
return func
[docs]def create_col_combinations(func, selected_cols):
"""
Create column combinations or permutations based on the symmetry of the function.
This function checks if `func` is symmetric. If it is, it creates combinations of `selected_cols`;
otherwise, it creates permutations.
Parameters
----------
func : callable
The function to check for symmetry. Should be decorated with `@symmetric_function`.
selected_cols : list
The columns to be combined or permuted.
Returns
-------
list of tuples
A list of tuples representing column combinations or permutations.
If `func` is symmetric, combinations of `selected_cols` are returned;
otherwise, permutations are returned.
"""
if getattr(func, "is_symmetric", False):
# If the function is symmetric, use combinations
return list(combinations(selected_cols, 2)) if selected_cols else []
else:
# If the function is not symmetric, use permutations
return list(permutations(selected_cols, 2)) if selected_cols else []
##################
# CAT-CAT
##################
[docs]def weighted_conditional_entropy(x, y, sample_weight=None):
"""
Computes the weighted conditional entropy between two categorical predictors.
Parameters
----------
x : pd.Series of shape (n_samples,)
The predictor vector.
y : pd.Series of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
Returns
-------
float
Weighted conditional entropy.
"""
# Handle sample_weight
if sample_weight is None:
sample_weight = np.ones(len(x))
elif np.count_nonzero(sample_weight) == 0:
raise ValueError(
"All elements in sample_weight are zero. Cannot divide by zero."
)
# Integer encoding for categorical data
y_encoded, _ = pd.factorize(y)
x_encoded, _ = pd.factorize(x)
# Total weight
tot_weight = np.sum(sample_weight)
if tot_weight == 0:
return 0
# Grouped weights for y and (x, y)
y_weights = np.bincount(
y_encoded, weights=sample_weight, minlength=len(np.unique(y_encoded))
)
xy_weights = {
level: np.bincount(
y_encoded[x_encoded == level],
weights=sample_weight[x_encoded == level],
minlength=len(np.unique(y_encoded)),
)
for level in np.unique(x_encoded)
}
# Conditional entropy calculation
h_xy = 0.0
for level in xy_weights:
for y_index, xy_weight in enumerate(xy_weights[level]):
p_xy = xy_weight / tot_weight
p_y = y_weights[y_index] / tot_weight
if p_xy != 0:
h_xy += p_xy * math.log(p_y / p_xy, math.e)
return h_xy
[docs]@asymmetric_function
def weighted_theils_u(x, y, sample_weight=None, as_frame=False):
"""
Computes the weighted Theil's U statistic between two categorical predictors.
Parameters
----------
x : pd.Series of shape (n_samples,)
The predictor vector.
y : pd.Series of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
as_frame : bool
Return output as a dataframe or a float.
Returns
-------
pd.DataFrame or float
Predictor names and value of the Theil's U statistic.
"""
if sample_weight is None:
sample_weight = np.ones(len(x))
tot_weight = np.sum(sample_weight)
# Integer encoding
y_encoded, y_unique = pd.factorize(y)
x_encoded, x_unique = pd.factorize(x)
# Extend bincount to cover all categories
y_weights = np.bincount(y_encoded, weights=sample_weight, minlength=len(y_unique))
x_weights = np.bincount(x_encoded, weights=sample_weight, minlength=len(x_unique))
# Entropy calculations
p_x = x_weights / tot_weight
h_x = ss.entropy(p_x)
h_xy = 0.0
for unique_x in np.unique(x_encoded):
x_mask = x_encoded == unique_x
y_sub_weights = np.bincount(
y_encoded[x_mask], weights=sample_weight[x_mask], minlength=len(y_unique)
)
p_xy = y_sub_weights / tot_weight
p_y = y_weights / tot_weight
# Avoid division by zero in log calculation
valid_mask = (p_xy != 0) & (p_y != 0)
h_xy += np.sum(p_xy[valid_mask] * np.log(p_y[valid_mask] / p_xy[valid_mask]))
if h_x == 0:
return 1.0
u = (h_x - h_xy) / h_x
# Check for floating point precision issues
if abs(u) < _PRECISION or abs(u - 1.0) < _PRECISION:
rounded_u = round(u)
warnings.warn(
f"Rounded U = {u} to {rounded_u}. This is probably due to floating point precision issues.",
RuntimeWarning,
)
u = rounded_u
# Return as DataFrame or float
if as_frame:
return pd.DataFrame({"row": [x.name], "col": [y.name], "val": [u]})
else:
return u
[docs]def theils_u_matrix(X, sample_weight=None, n_jobs=1, handle_na="drop"):
"""theils_u_matrix theils_u_matrix computes the weighted Theil's U statistic for
categorical-categorical association. This is an asymmetric coefficient: U(x,y) != U(y,x)
U(x, y) means the uncertainty of x given y: value is on the range of [0,1] -
where 0 means y provides no information about x, and 1 means y provides full information about x
The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default -1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.DataFrame
The Theil's U matrix in a tidy (long) format.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
# Cramer's V only for categorical columns
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
cat_cols = dtypes_dic["cat"]
if cat_cols and (len(cat_cols) >= 2):
# explicitly store the unique 2-permutation of column names
# permutations and not combinations because U is asymmetric
comb_list = [comb for comb in permutations(cat_cols, 2)]
# define the number of cores
n_jobs = (
min(cpu_count(), len(cat_cols))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
# parallelize jobs
theil_u_matrix_entries = partial(
_compute_matrix_entries, func_xyw=weighted_theils_u
)
lst = parallel_matrix_entries(
func=theil_u_matrix_entries,
df=X,
comb_list=comb_list,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
return lst
else:
return None
[docs]def theils_u_series(X, target, sample_weight=None, n_jobs=1, handle_na="drop"):
"""theils_u_series computes the weighted Theil's U statistic for
categorical-categorical association. This is an asymmetric coefficient: U(x,y) != U(y,x)
U(x, y) means the uncertainty of x given y: value is on the range of [0,1] -
where 0 means y provides no information about x, and 1 means y provides full information about x
The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
target : str or int
the predictor name or index with which to compute association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default -1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.Series
The Theil's U series.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
if col_dtypes_dic[target] != "cat":
raise TypeError("the target column is not categorical")
# Cramer's V only for categorical columns
cat_cols = dtypes_dic["cat"]
if cat_cols:
# define the number of cores
n_jobs = (
min(cpu_count(), len(cat_cols))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
# parallelize jobs
_theil_u = partial(_compute_series, func_xyw=weighted_theils_u)
lst = parallel_df(
func=_theil_u,
df=X[cat_cols],
series=X[target],
sample_weight=sample_weight,
n_jobs=n_jobs,
)
return lst
else:
return None
[docs]@symmetric_function
def cramer_v(x, y, sample_weight=None, as_frame=False):
"""
Computes the weighted V statistic of two categorical predictors.
Parameters
----------
x : pd.Series of shape (n_samples,)
The first categorical predictor.
y : pd.Series of shape (n_samples,)
The second categorical predictor, order doesn't matter, symmetrical association.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
as_frame : bool
Return output as a DataFrame or a float.
Returns
-------
pd.DataFrame or float
Single row DataFrame with the predictor names and the statistic value, or the statistic as a float.
"""
if sample_weight is None:
sample_weight = np.ones(len(x))
else:
sample_weight = np.asarray(sample_weight)
if sample_weight.sum() == 0:
raise ValueError("Sum of sample weights cannot be zero.")
weighted_tab = pd.crosstab(x, y, sample_weight, aggfunc=sum).fillna(0)
chi2 = ss.chi2_contingency(weighted_tab, correction=False)[0]
tot_weight = sample_weight.sum()
phi2 = chi2 / tot_weight
r, k = weighted_tab.shape
phi2corr = max(0, phi2 - ((k - 1) * (r - 1)) / (tot_weight - 1))
rcorr = r - ((r - 1) ** 2) / (tot_weight - 1)
kcorr = k - ((k - 1) ** 2) / (tot_weight - 1)
v = np.sqrt(phi2corr / min((kcorr - 1), (rcorr - 1)))
if as_frame:
x_name = x.name if x.name else "var1"
y_name = y.name if y.name else "var2"
return pd.DataFrame(
{"row": [x_name, y_name], "col": [y_name, x_name], "val": [v, v]}
)
else:
return v
[docs]def cramer_v_matrix(X, sample_weight=None, n_jobs=1, handle_na="drop"):
"""cramer_v_matrix computes the weighted Cramer's V statistic for
categorical-categorical association. This is a symmetric coefficient: V(x,y) = V(y,x)
It uses the corrected Cramer's V statistics, itself based on the chi2 contingency
table. The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default 1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.DataFrame
The Cramer's V matrix (lower triangular) in a tidy (long) format.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
# Cramer's V only for categorical columns
# in GLM supposed to be all the columns
cat_cols = dtypes_dic["cat"]
if cat_cols and (len(cat_cols) >= 2):
# explicitly store the unique 2-combinations of column names
comb_list = [comb for comb in combinations(cat_cols, 2)]
# define the number of cores
n_jobs = (
min(cpu_count(), len(cat_cols))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
_cramer_v_matrix_entries = partial(_compute_matrix_entries, func_xyw=cramer_v)
lst = parallel_matrix_entries(
func=_cramer_v_matrix_entries,
df=X,
comb_list=comb_list,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
return lst
else:
return None
[docs]def cramer_v_series(X, target, sample_weight=None, n_jobs=1, handle_na="drop"):
"""cramer_v_series computes the weighted Cramer's V statistic for
categorical-categorical association. This is a symmetric coefficient: V(x,y) = V(y,x)
It uses the corrected Cramer's V statistics, itself based on the chi2 contingency
table. The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
target : str or int
the predictor name or index with which to compute association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default 1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.Series
The Cramer's V series
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
if col_dtypes_dic[target] != "cat":
raise TypeError("the target column is not categorical")
# Cramer's V only for categorical columns
cat_cols = dtypes_dic["cat"]
if cat_cols:
X = X[cat_cols]
# define the number of cores
n_jobs = (
min(cpu_count(), len(cat_cols))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
# parallelize jobs
_cramer_v = partial(_compute_series, func_xyw=cramer_v)
lst = parallel_df(
func=_cramer_v,
df=X[cat_cols],
series=X[target],
sample_weight=sample_weight,
n_jobs=n_jobs,
)
# concatenate the results
# v_df_list = list(chain(*v_df_list))
return lst # pd.concat(lst)
else:
return None
[docs]def _weighted_correlation_ratio(*args):
"""Calculates the Correlation Ratio (sometimes marked by the greek letter Eta)
for categorical-continuous association.
Answers the question - given a continuous value of a measurement, is it
possible to know which category is it associated with?
Value is in the range [0,1], where 0 means a category cannot be determined
by a continuous measurement, and 1 means a category can be determined with
absolute certainty.
Based on the scikit-learn implementation of the unweighted version.
Returns
-------
float
value of the correlation ratio
"""
# Convert to float array and compute weights
args = [(np.asarray(a[0], dtype=float), np.asarray(a[1])) for a in args]
weight_per_class = np.array([a[1].sum() for a in args])
tot_weight = weight_per_class.sum()
# Weighted sum of squares and list of weighted sums
ss_alldata = sum((a[1] * np.square(a[0])).sum(axis=0) for a in args)
sums_args = [np.sum(a[0] * a[1], axis=0) for a in args]
square_of_sums_alldata = sum(sums_args) ** 2
# Total sum of squares and between-classes sum of squares
sstot = ss_alldata - square_of_sums_alldata / tot_weight
ssbn = sum(np.square(s) / w for s, w in zip(sums_args, weight_per_class))
ssbn -= square_of_sums_alldata / tot_weight
# Ensure sstot and ssbn are at least 1D arrays to avoid shape issues
sstot = np.atleast_1d(sstot)
ssbn = np.atleast_1d(ssbn)
# Handle constant features
constant_features_idx = np.where(sstot == 0.0)[0]
if np.any(ssbn) and constant_features_idx.size:
warnings.warn("Features %s are constant." % constant_features_idx, UserWarning)
# Correlation Ratio calculation
etasq = np.divide(ssbn, sstot, out=np.zeros_like(sstot), where=sstot != 0)
return np.sqrt(etasq).ravel()
[docs]@symmetric_function
def correlation_ratio(x, y, sample_weight=None, as_frame=False):
"""Compute the weighted correlation ratio. The association between a continuous predictor (y)
and a categorical predictor (x). It can be weighted.
Parameters
----------
x : pd.Series of shape (n_samples,)
The categorical predictor vector
y : pd.Series of shape (n_samples,)
The continuous predictor
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
as_frame: bool
return output as a dataframe or a float
Returns
-------
float
value of the correlation ratio
"""
if not (isinstance(x, pd.Series) and isinstance(y, pd.Series)):
raise TypeError(
f"Both x and y must be pandas Series. The type of x: {type(x)} and The type of y: {type(y)}"
)
if sample_weight is None:
sample_weight = np.ones_like(y)
# Determine the categorical and continuous variables
if x.dtype in ["category", "object", "bool"]:
categorical, continuous = x, y
elif y.dtype in ["category", "object", "bool"]:
categorical, continuous = y, x
else:
raise TypeError(
"One of the series must be categorical and the other numerical."
)
# Prepare arguments for the weighted correlation ratio calculation
unique_categories = np.unique(categorical)
args = [
(continuous[categorical == category], sample_weight[categorical == category])
for category in unique_categories
]
# Compute the weighted correlation ratio
v = _weighted_correlation_ratio(*args)[0]
# Format the result
if as_frame:
x_name = x.name if x.name else "var1"
y_name = y.name if y.name else "var2"
return pd.DataFrame(
{"row": [x_name, y_name], "col": [y_name, x_name], "val": [v, v]}
)
else:
return v
[docs]def correlation_ratio_matrix(X, sample_weight=None, n_jobs=1, handle_na="drop"):
"""correlation_ratio_matrix computes the weighted Correlation Ratio for
categorical-numerical association. This is a symmetric coefficient: CR(x,y) = CR(y,x)
The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default 1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.DataFrame
The correlation ratio matrix (lower triangular) in a tidy (long) format.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
# Cramer's V only for categorical columns
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
cat_cols = dtypes_dic["cat"]
num_cols = dtypes_dic["num"]
if cat_cols and num_cols:
# explicitly store the unique 2-combinations of column names
# the first one should be the categorical predictor
comb_list = list(product(cat_cols, num_cols))
# define the number of cores
n_jobs = (
min(cpu_count(), len(cat_cols))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
# parallelize jobs
corr_ratio_matrix_entries = partial(
_compute_matrix_entries, func_xyw=correlation_ratio
)
lst = parallel_matrix_entries(
func=corr_ratio_matrix_entries,
df=X,
comb_list=comb_list,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
return lst
else:
return None
[docs]def correlation_ratio_series(X, target, sample_weight=None, n_jobs=1, handle_na="drop"):
"""correlation_ratio_series computes the weighted correlation ration for
categorical-numerical association. This is a symmetric coefficient: CR(x,y) = CR(y,x)
The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format, a series.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
target : str or int
the predictor name or index with which to compute association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default -1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.Series
The Correlation ratio series (lower triangular) in a tidy (long) format.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
if col_dtypes_dic[target] == "cat":
# if the target is categorical, pick only num predictors
pred_list = dtypes_dic["num"]
else:
# if the target is numerical, the 2nd pred should be categorical
pred_list = dtypes_dic["cat"]
if pred_list:
# define the number of cores
n_jobs = (
min(cpu_count(), len(pred_list))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
# parallelize jobs
_corr_ratio = partial(_compute_series, func_xyw=correlation_ratio)
lst = parallel_df(
func=_corr_ratio,
df=X[pred_list],
series=X[target],
sample_weight=sample_weight,
n_jobs=n_jobs,
)
return lst
else:
return None
[docs]def wm(x, w):
"""wm computes the weighted mean
Parameters
----------
x : array-like of shape (n_samples,)
the target array
w : array-like of shape (n_samples,)
the sample weights array
Returns
-------
float
weighted mean
"""
return np.sum(x * w) / np.sum(w)
[docs]def wcov(x, y, w):
"""wcov computes the weighted covariance
Parameters
----------
x : array-like of shape (n_samples,)
the predictor 1 array
y : array-like of shape (n_samples,)
the predictor 2 array
w : array-like of shape (n_samples,)
the sample weights array
Returns
-------
float
weighted covariance
"""
return np.sum(w * (x - wm(x, w)) * (y - wm(y, w))) / np.sum(w)
[docs]def wcorr(x, y, w):
"""wcov computes the weighted Pearson correlation coefficient
Parameters
----------
x : array-like of shape (n_samples,)
the predictor 1 array
y : array-like of shape (n_samples,)
the predictor 2 array
w : array-like of shape (n_samples,)
the sample weights array
Returns
-------
float
weighted correlation coefficient
"""
return wcov(x, y, w) / np.sqrt(wcov(x, x, w) * wcov(y, y, w))
[docs]def wrank(x, w):
"""wrank computes the weighted rank
Parameters
----------
x : array-like of shape (n_samples,)
the target array
w : array-like of shape (n_samples,)
the sample weights array
Returns
-------
float
weighted rank
"""
(unique, arr_inv, counts) = np.unique(
rankdata(x), return_counts=True, return_inverse=True
)
a = np.bincount(arr_inv, w)
return (np.cumsum(a) - a)[arr_inv] + ((counts + 1) / 2 * (a / counts))[arr_inv]
[docs]def wspearman(x, y, w):
"""wcov computes the weighted Spearman correlation coefficient
Parameters
----------
x : array-like of shape (n_samples,)
the perdictor 1 array
y : array-like of shape (n_samples,)
the perdictor 2 array
w : array-like of shape (n_samples,)
the sample weights array
Returns
-------
float
Spearman weighted correlation coefficient
"""
return wcorr(wrank(x, w), wrank(y, w), w)
[docs]@symmetric_function
def weighted_corr(x, y, sample_weight=None, as_frame=False, method="spearman"):
"""weighted_corr computes the weighted correlation coefficient (Pearson or Spearman)
Parameters
----------
x : pd.Series of shape (n_samples,)
The categorical predictor vector
y : pd.Series of shape (n_samples,)
The continuous predictor
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
as_frame: bool
return output as a dataframe or a float
method : str
either "spearman" or "pearson", by default "pearson"
Returns
-------
float or pd.DataFrame
weighted correlation coefficient
"""
if sample_weight is None:
sample_weight = np.ones_like(y)
if method == "pearson":
c = wcorr(x, y, sample_weight)
else:
c = wspearman(x, y, sample_weight)
if as_frame:
x_name = x.name if isinstance(x, pd.Series) else "var"
y_name = y.name if isinstance(y, pd.Series) else "target"
return pd.DataFrame(
{"row": [x_name, y_name], "col": [y_name, x_name], "val": [c, c]}
)
else:
return c
[docs]def wcorr_series(
X, target, sample_weight=None, n_jobs=1, handle_na="drop", method="spearman"
):
"""wcorr_series computes the weighted correlation coefficient (Pearson or Spearman) for
continuous-continuous association. This is an symmetric coefficient: corr(x,y) = corr(y,x)
The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
target : str or int
the predictor name or index with which to compute association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default 1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
method : str
either "spearman" or "pearson", by default "spearman"
Returns
-------
pd.Series
The weighted correlation series.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
if col_dtypes_dic[target] == "cat":
raise TypeError("the target column is categorical")
num_cols = dtypes_dic["num"]
y = X[target]
if num_cols:
_wcorr_method = partial(weighted_corr, method=method)
# parallelize jobs
_wcorr_method_series = partial(_compute_series, func_xyw=_wcorr_method)
return parallel_df(
func=_wcorr_method_series,
df=X[num_cols],
series=y,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
else:
return None
[docs]def wcorr_matrix(X, sample_weight=None, n_jobs=1, handle_na="drop", method="spearman"):
"""wcorr_matrix computes the weighted correlation statistic for
(Pearson or Spearman) for continuous-continuous association.
This is an symmetric coefficient: corr(x,y) = corr(y,x)
The computation is embarrassingly parallel and is distributed on available cores.
Moreover, the statistic is computed for the unique combinations only and returned in a
tidy (long) format.
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default -1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
method : str
either "spearman" or "pearson"
Returns
-------
pd.DataFrame
The Cramer's V matrix (lower triangular) in a tidy (long) format.
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
num_cols = dtypes_dic["num"]
if num_cols:
# explicitely store the unique 2-combinations of column names
comb_list = [comb for comb in combinations(num_cols, 2)]
# define the number of cores
n_jobs = (
min(cpu_count(), len(num_cols))
if n_jobs == -1
else min(cpu_count(), n_jobs)
)
if (n_jobs > 1) or (method != "pearson"):
_wcorr_method = partial(weighted_corr, method=method)
_wcorr_method_entries = partial(
_compute_matrix_entries, func_xyw=_wcorr_method
)
lst = parallel_matrix_entries(
func=_wcorr_method_entries,
df=X,
comb_list=comb_list,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
return lst
else:
return (
matrix_to_xy(weighted_correlation_1cpu(X, sample_weight, handle_na))
.to_frame()
.reset_index()
.rename(columns={"level_0": "row", "level_1": "col", 0: "val"})
)
else:
return None
[docs]def weighted_correlation_1cpu(X, sample_weight=None, handle_na="drop"):
"""weighted_correlation computes the lower triangular weighted correlation matrix
using a single CPU, therefore using common numpy linear algebra
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.DataFrame
the lower triangular weighted correlation matrix in long format
"""
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
# degree of freedom for the second moment estimator
ddof = 1
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
numeric_cols = dtypes_dic["num"]
if numeric_cols:
data = X[numeric_cols]
col_idx = data.columns
data = data.values
sum_weights = sample_weight.sum()
weighted_sum = np.dot(data.T, sample_weight)
weighted_mean = weighted_sum / sum_weights
demeaned = data - weighted_mean
sum_of_squares = np.dot((demeaned**2).T, sample_weight)
weighted_std = np.sqrt(sum_of_squares / (sum_weights - ddof))
weighted_cov = np.dot(sample_weight * demeaned.T, demeaned)
weighted_cov /= sum_weights - ddof
weighted_corcoef = pd.DataFrame(
weighted_cov / weighted_std / weighted_std[:, None],
index=col_idx,
columns=col_idx,
)
return weighted_corcoef
else:
return None
#################################
# association
# cat-cat + cat-cont + cont-cont
#################################
[docs]def association_series(
X,
target,
features=None,
sample_weight=None,
nom_nom_assoc=weighted_theils_u,
num_num_assoc=partial(weighted_corr, method="spearman"),
nom_num_assoc=correlation_ratio,
normalize=False,
n_jobs=1,
handle_na="drop",
):
"""
Computes the association series for different types of predictors.
This function calculates the association between the specified `target` and other predictors in `X`.
It supports different types of associations: nominal-nominal, numerical-numerical, and nominal-numerical.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Predictor dataframe.
target : str or int
The predictor name or index with which to compute the association.
features : list of str, optional
List of features with which to compute the association. If None, all features in X are used.
sample_weight : array-like, shape (n_samples,), optional
The weight vector, by default None.
nom_nom_assoc : callable
Function to compute the nominal-nominal (categorical-categorical) association.
It should take two pd.Series and an optional weight array, and return a single number.
num_num_assoc : callable
Function to compute the numerical-numerical association.
It should take two pd.Series and return a single number.
nom_num_assoc : callable
Function to compute the nominal-numerical association.
It should take two pd.Series and return a single number.
normalize : bool, optional
Whether to normalize the scores or not. If True, scores are normalized to the range [0, 1].
n_jobs : int, optional
The number of cores to use for the computation. The default, -1, uses all available cores.
handle_na : str, optional
How to handle NA values. Options are 'drop', 'fill', and None. The default, 'drop', drops rows with NA values.
Returns
-------
pd.Series
A series with all the association values with the target column, sorted in descending order.
Raises
------
TypeError
If `features` is provided but is not a list of strings.
Examples
--------
>>> import pandas as pd
>>> from sklearn import datasets
>>> iris = datasets.load_iris()
>>> X = pd.DataFrame(iris.data, columns=iris.feature_names)
>>> association_series(X, 'sepal length (cm)', num_num_assoc=my_num_num_function)
Notes
-----
The function dynamically selects the appropriate association method based on the data types
of the target and other predictors. For numerical-numerical associations,
it uses `num_num_assoc`; for nominal-nominal, `nom_nom_assoc`; and for nominal-numerical, `nom_num_assoc`.
"""
# Input validation and preprocessing
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
if features is not None:
if not all(isinstance(f, str) for f in features):
raise TypeError("Features must be a list of strings.")
data = X[features + [target]]
else:
data = X.copy()
# Determine the data types
is_numeric = pd.api.types.is_numeric_dtype
numeric_cols = data.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = data.select_dtypes(exclude=[np.number]).columns.tolist()
# Compute associations based on data types
if all(is_numeric(data[col]) for col in data.columns):
assoc_series = _callable_association_series_fn(
num_num_assoc, data, target, sample_weight, n_jobs, "num-num"
)
elif all(not is_numeric(data[col]) for col in data.columns):
assoc_series = _callable_association_series_fn(
nom_nom_assoc, data, target, sample_weight, n_jobs, "nom-nom"
)
else:
assoc_series = _callable_association_series_fn(
nom_num_assoc, data, target, sample_weight, n_jobs, "nom-num"
)
# Additional association for target-specific types
if is_numeric(data[target]):
assoc_series_complement = _callable_association_series_fn(
num_num_assoc, data, target, sample_weight, n_jobs, "num-num"
)
assoc_series = pd.concat([assoc_series, assoc_series_complement])
elif not is_numeric(data[target]):
assoc_series_complement = _callable_association_series_fn(
nom_nom_assoc, data, target, sample_weight, n_jobs, "nom-nom"
)
assoc_series = pd.concat([assoc_series, assoc_series_complement])
# Normalize if required
if normalize:
assoc_series = (assoc_series - assoc_series.min()) / np.ptp(assoc_series)
return assoc_series.sort_values(ascending=False)
[docs]def association_matrix(
X,
sample_weight=None,
nom_nom_assoc=weighted_theils_u,
num_num_assoc=weighted_corr,
nom_num_assoc=correlation_ratio,
n_jobs=1,
handle_na="drop",
):
"""
Computes the association matrix for continuous-continuous, categorical-continuous,
and categorical-categorical predictors using specified callable functions.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Predictor dataframe.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
nom_nom_assoc : callable
Function to compute the categorical-categorical association.
num_num_assoc : callable
Function to compute the numerical-numerical association.
nom_num_assoc : callable
Function to compute the categorical-numerical association.
n_jobs : int, optional
The number of cores to use for the computation, by default 1.
handle_na : str, optional
How to handle NA values ('drop', 'fill', or None), by default "drop".
Returns
-------
pd.DataFrame
The association matrix.
"""
# Input validation and preprocessing
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
n_cat_cols = len(dtypes_dic["cat"])
n_num_cols = len(dtypes_dic["num"])
df_to_concat = []
# Numerical-Numerical Associations
if n_num_cols >= 2:
w_num_num = _callable_association_matrix_fn(
assoc_fn=num_num_assoc,
X=X,
sample_weight=sample_weight,
n_jobs=n_jobs,
kind="num-num",
)
df_to_concat.append(w_num_num)
# Categorical-Numerical Associations
if n_num_cols >= 1 and n_cat_cols >= 1:
w_nom_num = _callable_association_matrix_fn(
assoc_fn=nom_num_assoc,
X=X,
sample_weight=sample_weight,
n_jobs=n_jobs,
kind="nom-num",
)
df_to_concat.append(w_nom_num)
# Categorical-Categorical Associations
if n_cat_cols >= 2:
w_nom_nom = _callable_association_matrix_fn(
assoc_fn=nom_nom_assoc,
X=X,
sample_weight=sample_weight,
n_jobs=n_jobs,
kind="nom-nom",
)
df_to_concat.append(w_nom_nom)
return (
pd.concat(df_to_concat, ignore_index=True) if df_to_concat else pd.DataFrame()
)
[docs]def _callable_association_series_fn(
assoc_fn, X, target, sample_weight=None, n_jobs=1, kind="nom-nom"
):
"""_callable_association_series_fn private function, utility for computing association series
for a callable custom association
Parameters
----------
assoc_fn : callable
a function which receives two `pd.Series` (and optionally a weight array) and returns a single number
X : array-like of shape (n_samples, n_features)
predictor dataframe
target : str or int
the predictor name or index with which to compute association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default 1
kind : str
kind of association, either 'num-num' or 'nom-nom' or 'nom-num'
Returns
-------
pd.Series
the association series
Raises
------
ValueError
if kind is not 'num-num' or 'nom-nom' or 'nom-num'
"""
X, sample_weight = _check_association_input(X, sample_weight, handle_na="drop")
# Validate 'kind' parameter
valid_kinds = ["num-num", "nom-nom", "nom-num"]
if kind not in valid_kinds:
raise ValueError(f"kind must be one of {valid_kinds}")
# Create dtype dictionaries
col_dtypes_dic = create_dtype_dict(X)
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
# Determine predictor list based on 'kind'
if kind in ["nom-nom", "nom-num"]:
if kind == "nom-nom" and col_dtypes_dic[target] != "cat":
raise TypeError(
"Target column is not categorical for 'nom-nom' association"
)
pred_list = (
dtypes_dic["cat"]
if kind == "nom-nom"
else dtypes_dic["num"]
if col_dtypes_dic[target] == "cat"
else dtypes_dic["cat"]
)
else: # kind == 'num-num'
pred_list = dtypes_dic["num"]
# Return None if no predictors are available
if not pred_list:
return None
# Define the number of cores
n_jobs = (
min(cpu_count(), len(pred_list)) if n_jobs == -1 else min(cpu_count(), n_jobs)
)
# Setup parallel computation
_assoc_fn = partial(_compute_series, func_xyw=assoc_fn)
return parallel_df(
func=_assoc_fn,
df=X[pred_list],
series=X[target],
sample_weight=sample_weight,
n_jobs=n_jobs,
)
[docs]def _callable_association_matrix_fn(
assoc_fn, X, sample_weight=None, n_jobs=1, kind="nom-nom", cols_comb=None
):
"""_callable_association_matrix_fn private function, utility for computing association matrix
for a callable custom association
Parameters
----------
assoc_fn : callable
a function which receives two `pd.Series` (and optionally a weight array) and returns a single number
X : array-like of shape (n_samples, n_features)
predictor dataframe
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default -1
kind : str
kind of association, either 'num-num' or 'nom-nom' or 'nom-num'
cols_comb : list of 2-uple of str, optional
combination of column names (list of 2-uples of strings)
Returns
-------
pd.DataFrame
the association matrix
"""
# Validate 'kind' parameter
valid_kinds = ["num-num", "nom-nom", "nom-num"]
if kind not in valid_kinds:
raise ValueError(f"kind must be one of {valid_kinds}")
# Create dtype dictionaries
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
# Determine column combinations based on 'kind' and 'cols_comb'
if cols_comb is None:
if kind == "num-num":
selected_cols = dtypes_dic["num"]
cols_comb = create_col_combinations(assoc_fn, selected_cols)
elif kind == "nom-nom":
selected_cols = dtypes_dic["cat"]
cols_comb = create_col_combinations(assoc_fn, selected_cols)
elif kind == "nom-num":
# cols_comb = create_col_combinations(assoc_fn, selected_cols)
cols_comb = list(product(dtypes_dic["cat"], dtypes_dic["num"]))
# Return None if no column combinations are available
if not cols_comb:
return None
# Define the number of cores
n_jobs = (
min(cpu_count(), len(cols_comb)) if n_jobs == -1 else min(cpu_count(), n_jobs)
)
# Setup parallel computation
_assoc_fn = partial(_compute_matrix_entries, func_xyw=assoc_fn)
return parallel_matrix_entries(
func=_assoc_fn,
df=X,
comb_list=cols_comb,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
################################
# Association predictor-target
################################
[docs]def f_oneway_weighted(*args):
"""
Calculate the weighted F-statistic for one-way ANOVA (continuous target, categorical predictor).
Parameters
----------
X : array-like of shape (n_samples,)
The predictor dataframe.
y : array-like of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
Returns
-------
float
The value of the F-statistic.
Notes
-----
The F-statistic is calculated as:
.. math::
F(rf) = \\frac{\\sum_i (\\bar{Y}_{i \\bullet} - \\bar{Y})^2 / (K-1)}{\\sum_i \\sum_k (\\bar{Y}_{ij} - \\bar{Y}_{i\\bullet})^2 / (N - K)}
"""
# how many levels (predictor)
n_classes = len(args)
# convert to float 2-tuple array
args = [as_float_array(a) for a in args]
# compute the total weight per level
weight_per_class = np.array([a[1].sum() for a in args])
# total weight
tot_weight = np.sum(weight_per_class)
# weighted sum of squares
ss_alldata = sum((a[1] * safe_sqr(a[0])).sum(axis=0) for a in args)
# list of weighted sums
sums_args = [np.asarray((a[0] * a[1]).sum(axis=0)) for a in args]
square_of_sums_alldata = sum(sums_args) ** 2
square_of_sums_args = [s**2 for s in sums_args]
sstot = ss_alldata - square_of_sums_alldata / float(tot_weight)
ssbn = 0.0
for k, _ in enumerate(args):
ssbn += square_of_sums_args[k] / weight_per_class[k]
ssbn -= square_of_sums_alldata / float(tot_weight)
sswn = sstot - ssbn
dfbn = n_classes - 1
dfwn = tot_weight - n_classes
msb = ssbn / float(dfbn)
msw = sswn / float(dfwn)
constant_features_idx = np.where(msw == 0.0)[0]
if np.nonzero(msb)[0].size != msb.size and constant_features_idx.size:
warnings.warn("Features %s are constant." % constant_features_idx, UserWarning)
f = msb / msw
# flatten matrix to vector in sparse case
f = np.asarray(f).ravel()
return f
[docs]def f_cat_regression(x, y, sample_weight=None, as_frame=False):
"""f_cat_regression computes the weighted ANOVA F-value for the provided sample.
(continuous target, categorical predictor)
Parameters
----------
x : pd.Series of shape (n_samples,)
The predictor vector, the first categorical predictor
y : pd.Series of shape (n_samples,)
second categorical predictor, order doesn't matter, symmetrical association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
as_frame: bool
return output as a dataframe or a float
Returns
-------
float
value of the F-statistic
"""
if sample_weight is None:
sample_weight = np.ones_like(y)
# one 2-uple per level of the categorical feature x
args = [
(
y[safe_mask(y, x == k)],
sample_weight[safe_mask(sample_weight, x == k)],
)
for k in np.unique(x)
]
if as_frame:
x_name = x.name if isinstance(x, pd.Series) else "var"
y_name = y.name if isinstance(y, pd.Series) else "target"
return pd.DataFrame(
{"row": x_name, "col": y_name, "val": f_oneway_weighted(*args)[0]},
index=[0],
)
else:
return f_oneway_weighted(*args)[0]
[docs]def f_cat_regression_parallel(X, y, sample_weight=None, n_jobs=1, handle_na="drop"):
"""f_cat_regression_parallel computes the weighted ANOVA F-value for the provided categorical predictors
using parallelization of the code (continuous target, categorical predictor).
Parameters
----------
X : array-like of shape (n_samples, n_features)
predictor dataframe
y : array-like of shape (n_samples,)
The target vector
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default 1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.Series
the value of the F-statistic for each predictor
"""
# Cramer's V only for categorical columns
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
cat_cols = dtypes_dic["cat"]
if not isinstance(y, pd.Series):
y = pd.Series(y)
y.name = "target"
target = y.name
X = X.copy()
X[target] = y.values
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
y = X[target].copy()
# Ensure 'target' is a valid column in X
if target in X.columns:
X = X.drop(columns=[target])
# define the number of cores
n_jobs = min(cpu_count(), X.shape[1]) if n_jobs == -1 else min(cpu_count(), n_jobs)
# parallelize jobs
_f_stat_cat = partial(_compute_series, func_xyw=f_cat_regression)
return parallel_df(
func=_f_stat_cat,
df=X[cat_cols],
series=y,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
[docs]def f_cont_regression_parallel(
X, y, sample_weight=None, n_jobs=-1, force_finite=True, handle_na="drop"
):
"""Univariate linear regression tests returning F-statistic.
Quick linear model for testing the effect of a single regressor,
sequentially for many regressors.
This is done in 2 steps:
1. The cross-correlation between each regressor and the target is computed using:
E[(X[:, i] - mean(X[:, i])) * (y - mean(y))] / (std(X[:, i]) * std(y))
2. It is converted to an F score ranks features in the same order if all the features
are positively correlated with the target.
Note that it is therefore recommended as a feature selection criterion to identify
potentially predictive features for a downstream classifier, irrespective of
the sign of the association with the target variable.
Parameters
----------
X : array-like of shape (n_samples, n_features)
The predictor dataframe.
y : array-like of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
n_jobs : int, optional
The number of cores to use for the computation, by default -1.
handle_na : str, optional
Either drop rows with NaN, fill NaN with 0, or do nothing, by default "drop".
force_finite : bool, optional
Whether or not to force the F-statistics and associated p-values to
be finite. There are two cases where the F-statistic is expected to not
be finite:
- when the target `y` or some features in `X` are constant. In this
case, the Pearson's R correlation is not defined leading to obtain
`np.nan` values in the F-statistic and p-value. When
`force_finite=True`, the F-statistic is set to `0.0` and the
associated p-value is set to `1.0`.
- when a feature in `X` is perfectly correlated (or
anti-correlated) with the target `y`. In this case, the F-statistic
is expected to be `np.inf`. When `force_finite=True`, the F-statistic
is set to `np.finfo(dtype).max`.
Returns
-------
f_statistic : array-like of shape (n_features,)
F-statistic for each feature.
"""
if not isinstance(y, pd.Series):
y = pd.Series(y)
y.name = "target"
target = y.name
X = X.copy()
X[target] = y.values
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
correlation_coefficient = wcorr_series(X, target, sample_weight, n_jobs, handle_na)
deg_of_freedom = y.size - 2
corr_coef_squared = correlation_coefficient**2
with np.errstate(divide="ignore", invalid="ignore"):
f_statistic = corr_coef_squared / (1 - corr_coef_squared) * deg_of_freedom
if force_finite and not np.isfinite(f_statistic).all():
# case where there is a perfect (anti-)correlation
# f-statistics can be set to the maximum and p-values to zero
mask_inf = np.isinf(f_statistic)
f_statistic[mask_inf] = np.finfo(f_statistic.dtype).max
# case where the target or some features are constant
# f-statistics would be minimum and thus p-values large
mask_nan = np.isnan(f_statistic)
f_statistic[mask_nan] = 0.0
return f_statistic.drop(labels=[target]).sort_values(ascending=False)
[docs]def f_stat_regression_parallel(
X, y, sample_weight=None, n_jobs=-1, force_finite=True, handle_na="drop"
):
"""
Compute the weighted explained variance for the provided categorical and numerical predictors using parallelization.
Parameters
----------
X : array-like of shape (n_samples, n_features)
The predictor dataframe.
y : array-like of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
n_jobs : int, optional
The number of cores to use for the computation, by default -1.
handle_na : str, optional
Either drop rows with NA, fill NA with 0, or do nothing, by default "drop".
force_finite : bool, optional
Whether or not to force the F-statistics and associated p-values to be finite.
There are two cases where the F-statistic is expected to not be finite:
- When the target `y` or some features in `X` are constant. In this case,
the Pearson's R correlation is not defined leading to obtain `np.nan`
values in the F-statistic and p-value. When `force_finite=True`, the
F-statistic is set to `0.0` and the associated p-value is set to `1.0`.
- When a feature in `X` is perfectly correlated (or anti-correlated)
with the target `y`. In this case, the F-statistic is expected to be `np.inf`.
When `force_finite=True`, the F-statistic is set to `np.finfo(dtype).max`.
Returns
-------
pd.Series
The value of the F-statistic for each predictor.
"""
f_stat_cont_series = f_cont_regression_parallel(
X,
y,
sample_weight,
n_jobs,
force_finite=force_finite,
handle_na=handle_na,
)
f_stat_cat_series = f_cat_regression_parallel(
X, y, sample_weight, n_jobs, handle_na=handle_na
)
# normalize the scores
# correlation coefficient varies in the range [-1, 1]
# correlation ratio varies in the range [0, 1]
# Cramer's V varies in the range [0, 1]
# Theil's U varies in the range [0, 1]
# F-statistic varies in the range [0, +inf] but both kind of F stat are not necessary on the same scale
# in order to have a similar scale and compare both kind, one can studentize them
if X.shape[1] > 1:
f_stat_cont_series = (
f_stat_cont_series - f_stat_cont_series.mean()
) / f_stat_cont_series.std()
f_stat_cat_series = (
f_stat_cat_series - f_stat_cat_series.mean()
) / f_stat_cat_series.std()
return pd.concat([f_stat_cont_series, f_stat_cat_series]).sort_values(
ascending=False
)
[docs]def f_cont_classification(x, y, sample_weight=None, as_frame=False):
"""f_cont_classification computes the weighted ANOVA F-value for the provided sample.
Categorical target, continuous predictor.
Parameters
----------
x : pd.Series of shape (n_samples,)
The predictor vector, the first categorical predictor
y : pd.Series of shape (n_samples,)
second categorical predictor, order doesn't matter, symmetrical association
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
as_frame: bool
return output as a dataframe or a float
Returns
-------
float :
value of the F-statistic
"""
if sample_weight is None:
sample_weight = np.ones_like(y)
# one 2-uple per level of the categorical target y, continuous predictor x
args = [
(
x[safe_mask(x, y == k)],
sample_weight[safe_mask(sample_weight, y == k)],
)
for k in np.unique(y)
]
if as_frame:
x_name = x.name if isinstance(x, pd.Series) else "var"
y_name = y.name if isinstance(y, pd.Series) else "target"
return pd.DataFrame(
{"row": x_name, "col": y_name, "val": f_oneway_weighted(*args)[0]},
index=[0],
)
else:
return f_oneway_weighted(*args)[0]
[docs]def f_cont_classification_parallel(
X, y, sample_weight=None, n_jobs=-1, handle_na="drop"
):
"""f_cont_classification_parallel computes the weighted ANOVA F-value
for the provided categorical predictors using parallelization of the code.
Categorical target, continuous predictor.
Parameters
----------
X : array-like of shape (n_samples, n_features)
The set of regressors that will be tested sequentially
y : array-like of shape (n_samples,)
The target vector
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None
n_jobs : int, optional
the number of cores to use for the computation, by default -1
handle_na : str, optional
either drop rows with na, fill na with 0 or do nothing, by default "drop"
Returns
-------
pd.Series
the value of the F-statistic for each predictor
"""
dtypes_dic = create_dtype_dict(X, dic_keys="dtypes")
num_cols = dtypes_dic["num"]
if not isinstance(y, pd.Series):
y = pd.Series(y)
y.name = "target"
target = y.name
X = X.copy()
X[target] = y.values
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
y = X[target].copy()
X = X.drop(target, axis=1)
# define the number of cores
n_jobs = min(cpu_count(), X.shape[1]) if n_jobs == -1 else min(cpu_count(), n_jobs)
# parallelize jobs
_f_stat_cont_clf = partial(_compute_series, func_xyw=f_cont_classification)
return parallel_df(
func=_f_stat_cont_clf,
df=X[num_cols],
series=y,
sample_weight=sample_weight,
n_jobs=n_jobs,
)
[docs]def f_cat_classification_parallel(
X,
y,
sample_weight=None,
n_jobs=-1,
force_finite=True,
handle_na="drop",
):
"""
Univariate information dependence.
It ranks features in the same order if all the features are positively correlated with the target.
Note that it is therefore recommended as a feature selection criterion to identify
potentially predictive features for a downstream classifier, irrespective of
the sign of the association with the target variable.
Parameters
----------
X : array-like of shape (n_samples, n_features)
The predictor dataframe.
y : array-like of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
n_jobs : int, optional
The number of cores to use for the computation, by default -1.
handle_na : str, optional
Either drop rows with NaN, fill NaN with 0, or do nothing, by default "drop".
force_finite : bool, optional
Whether or not to force the F-statistics and associated p-values to
be finite. There are two cases where the F-statistic is expected to not
be finite:
- when the target `y` or some features in `X` are constant. In this
case, the Pearson's R correlation is not defined leading to obtain
`np.nan` values in the F-statistic and p-value. When
`force_finite=True`, the F-statistic is set to `0.0` and the
associated p-value is set to `1.0`.
- when a feature in `X` is perfectly correlated (or
anti-correlated) with the target `y`. In this case, the F-statistic
is expected to be `np.inf`. When `force_finite=True`, the F-statistic
is set to `np.finfo(dtype).max`.
Returns
-------
f_statistic : array-like of shape (n_features,)
F-statistic for each feature.
"""
if not isinstance(y, pd.Series):
y = pd.Series(y)
y.name = "target"
target = y.name
X = X.copy()
X[target] = y.values
# sanity checks
X, sample_weight = _check_association_input(X, sample_weight, handle_na)
deg_of_freedom = y.size - 2
theils_u_coef = theils_u_series(X, target, sample_weight, n_jobs, handle_na)
theils_u_coef_squared = theils_u_coef**2
with np.errstate(divide="ignore", invalid="ignore"):
f_statistic = (
theils_u_coef_squared / (1 - theils_u_coef_squared) * deg_of_freedom
)
if force_finite and not np.isfinite(f_statistic).all():
# case where there is a perfect (anti-)correlation
# f-statistics can be set to the maximum and p-values to zero
mask_inf = np.isinf(f_statistic)
f_statistic[mask_inf] = np.finfo(f_statistic.dtype).max
# case where the target or some features are constant
# f-statistics would be minimum and thus p-values large
mask_nan = np.isnan(f_statistic)
f_statistic[mask_nan] = 0.0
return f_statistic.drop(labels=[target]).sort_values(ascending=False)
[docs]def f_stat_classification_parallel(
X, y, sample_weight=None, n_jobs=1, force_finite=True, handle_na="drop"
):
"""
Compute the weighted ANOVA F-value for the provided categorical and numerical predictors using parallelization.
Parameters
----------
X : array-like of shape (n_samples, n_features)
The predictor dataframe.
y : array-like of shape (n_samples,)
The target vector.
sample_weight : array-like of shape (n_samples,), optional
The weight vector, by default None.
n_jobs : int, optional
The number of cores to use for the computation, by default 1.
handle_na : str, optional
Either drop rows with NA, fill NA with 0, or do nothing, by default "drop".
force_finite : bool, optional
Whether or not to force the F-statistics and associated p-values to be finite.
There are two cases where the F-statistic is expected to not be finite:
- When the target `y` or some features in `X` are constant. In this case,
the Pearson's R correlation is not defined leading to obtain `np.nan`
values in the F-statistic and p-value. When `force_finite=True`, the
F-statistic is set to `0.0` and the associated p-value is set to `1.0`.
- When a feature in `X` is perfectly correlated (or anti-correlated)
with the target `y`. In this case, the F-statistic is expected to be `np.inf`.
When `force_finite=True`, the F-statistic is set to `np.finfo(dtype).max`.
Returns
-------
pd.Series
The value of the F-statistic for each predictor.
"""
f_stat_cont_series = f_cont_classification_parallel(
X, y, sample_weight, n_jobs, handle_na=handle_na
)
f_stat_cat_series = f_cat_classification_parallel(
X,
y,
sample_weight,
n_jobs,
handle_na=handle_na,
force_finite=force_finite,
)
# normalize the scores
# correlation ratio varies in the range [0, 1]
# Cramer's V varies in the range [0, 1]
# Theil's U varies in the range [0, 1]
# F-statistic varies in the range [0, +inf]
# Both kind of F stat are not necessarily on the same scale
# one can studentize them
if X.shape[1] > 1:
f_stat_cont_series = (
f_stat_cont_series - f_stat_cont_series.mean()
) / f_stat_cont_series.std()
f_stat_cat_series = (
f_stat_cat_series - f_stat_cat_series.mean()
) / f_stat_cat_series.std()
return pd.concat([f_stat_cont_series, f_stat_cat_series]).sort_values(
ascending=False
)
############
# Utilities
############
[docs]def is_list_of_str(str_list):
"""Raise an exception if ``str_list`` is not a list of strings
Parameters
----------
str_list : list
to list to be tested
Raises
------
TypeError
if ``str_list`` is not a ``list[str]``
"""
if str_list is not None:
if not (
isinstance(str_list, list) and all(isinstance(s, str) for s in str_list)
):
return False
else:
return True
[docs]def matrix_to_xy(df, columns=None, reset_index=False):
"""matrix_to_xy wide to long format of the association matrix
Parameters
----------
df : pd.DataFrame
the wide format of the association matrix
columns : list of str, optional
list of column names, by default None
reset_index : bool, optional
wether to reset_index or not, by default False
Returns
-------
pd.DataFrame
the long format of the association matrix
"""
bool_index = np.tril(np.ones(df.shape), 0).astype(bool)
xy = (
df.where(bool_index).stack().reset_index()
if reset_index
else df.where(bool_index).stack()
)
if reset_index:
xy.columns = columns or ["row", "col", "val"]
return xy
[docs]def xy_to_matrix(xy):
"""xy_to_matrix long to wide format of the association matrix
Parameters
----------
xy : pd.DataFrame
the long format of the association matrix, 3 columns.
Returns
-------
pd.DataFrame
"""
xy = xy.pivot(index="row", columns="col").fillna(0)
xy.columns = xy.columns.droplevel(0)
return xy.rename_axis(None, axis=1).rename_axis(None, axis=0)
###############
# visualization
###############
[docs]def cluster_sq_matrix(sq_matrix, method="ward"):
"""
Apply agglomerative clustering to sort a square correlation matrix.
Parameters
----------
sq_matrix : pd.DataFrame
A square correlation matrix.
method : str, optional
The linkage method, by default "ward".
Returns
-------
pd.DataFrame
A sorted square matrix.
Example
-------
>>> from some_module import association_matrix, cluster_sq_matrix
>>> assoc = association_matrix(iris_df, plot=False)
>>> assoc_clustered = cluster_sq_matrix(assoc, method="complete")
"""
d = sch.distance.pdist(sq_matrix.values)
L = sch.linkage(d, method=method)
ind = sch.fcluster(L, 0.5 * d.max(), "distance")
columns = [sq_matrix.columns.tolist()[i] for i in list((np.argsort(ind)))]
sq_matrix = sq_matrix.reindex(columns, axis=1)
sq_matrix = sq_matrix.reindex(columns, axis=0)
return sq_matrix
[docs]def heatmap(
data, row_labels, col_labels, ax=None, cbar_kw=None, cbarlabel="", **kwargs
):
"""heatmap Create a heatmap from a numpy array and two lists of labels.
Parameters
----------
data : array-like of shape (M, N)
matrix to plot
row_labels : array-like of shape (M,)
labels for the rows
col_labels : array-like of shape (N,)
labels for the columns
ax : matplotlib.axes.Axes, optional
A `matplotlib.axes.Axes` instance to which the heatmap is plotted. If
not provided, use current axes or create a new one, by default None
cbar_kw : dict, optional
A dictionary with arguments to `matplotlib.Figure.colorbar`, by default None
cbarlabel : str, optional
The label for the colorbar, by default ""
kwargs : dict, optional
All other arguments are forwarded to `imshow`.
Returns
-------
tuple
imgshow and cbar objects
"""
if ax is None:
ax = plt.gca()
if cbar_kw is None:
cbar_kw = {}
# Plot the heatmap
im = ax.imshow(data, **kwargs)
# Create colorbar
divider = make_axes_locatable(ax)
ax_cb = divider.append_axes("right", size="5%", pad=0.05)
fig = ax.get_figure()
fig.add_axes(ax_cb)
cbar = ax.figure.colorbar(im, cax=ax_cb, **cbar_kw)
cbar.ax.set_ylabel(cbarlabel, rotation=-90, va="bottom")
# Show all ticks and label them with the respective list entries.
ax.set_xticks(np.arange(data.shape[1]), labels=col_labels)
ax.set_yticks(np.arange(data.shape[0]), labels=row_labels)
# Let the horizontal axes labeling appear on top.
ax.tick_params(top=False, bottom=True, labeltop=False, labelbottom=True)
# Rotate the tick labels and set their alignment.
plt.setp(ax.get_xticklabels(), rotation=90, ha="right", rotation_mode="anchor")
# Turn spines off and create white grid.
ax.spines[:].set_visible(False)
ax.set_xticks(np.arange(data.shape[1] + 1) - 0.5, minor=True)
ax.set_yticks(np.arange(data.shape[0] + 1) - 0.5, minor=True)
ax.grid(which="minor", color="w", linestyle="-", linewidth=2)
ax.tick_params(which="minor", bottom=False, left=False)
return im, cbar
[docs]def annotate_heatmap(
im,
data=None,
valfmt="{x:.2f}",
textcolors=("black", "white"),
threshold=None,
**textkw,
):
"""annotate_heatmap annotates a heatmap
Parameters
----------
im : matplotlib.axes.Axes
The AxesImage to be labeled
data : array-like of shape (M, N), optional
data to illustrate, if none is provided the function retrieves
the array of the mlp obkect, by default None
valfmt : str, optional
annotation formating, by default "{x:.2f}"
textcolors : tuple, optional
A pair of colors. The first is used for values below a threshold,
the second for those above, by default ("black", "white")
threshold : float, optional
Value in data units according to which the colors from textcolors are
applied. If None (the default) uses the middle of the colormap as
separation, by default None
textkw : dict, optional
All other arguments are forwarded to mpl annotation.
Returns
-------
_type_
_description_
"""
if not isinstance(data, (list, np.ndarray)):
data = im.get_array()
# Normalize the threshold to the images color range.
if threshold is not None:
threshold = im.norm(threshold)
else:
threshold = im.norm(data.max()) / 2.0
# Set default alignment to center, but allow it to be
# overwritten by textkw.
kw = dict(horizontalalignment="center", verticalalignment="center")
kw.update(textkw)
# Get the formatter in case a string is supplied
if isinstance(valfmt, str):
valfmt = matplotlib.ticker.StrMethodFormatter(valfmt)
# Loop over the data and create a `Text` for each "pixel".
# Change the text's color depending on the data.
texts = []
for i in range(data.shape[0]):
for j in range(data.shape[1]):
kw.update(color=textcolors[int(im.norm(data[i, j]) > threshold)])
text = im.axes.text(j, i, valfmt(data[i, j], None), **kw)
texts.append(text)
return texts
[docs]def plot_association_matrix(
assoc_mat,
suffix_dic=None,
ax=None,
cmap="PuOr",
cbarlabel=None,
figsize=None,
show=True,
cbar_kw=None,
imgshow_kw=None,
annotate=False,
):
"""plot_association_matrix renders the sorted associations/correlation matrix.
The sorting is done using hierarchical clustering,
very like in seaborn or other packages.
Categorical(nom): uncertainty coefficient & correlation ratio from 0 to 1.
The uncertainty coefficient is asymmetrical, (approximating how much the elements on the
left PROVIDE INFORMATION on elements in the row). Continuous(con): symmetrical numerical
correlations (Spearman's) from -1 to 1
Parameters
----------
assoc_mat : pd.DataFrame
the square association frame
suffix_dic : Dict[str, str], optional
dictionary of data type for adding suffixes to column names
in the plotting utility for association matrix, by default None
ax : matplotlib.axes.Axes, optional
_description_, by default None
cmap : str, optional
the colormap. Please use a scientific colormap. See the ``scicomap`` package, by default "PuOr"
cbarlabel : str, optional
the colorbar label, by default None
figsize : Tuple[float, float], optional
figure size in inches, by default None
show : bool, optional
Whether or not to display the figure, by default True
cbar_kw : Dict, optional
colorbar kwargs, by default None
imgshow_kw : Dict, optional
imgshow kwargs, by default None
annotate : bool
Whether to annotate or not the colormap
Returns
-------
matplotlib.figure and matplotlib.axes.Axes
the figure and the axes
"""
# default size if None
if figsize is None:
ncol = len(assoc_mat)
figsize = (ncol / 2.5, ncol / 2.5)
# provide default to the figure
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
else:
fig = ax.get_figure()
assoc_mat = cluster_sq_matrix(assoc_mat)
# provide default to imshow
if imgshow_kw is None:
imgshow_kw = {"vmin": -1, "vmax": 1}
# provide default to the colorbar
if cbar_kw is None:
cbar_kw = {"ticks": [-1, -0.5, 0, 0.5, 1]}
# rename the columns for keeping track of num vs cat columns
if suffix_dic is not None:
rename_dic = {c: f"{c}_{suffix_dic[c]}" for c in assoc_mat.columns}
assoc_mat = assoc_mat.rename(columns=rename_dic)
assoc_mat = assoc_mat.rename(index=rename_dic)
im, cbar = heatmap(
assoc_mat.values,
assoc_mat.columns,
assoc_mat.columns,
ax=ax,
cmap=cmap,
cbarlabel=cbarlabel,
cbar_kw=cbar_kw,
**imgshow_kw,
)
if annotate:
texts = annotate_heatmap(im, valfmt="{x:.1f}", textcolors=("white", "black"))
fig.tight_layout()
if show:
plt.show()
return fig, ax
[docs]def plot_association_matrix_int(
assoc_mat, suffix_dic=None, cmap="PuOr", figsize=(800, 600), cluster_matrix=True
):
"""Plot the interactive sorted associations/correlation matrix.
The sorting is done using hierarchical clustering,
very like in seaborn or other packages.
Categorical(nom): uncertainty coefficient & correlation ratio from 0 to 1.
The uncertainty coefficient is assymmetrical, (approximating how much the elements on the
left PROVIDE INFORMATION on elements in the row). Continuous(con): symmetrical numerical
correlations (Spearman's) from -1 to 1
Parameters
----------
assoc_mat : pd.DataFrame
the square association frame
suffix_dic : Dict[str, str], optional
dictionary of data type for adding suffixes to column names
in the plotting utility for association matrix, by default None
cmap : str, optional
the colormap. Please use a scientific colormap. See the ``scicomap`` package, by default "PuOr"
figsize : Tuple[float, float], optional
figure size in inches, by default None
cluster_matrix : bool
whether or not to cluster the square matrix, by default True
Returns
-------
panel.Column
the panel object
"""
try:
import holoviews as hv
except ImportError:
raise ImportError(
"Holoviews is not installed. Please install it using 'pip install holoviews'."
)
try:
import panel as pn
except ImportError:
raise ImportError(
"Panel is not installed. Please install it using 'pip install panel'."
)
cmap = cmap if cmap is not None else "coolwarm"
# rename the columns for keeping track of num vs cat columns
if suffix_dic is not None:
rename_dic = {c: f"{c}_{suffix_dic[c]}" for c in assoc_mat.columns}
assoc_mat = assoc_mat.rename(columns=rename_dic)
assoc_mat = assoc_mat.rename(index=rename_dic)
if cluster_matrix:
assoc_mat = cluster_sq_matrix(assoc_mat)
heatmap = hv.HeatMap((assoc_mat.columns, assoc_mat.index, assoc_mat)).redim.range(
z=(-1, 1)
)
heatmap.opts(
tools=["tap", "hover"],
height=figsize[1],
width=figsize[0],
toolbar="left",
colorbar=True,
cmap=cmap,
fontsize={"title": 12, "ticks": 12, "minor_ticks": 12},
xrotation=90,
invert_xaxis=False,
invert_yaxis=True,
xlabel="",
ylabel="",
)
title_str = "**Continuous (con) and Categorical (nom) Associations **"
sub_title_str = (
"*Categorical(nom): uncertainty coefficient & correlation ratio from 0 to 1. The uncertainty "
"coefficient is asymmetrical, (approximating how much the elements on the "
"left PROVIDE INFORMATION on elements in the row). Continuous(con): symmetrical numerical "
"correlations (Spearman's) from -1 to 1*"
)
panel_layout = pn.Column(
pn.pane.Markdown(title_str, align="start", style={"color": "#575757"}), # bold
pn.pane.Markdown(
sub_title_str, align="start", style={"color": "#575757"}
), # italic
heatmap,
background="#ebebeb",
)
gc.enable()
del assoc_mat
gc.collect()
return panel_layout