Source code for arfs.sampling

"""This module provide methods for sampling large datasets for reducing the running time
"""
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from collections import Counter
from sklearn.cluster import AgglomerativeClustering
from sklearn.ensemble import IsolationForest
from scipy.stats import ks_2samp
from .utils import is_list_of_str, is_list_of_bool, is_list_of_int


[docs]def sample(df, n=1000, sample_weight=None, method="gower"): """Sampling rows from a dataframe when random sampling is not enough for reducing the number of rows. The strategies are either using hierarchical clustering based on the Gower distance or using isolation forest for identifying the most similar elements. For the clustering algorithm, clusters are determined using the Gower distance (mixed type data) and the dataset is shrunk from n_samples to n_clusters. For the isolation forest algorithm, samples are added till a sufficient 2-samples KS statistics is reached or if the number iteration reached the max number (20) Parameters ---------- df : pd.DataFrame the dataframe to sample, with or without the target n : int, optional the number of clusters if method is ``"gower"``, by default 100 sample_weight : pd.Series or np.array, optional sample weights, by default None method : str, optional the strategy to use for sampling the rows. Either ``"gower"`` or ``"isoforest"``, by default 'gower' Returns ------- pd.DataFrame the sampled dataframe """ assert isinstance(df, pd.DataFrame), "X should be a DataFrame" X = df.copy() num_cols = list(X.select_dtypes(include=[np.number])) non_num_cols = list(set(list(X.columns)) - set(num_cols)) if method == "gower": # basic imputation if non_num_cols: X[non_num_cols] = X[non_num_cols].fillna(X[non_num_cols].mode().iloc[0]) if num_cols: X[num_cols] = X[num_cols].fillna(X[num_cols].mean().iloc[0]) # no need for scaling, it is built-in the computation of the Gower distance gd = gower_matrix(X, cat_features=non_num_cols, weight=sample_weight) labels = AgglomerativeClustering( n_clusters=n, metric="precomputed", linkage="complete" ).fit_predict(gd) X["label"] = labels X["label"] = "clus_" + X["label"].astype(str) X_num = X.groupby("label")[num_cols].agg("mean") if non_num_cols: X_nonnum = X.groupby("label")[non_num_cols].agg(get_most_common) X_sampled = X_num.join(X_nonnum) else: X_sampled = X_num X_sampled = X_sampled.reindex(X.columns, axis=1) return X_sampled elif method == "isoforest": X[non_num_cols] = X[non_num_cols].astype("str").astype("category") for col in non_num_cols: X[col] = X[col].astype("category").cat.codes idx = isof_find_sample(X, sample_weight=None) return X.iloc[idx, :] else: NotImplementedError(f"{method} not implemented")
[docs]def get_most_common(srs): x = list(srs) my_counter = Counter(x) return my_counter.most_common(1)[0][0]
[docs]def gower_matrix( data_x, data_y=None, weight=None, cat_features="auto", ): """Computes the gower distances between X and Y Gower is a similarity measure for categorical, boolean and numerical mixed data. Parameters ---------- data_x : np.array or pd.DataFrame The data for computing the Gower distance data_y : np.array or pd.DataFrame or pd.Series, optional The reference matrix or vector to compare with, optional weight : np.array or pd.Series, optional sample weight, optional cat_features : list of str or bool or int, optional auto-detect cat features or a list of cat features, by default 'auto' Returns ------- np.array The Gower distance matrix, shape (n_samples, n_samples) Notes ----- The non-numeric features, and numeric feature ranges are determined from X and not Y. Raises ------ TypeError If two dataframes are passed but have different number of columns TypeError If two arrays are passed but have different number of columns TypeError Sparse matrices are not supported TypeError if a list of categorical columns is passed, it should be a list of strings or integers or boolean values """ # function checks X = data_x if data_y is None: Y = data_x else: Y = data_y if not isinstance(X, np.ndarray): y_col = Y.columns if isinstance(Y, pd.DataFrame) else Y.index if not np.array_equal(X.columns, y_col): raise TypeError("X and Y must have same columns!") else: if not X.shape[1] == Y.shape[1]: raise TypeError("X and Y must have same y-dim!") if issparse(X) or issparse(Y): raise TypeError("Sparse matrices are not supported!") x_n_rows, x_n_cols = X.shape y_n_rows, y_n_cols = Y.shape if cat_features == "auto": if not isinstance(X, np.ndarray): is_number = np.vectorize(lambda x: not np.issubdtype(x, np.number)) cat_features = is_number(X.dtypes) else: cat_features = np.zeros(x_n_cols, dtype=bool) for col in range(x_n_cols): if not np.issubdtype(type(X[0, col]), np.number): cat_features[col] = True else: # force categorical columns (if integer encoded for instance) if is_list_of_str(cat_features): cat_feat = [True if c in cat_features else False for c in X.columns] cat_features = np.array(cat_feat) elif is_list_of_bool(cat_features): cat_features = np.array(cat_features) elif is_list_of_int(cat_features): cat_feat = [ True if c in cat_features else False for c in range(len(X.columns)) ] cat_features = np.array(cat_feat) else: raise TypeError( "If not 'auto' cat_features should be a list of strings, integers or Booleans" ) # print(cat_features) if not isinstance(X, np.ndarray): X = np.asarray(X) if not isinstance(Y, np.ndarray): Y = np.asarray(Y) Z = np.concatenate((X, Y)) x_index = range(0, x_n_rows) y_index = range(x_n_rows, x_n_rows + y_n_rows) Z_num = Z[:, np.logical_not(cat_features)] num_cols = Z_num.shape[1] num_ranges = np.zeros(num_cols) num_max = np.zeros(num_cols) for col in range(num_cols): col_array = Z_num[:, col].astype(np.float32) max_ = np.nanmax(col_array) min_ = np.nanmin(col_array) if np.isnan(max_): max_ = 0.0 if np.isnan(min_): min_ = 0.0 num_max[col] = max_ num_ranges[col] = (1 - min_ / max_) if (max_ != 0) else 0.0 # This is to normalize the numeric values between 0 and 1. Z_num = np.divide( Z_num.astype(float), num_max.astype(float), out=np.zeros_like(Z_num).astype(float), where=num_max != 0, ) Z_cat = Z[:, cat_features] if weight is None: weight = np.ones(Z.shape[1]) # print(weight) weight_cat = weight[cat_features] weight_num = weight[np.logical_not(cat_features)] out = np.zeros((x_n_rows, y_n_rows), dtype=np.float32) weight_sum = weight.sum() X_cat = Z_cat[x_index,] X_num = Z_num[x_index,] Y_cat = Z_cat[y_index,] Y_num = Z_num[y_index,] # print(X_cat,X_num,Y_cat,Y_num) for i in range(x_n_rows): j_start = i if x_n_rows != y_n_rows: j_start = 0 # call the main function res = _gower_distance_row( X_cat[i, :], X_num[i, :], Y_cat[j_start:y_n_rows, :], Y_num[j_start:y_n_rows, :], weight_cat, weight_num, weight_sum, num_ranges, ) # print(res) out[i, j_start:] = res if x_n_rows == y_n_rows: out[i:, j_start] = res return out
[docs]def _gower_distance_row( xi_cat, xi_num, xj_cat, xj_num, feature_weight_cat, feature_weight_num, feature_weight_sum, ranges_of_numeric, ): """Compute a row of the Gower matrix Parameters ---------- xi_cat : np.array categorical row of the X matrix xi_num : np.array numerical row of the X matrix xj_cat : np.array categorical row of the X matrix xj_num : np.array numerical row of the X matrix feature_weight_cat : np.array weight vector for the categorical features feature_weight_num : np.array weight vector for the numerical features feature_weight_sum : float The sum of the weights ranges_of_numeric : np.array range of the scaled numerical features (between 0 and 1) Returns ------- np.array : array a row vector of the Gower distance """ # categorical columns sij_cat = np.where(xi_cat == xj_cat, np.zeros_like(xi_cat), np.ones_like(xi_cat)) sum_cat = np.multiply(feature_weight_cat, sij_cat).sum(axis=1) # numerical columns abs_delta = np.absolute(xi_num - xj_num) sij_num = np.divide( abs_delta, ranges_of_numeric, out=np.zeros_like(abs_delta), where=ranges_of_numeric != 0, ) sum_num = np.multiply(feature_weight_num, sij_num).sum(axis=1) sums = np.add(sum_cat, sum_num) sum_sij = np.divide(sums, feature_weight_sum) return sum_sij
[docs]def smallest_indices(ary, n): """Returns the n largest indices from a numpy array. Parameters ---------- ary : np.array the array for which to return largest indices n : int the number of indices to return Returns ------- dict the dictionary of indices and values of the largest elements """ # n += 1 flat = np.nan_to_num(ary.flatten(), nan=999) indices = np.argpartition(-flat, -n)[-n:] indices = indices[np.argsort(flat[indices])] # indices = np.delete(indices,0,0) values = flat[indices] return {"index": indices, "values": values}
[docs]def gower_topn( data_x, data_y=None, weight=None, cat_features="auto", n=5, key=None, ): """Get the n most similar elements Parameters ---------- data_x : np.array or pd.DataFrame The data for the look up data_y : np.array or pd.DataFrame or pd.Series, optional elements for which to return the most similar elements, should be a single row weight : np.array or pd.Series, optional sample weight, by default None cat_features : list of str or bool or int, optional auto detection of cat features or a list of strings, booleans or integers, by default 'auto' n : int, optional the number of neighbors/similar rows to find, by default 5 key : str, optional identifier key. If several rows refer to the same id, this column will be used for finding the nearest neighbors with a different id, by default None Returns ------- dict the dictionary of indices and values of the closest elements Raises ------ TypeError if the reference element is not a single row """ if data_y.shape[0] >= 2: raise TypeError("Only support `data_y` of 1 row. ") if key is None: dm = gower_matrix(data_y, data_x, weight, cat_features) else: X = data_x.drop(key, axis=1) Y = data_x.drop(key, axis=1) dm = gower_matrix(Y, X, weight, cat_features) if key is not None: idx = smallest_indices(np.nan_to_num(dm[0], nan=1), n)["index"] val = smallest_indices(np.nan_to_num(dm[0], nan=1), n)["values"] unique_id = data_x.iloc[idx, :] unique_id = unique_id[key] nunique_id = unique_id.nunique() mul = 1 # continue looking for the closest n unique records with a different id while nunique_id < n: idx = smallest_indices(np.nan_to_num(dm[0], nan=1), mul * n)["index"] val = smallest_indices(np.nan_to_num(dm[0], nan=1), mul * n)["values"] unique_id = data_x.iloc[idx, :].reset_index() unique_id = unique_id[key] nunique_id = unique_id.nunique() mul += 1 # find the indices of the unique id _, idx_n = np.unique(unique_id, return_index=True) # select only the rows corresponding to unique id val = val[idx_n] idx = idx[idx_n] # sort them from the closest to the farthest, according to the Gower metrics idx_n = np.argsort(val) # return the n closest records, with a different id return {"index": idx[idx_n[:n]], "values": val[idx_n[:n]]} else: return smallest_indices(np.nan_to_num(dm[0], nan=1), n)
[docs]def get_5_percent_splits(length): """splits dataframe into 5% intervals Parameters ---------- length : int array length Returns ------- array vector of sizes """ five_percent = round(5 / 100 * length) return np.arange(five_percent, length, five_percent)
[docs]def isolation_forest(X, sample_weight=None): """fits isolation forest to the dataset and gives an anomaly score to every sample Parameters ---------- X : pd.DataFrame or np.array the predictors matrix sample_weight : pd.Series or np.array, optional the sample weights, if any, by default None """ clf = IsolationForest().fit(X, sample_weight=sample_weight) return clf.score_samples(X)
[docs]def isof_find_sample(X, sample_weight=None): """Finds a sample by comparing the distributions of the anomaly scores between the sample and the original distribution using the KS-test. Starts of a 5% however will increase to 10% and then 15% etc. if a significant sample can not be found References ---------- Sampling method taken from boruta_shap, author: https://github.com/Ekeany Parameters ---------- X : pd.DataFrame the predictors matrix sample_weight : pd.Series or np.array, optional the sample weights, if any, by default None Returns ------- array the indices for reducing the shadow predictors matrix """ loop = True iteration = 0 size = get_5_percent_splits(length=X.shape[0]) element = 1 preds = isolation_forest(X, sample_weight) while loop: sample_indices = np.random.choice( np.arange(preds.size), size=size[element], replace=False ) sample = np.take(preds, sample_indices) if ks_2samp(preds, sample).pvalue > 0.95: break if iteration == 20: element += 1 iteration = 0 return sample_indices