Source code for arfs.parallel

"""Parallelize Pandas

This module provides utilities for parallelizing operations on a pd.DataFrame.

Module Structure:
-----------------
- ``parallel_matrix_entries`` for parallelizing operations returning a matrix (2D) (apply on pairs of columns)
- ``parallel_df`` for parallelizing operations returning a series (1D) (apply on a single column at a time)
"""

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from multiprocessing import cpu_count
from itertools import chain


def parallel_matrix_entries(func, df, comb_list, sample_weight=None, n_jobs=-1):
    """Apply a function to chunks of column pairs, distributed by cores.

    The list of column pairs is split into roughly equal chunks, ``func`` is
    called once per chunk and the partial results are concatenated.

    This is similar to https://github.com/smazzanti/mrmr/mrmr/pandas.py

    Parameters
    ----------
    func : callable
        function to be applied to each chunk of pairs; it is called as
        ``func(X=df, sample_weight=sample_weight, comb_list=chunk)`` and is
        expected to return a list of pd.DataFrame having a ``"val"`` column
    df : pd.DataFrame
        the dataframe on which to apply the function
    comb_list : list of tuples of str
        Pairs of column names corresponding to the entries
    sample_weight : pd.Series or np.array, optional
        The weight vector, if any, of shape (n_samples,), by default None
    n_jobs : int, optional
        the number of cores to use for the computation, by default -1
        (all cores). Other negative values follow the joblib convention
        (``n_cpus + 1 + n_jobs``). The value is capped by the number of
        available cores and by the number of pairs.

    Returns
    -------
    pd.DataFrame
        concatenated results into a single pandas DF, sorted by ``"val"`` in
        descending order. An empty DataFrame is returned when ``comb_list``
        is empty.

    Raises
    ------
    ValueError
        if ``n_jobs`` is 0
    """
    if n_jobs == 0:
        raise ValueError("n_jobs must be a non-zero integer")

    # Materialize once: we need the length and positional access below.
    comb_list = list(comb_list)
    if not comb_list:
        # Nothing to compute; pd.concat would raise on an empty list.
        return pd.DataFrame()

    # Determining the number of jobs
    n_cpus = cpu_count()
    if n_jobs < 0:
        n_jobs = max(n_cpus + 1 + n_jobs, 1)
    # More workers than pairs would only produce empty chunks.
    n_jobs = min(n_cpus, n_jobs, len(comb_list))

    if n_jobs == 1:
        frames = func(X=df, sample_weight=sample_weight, comb_list=comb_list)
    else:
        # Split positions rather than the pairs themselves: converting the
        # pairs to a NumPy array would coerce the column labels to a common
        # dtype (e.g. mixed int/str labels would all become strings).
        index_chunks = np.array_split(np.arange(len(comb_list)), n_jobs)
        comb_chunks = [[comb_list[i] for i in idx] for idx in index_chunks]
        per_chunk = Parallel(n_jobs=n_jobs)(
            delayed(func)(X=df, sample_weight=sample_weight, comb_list=comb_chunk)
            for comb_chunk in comb_chunks
        )
        # Each worker returns a list of partial frames; flatten them.
        frames = list(chain.from_iterable(per_chunk))

    # Same post-processing for both paths so the output does not depend on n_jobs.
    return pd.concat(frames, ignore_index=True).sort_values("val", ascending=False)
def parallel_df(func, df, series, sample_weight=None, n_jobs=-1):
    """Apply a function to each column of the dataframe, distributed by cores.

    The columns are split into roughly equal chunks, ``func`` is called once
    per chunk of columns and the partial results are concatenated.

    This is similar to https://github.com/smazzanti/mrmr/mrmr/pandas.py

    Parameters
    ----------
    func : callable
        function to be applied to each chunk of columns; it is called
        positionally as ``func(df_chunk, series, sample_weight)`` and is
        expected to return a pd.Series (or a list of pd.DataFrame having a
        ``"val"`` column)
    df : pd.DataFrame
        the dataframe on which to apply the function
    series : pd.Series
        series (target) used by the function
    sample_weight : pd.Series or np.array, optional
        The weight vector, if any, of shape (n_samples,), by default None
    n_jobs : int, optional
        the number of cores to use for the computation, by default -1
        (all cores). Other negative values follow the joblib convention
        (``n_cpus + 1 + n_jobs``). The value is capped by the number of
        available cores and by the number of columns.

    Returns
    -------
    pd.Series or pd.DataFrame
        concatenated results, sorted in descending order (a DataFrame sorted
        by ``"val"`` when ``func`` returns lists of DataFrames)

    Raises
    ------
    ValueError
        if ``n_jobs`` is 0
    """
    if n_jobs == 0:
        raise ValueError("n_jobs must be a non-zero integer")

    # Determining the number of jobs
    n_cpus = cpu_count()
    if n_jobs < 0:
        n_jobs = max(n_cpus + 1 + n_jobs, 1)
    n_cols = len(df.columns)
    # More workers than columns would hand empty frames to ``func``, whose
    # results cannot be concatenated reliably with the other Series.
    n_jobs = max(min(n_cpus, n_jobs, n_cols), 1)

    if n_jobs == 1:
        parts = [func(df, series, sample_weight)]
    else:
        col_chunks = np.array_split(np.arange(n_cols), n_jobs)
        parts = Parallel(n_jobs=n_jobs)(
            delayed(func)(df.iloc[:, col_chunk], series, sample_weight)
            for col_chunk in col_chunks
        )

    # Decide on the result type *before* sorting: a list has no
    # ``sort_values`` and has to be concatenated into a frame first.
    if any(isinstance(part, list) for part in parts):
        frames = []
        for part in parts:
            frames.extend(part if isinstance(part, list) else [part])
        return pd.concat(frames, ignore_index=True).sort_values(
            "val", ascending=False
        )

    merged = parts[0] if len(parts) == 1 else pd.concat(parts)
    return merged.sort_values(ascending=False)
[docs]def _compute_series( X, y, sample_weight=None, func_xyw=None, ): """_compute_series is a utility function for computing the series resulting of the ``apply`` Parameters ---------- X : pd.DataFrame, of shape (n_samples, n_features) The set of regressors that will be tested sequentially y : pd.Series or np.array, of shape (n_samples,) The target vector sample_weight : pd.Series or np.array, of shape (n_samples,), optional The weight vector, if any, by default None func_xyw : callable, optional callable (function) for computing the individual elements of the series takes two mandatory inputs (x and y) and an optional input w, sample_weights """ def _closure_compute_series(x, y, sample_weight): x_not_na = ~x.isna() if x_not_na.sum() == 0: return 0 return func_xyw( x=x[x_not_na], y=y[x_not_na], sample_weight=sample_weight[x_not_na], as_frame=False, ) return X.apply( lambda col: _closure_compute_series(x=col, y=y, sample_weight=sample_weight) ).fillna(0.0)
[docs]def _compute_matrix_entries( X, comb_list, sample_weight=None, func_xyw=None, ): """base closure for computing matrix entries applying a function to each chunk of combination of columns of the dataframe, distributed by cores. This is similar to https://github.com/smazzanti/mrmr/mrmr/pandas.py Parameters ---------- X : pd.DataFrame, of shape (n_samples, n_features) The set of regressors that will be tested sequentially sample_weight : pd.Series or np.array, of shape (n_samples,), optional The weight vector, if any, by default None func_xyw : callable, optional callable (function) for computing the individual elements of the matrix takes two mandatory inputs (x and y) and an optional input w, sample_weights comb_list : list of 2-tuple of str Pairs of column names corresponding to the entries Returns ------- List[pd.DataFrame] a list of partial dfs to be concatenated """ v_df_list = [ func_xyw(x=X[comb[0]], y=X[comb[1]], sample_weight=sample_weight, as_frame=True) for comb in comb_list ] return v_df_list