# Source code for pm4py.statistics.process_cube.variants.classic

import pandas as pd
import numpy as np
from enum import Enum
from typing import Optional, Dict, Any
from pm4py.util import exec_utils


class Parameters(Enum):
    """
    Keys accepted in the ``parameters`` dictionary of :func:`apply`.
    """

    # Number of bins used for the X dimension when it is binned numerically
    MAX_DIVISIONS_X = "max_divisions_x"
    # Number of bins used for the Y dimension when it is binned numerically
    MAX_DIVISIONS_Y = "max_divisions_y"
    # Aggregation applied to the aggregated column (e.g. 'mean', 'sum', 'min', 'max')
    AGGREGATION_FUNCTION = "aggregation_function"
def _bin_membership(df: pd.DataFrame, col: str, max_divisions: int) -> list:
    """
    Computes, for every row of ``df``, the list of bins the row belongs to
    along one dimension. The DataFrame is only read, never modified.

    Parameters
    ----------
    df : pd.DataFrame
        The feature table.
    col : str
        Name of an existing column (numeric binning) or a column-name prefix
        (prefix-based binning).
    max_divisions : int
        Number of equal-width bins used in numeric mode; ignored in prefix mode.

    Returns
    -------
    membership : list
        One list per row (in row order). In numeric mode it holds the single
        interval produced by ``pd.cut``, or is empty if the value fell in no
        bin. In prefix mode it holds the names of all columns starting with
        ``col`` whose value is not missing and >= 1.
    """
    if col in df.columns:
        # Numeric mode: equal-width bins spanning [min, max] of the column
        edges = np.linspace(df[col].min(), df[col].max(), max_divisions + 1)
        intervals = pd.cut(df[col], bins=edges, include_lowest=True)
        return [[] if pd.isna(interval) else [interval] for interval in intervals]

    # Prefix mode: the bin is the column name itself, a row may be in several bins
    prefix_cols = [c for c in df.columns if c.startswith(col)]
    membership = [[] for _ in range(len(df))]
    for name in prefix_cols:
        for bins_of_row, val in zip(membership, df[name]):
            # Missing values are tested first so they are never compared to 1
            if pd.notna(val) and val >= 1:
                bins_of_row.append(name)
    return membership


def apply(
    feature_table: pd.DataFrame,
    x_col: str,
    y_col: str,
    agg_col: str,
    parameters: Optional[Dict[Any, Any]] = None
):
    """
    Constructs a process cube by slicing data along two dimensions (x_col, y_col)
    and aggregating a third (agg_col).

    Additionally:
      1) If x_col (or y_col) is an actual column of the feature table, numeric
         binning is used. Otherwise 'prefix-based' binning is used: every column
         starting with x_col (or y_col) is a bin, and a row is assigned to that
         bin if its value is >= 1.
      2) Both the pivoted DataFrame and a dict associating each cell
         (x_bin, y_bin) -> set of case IDs are returned.

    The feature table is not modified: bin assignments are computed on the side
    instead of being stored in temporary columns of the input.

    Parameters
    ----------
    feature_table : pd.DataFrame
        A feature table that must contain 'case:concept:name' and agg_col, plus
        the columns for x_col, y_col (if in numeric mode) or the columns that
        start with x_col, y_col (if in prefix mode).
    x_col : str
        The X dimension. If x_col is a column, use numeric binning.
        Otherwise, treat it as a prefix for 'prefix-based' binning.
    y_col : str
        The Y dimension. If y_col is a column, use numeric binning.
        Otherwise, treat it as a prefix for 'prefix-based' binning.
    agg_col : str
        The column to aggregate (mean, sum, etc.).
    parameters : Dict[Any, Any]
        Optional parameters of the method, including:
        * Parameters.MAX_DIVISIONS_X: If x_col is numeric, how many bins to divide it into (default 4).
        * Parameters.MAX_DIVISIONS_Y: If y_col is numeric, how many bins to divide it into (default 4).
        * Parameters.AGGREGATION_FUNCTION: The aggregation function, e.g., 'mean', 'sum', 'min', 'max'
          (default 'mean').

    Returns
    -------
    pivot_df : pd.DataFrame
        A pivoted DataFrame representing the process cube, with x bins as rows
        and y bins as columns, containing aggregated values of agg_col. Rows and
        columns that are entirely NaN are removed. Empty if the feature table has
        no rows or no row belongs to any cell.
    cell_case_dict : dict
        A dictionary mapping (x_bin, y_bin) -> set of case IDs that fall in that
        cell, restricted to the rows/columns still present in pivot_df.

    Raises
    ------
    ValueError
        Presumably propagated from ``pd.cut`` when the computed bin edges are not
        unique (e.g. a constant numeric column) -- verify against the pandas
        version in use.
    """
    if parameters is None:
        parameters = {}

    max_divisions_x = exec_utils.get_param_value(Parameters.MAX_DIVISIONS_X, parameters, 4)
    max_divisions_y = exec_utils.get_param_value(Parameters.MAX_DIVISIONS_Y, parameters, 4)
    agg_fn = exec_utils.get_param_value(Parameters.AGGREGATION_FUNCTION, parameters, "mean")

    df = feature_table

    # A table without rows has no cells; answer directly instead of attempting
    # to bin on NaN min/max values
    if len(df) == 0:
        return pd.DataFrame(), {}

    # ------------------------------------------------------
    # 1) Assign every row to its bin(s) on each dimension
    #    - numeric binning: at most 1 bin
    #    - prefix-based binning: possibly several bins
    # ------------------------------------------------------
    x_membership = _bin_membership(df, x_col, max_divisions_x)
    y_membership = _bin_membership(df, y_col, max_divisions_y)

    # ------------------------------------------------------
    # 2) Build the "long" table [case:concept:name, x_bin, y_bin, agg_col]
    #    as the cross-product of the X and Y bins of each row; rows with no
    #    membership on either dimension contribute nothing
    # ------------------------------------------------------
    records = []
    rows = zip(df["case:concept:name"], df[agg_col], x_membership, y_membership)
    for case_id, agg_value, x_bin_list, y_bin_list in rows:
        for xb in x_bin_list:
            for yb in y_bin_list:
                records.append((case_id, xb, yb, agg_value))

    temp_df = pd.DataFrame(records, columns=["case:concept:name", "x_bin", "y_bin", agg_col])

    # If nothing ended up in records (no membership at all), return empty results
    if len(temp_df) == 0:
        return pd.DataFrame(), {}

    # ------------------------------------------------------
    # 3) Aggregate agg_col and collect the case IDs per cell
    # ------------------------------------------------------
    grouped = temp_df.groupby(["x_bin", "y_bin"])
    agg_df = grouped[agg_col].agg(agg_fn).reset_index()
    cases_df = grouped["case:concept:name"].agg(lambda x: set(x)).reset_index()
    cases_df.rename(columns={"case:concept:name": "case_set"}, inplace=True)

    merged_df = pd.merge(agg_df, cases_df, on=["x_bin", "y_bin"], how="outer")

    # ------------------------------------------------------
    # 4) Pivot to a matrix (rows = x_bin, columns = y_bin, values = aggregate)
    #    and remove the rows/columns that are completely NaN
    # ------------------------------------------------------
    pivot_df = merged_df.pivot(index="x_bin", columns="y_bin", values=agg_col)
    pivot_df = pivot_df.dropna(how="all", axis=0)
    pivot_df = pivot_df.dropna(how="all", axis=1)

    # ------------------------------------------------------
    # 5) Build the dictionary (x_bin, y_bin) -> set of case IDs, keeping only
    #    the cells whose row and column survived in the pivot
    # ------------------------------------------------------
    valid_x_bins = set(pivot_df.index)
    valid_y_bins = set(pivot_df.columns)
    cells = zip(merged_df["x_bin"], merged_df["y_bin"], merged_df["case_set"])
    cell_case_dict = {
        (xb, yb): case_set
        for xb, yb, case_set in cells
        if xb in valid_x_bins and yb in valid_y_bins
    }

    return pivot_df, cell_case_dict