Source code for pm4py.statistics.attributes.common.get

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import json
import logging
import importlib.util

from pm4py.util.points_subset import pick_chosen_points_list
from pm4py.util import exec_utils, pandas_utils, constants
from enum import Enum


class Parameters(Enum):
    GRAPH_POINTS = "graph_points"
    POINT_TO_SAMPLE = "points_to_sample"

def get_sorted_attributes_list(attributes):
    """
    Gets sorted attributes list

    Parameters
    ----------
    attributes
        Dictionary of attributes associated with their count

    Returns
    ----------
    listattr
        Sorted attributes list (most frequent first)
    """
    listattr = []
    for a in attributes:
        listattr.append([a, attributes[a]])
    listattr = sorted(listattr, key=lambda x: x[1], reverse=True)
    return listattr

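A minimal usage sketch (the attribute names and counts below are invented for illustration, e.g. activity frequencies taken from an event log):

attribute_counts = {"register request": 50, "check ticket": 35, "decide": 20, "reject request": 5}
sorted_attributes = get_sorted_attributes_list(attribute_counts)
# [['register request', 50], ['check ticket', 35], ['decide', 20], ['reject request', 5]]
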
def get_attributes_threshold(
    alist, decreasing_factor, min_activity_count=1, max_activity_count=25
):
    """
    Get attributes cutting threshold

    Parameters
    ----------
    alist
        Sorted attributes list
    decreasing_factor
        Decreasing factor of the algorithm
    min_activity_count
        Minimum number of activities to include
    max_activity_count
        Maximum number of activities to include

    Returns
    ---------
    threshold
        Activities cutting threshold
    """
    index = max(0, min(min_activity_count - 1, len(alist) - 1))
    threshold = alist[index][1]
    index = index + 1
    for i in range(index, len(alist)):
        value = alist[i][1]
        if value > threshold * decreasing_factor:
            threshold = value
        if i >= max_activity_count:
            break
    return threshold

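A sketch of how the threshold is typically combined with the sorted list; the decreasing factor 0.6 and the counts are arbitrary illustrations, not library defaults:

# Continuing the hypothetical counts from the sketch above
sorted_attributes = [["register request", 50], ["check ticket", 35], ["decide", 20], ["reject request", 5]]
threshold = get_attributes_threshold(sorted_attributes, 0.6)
# threshold == 35 here, so only the two most frequent attributes pass the cut
kept = [attr for attr, count in sorted_attributes if count >= threshold]
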
def get_kde_numeric_attribute(values, parameters=None):
    """
    Gets the KDE estimation for the distribution of a numeric attribute's values

    Parameters
    -------------
    values
        Values of the numeric attribute
    parameters
        Possible parameters of the algorithm, including:
            graph_points -> number of points to include in the graph

    Returns
    --------------
    x
        X-axis values to represent (including the exact min and max)
    y
        Y-axis values to represent
    """
    if importlib.util.find_spec("scipy") and importlib.util.find_spec("numpy"):
        from scipy.stats import gaussian_kde
        import numpy as np

        if parameters is None:
            parameters = {}

        graph_points = exec_utils.get_param_value(
            Parameters.GRAPH_POINTS, parameters, 200
        )
        values = np.sort(values)

        # Check if we have enough unique values for KDE
        unique_values = np.unique(values)
        if len(unique_values) < 2:
            # Handle edge case: not enough unique values for KDE
            if len(unique_values) == 0:
                # No values at all
                return [], []
            else:
                # Single unique value - create a simple representation
                single_val = float(unique_values[0])
                # Create a small range around the single value for visualization
                eps = max(abs(single_val) * 0.01, 1e-6) if single_val != 0 else 1.0
                xs = np.linspace(single_val - eps, single_val + eps, graph_points)
                # Create a spike at the single value
                ys = np.zeros(graph_points)
                mid_idx = graph_points // 2
                ys[mid_idx] = 1.0
                return xs.tolist(), ys.tolist()

        density = gaussian_kde(values)

        # ensure we have at least two points for each spacing
        half = max(int(graph_points // 2), 2)
        min_val, max_val = values[0], values[-1]
        eps = 1e-6

        # linear space including both endpoints
        xs1 = np.linspace(min_val, max_val, half, endpoint=True)
        # geometric space including both endpoints (avoid zero)
        xs2 = np.geomspace(max(min_val, eps), max_val, half, endpoint=True)

        # combine, add exact endpoints, dedupe & sort
        xs = np.unique(np.concatenate([xs1, xs2, [min_val, max_val]]))

        return xs.tolist(), density(xs).tolist()
    else:
        msg = "scipy and/or numpy are not available. graphs cannot be built!"
        logging.error(msg)
        raise Exception(msg)

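A minimal sketch, assuming scipy and numpy are installed (matplotlib is an extra assumption used only to display the result); the cost values are invented for illustration:

import matplotlib.pyplot as plt  # optional, only for plotting

costs = [100.0, 120.5, 99.0, 250.0, 130.0, 127.5, 99.9, 180.0]
x, y = get_kde_numeric_attribute(costs, parameters={Parameters.GRAPH_POINTS: 100})

plt.plot(x, y)
plt.xlabel("attribute value")
plt.ylabel("estimated density")
plt.show()
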
def get_kde_numeric_attribute_json(values, parameters=None):
    """
    Gets the KDE estimation for the distribution of a numeric attribute's values
    (expressed as JSON)

    Parameters
    --------------
    values
        Values of the numeric attribute
    parameters
        Possible parameters of the algorithm, including:
            graph_points: number of points to include in the graph

    Returns
    --------------
    json
        JSON representing the graph points
    """
    x, y = get_kde_numeric_attribute(values, parameters=parameters)

    ret = []
    for i in range(len(x)):
        ret.append((x[i], y[i]))

    return json.dumps(ret)

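The JSON variant is a thin wrapper around the function above; a sketch of round-tripping its output, reusing the invented costs list from the previous sketch (json is already imported at module level):

graph_json = get_kde_numeric_attribute_json(costs)
points = json.loads(graph_json)  # list of [x, y] pairs
first_x, first_y = points[0]
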
def get_kde_date_attribute(values, parameters=None):
    """
    Gets the KDE estimation for the distribution of a date attribute's values

    Parameters
    -------------
    values
        Values of the date attribute
    parameters
        Possible parameters of the algorithm, including:
            graph_points -> number of points to include in the graph
            points_to_sample -> number of points to sample from the provided values

    Returns
    --------------
    x
        X-axis values to represent
    y
        Y-axis values to represent
    """
    if importlib.util.find_spec("scipy") and importlib.util.find_spec("numpy"):
        from scipy.stats import gaussian_kde
        import numpy as np
        import pandas as pd

        if parameters is None:
            parameters = {}

        graph_points = exec_utils.get_param_value(
            Parameters.GRAPH_POINTS, parameters, 200
        )
        points_to_sample = exec_utils.get_param_value(
            Parameters.POINT_TO_SAMPLE, parameters, 400
        )

        red_values = pick_chosen_points_list(
            points_to_sample, values, include_extremes=True
        )
        int_values = sorted(
            [x.replace(tzinfo=None).timestamp() for x in red_values]
        )

        # Check if we have enough unique values for KDE
        unique_int_values = np.unique(int_values)
        if len(unique_int_values) < 2:
            # Handle edge case: not enough unique values for KDE
            if len(unique_int_values) == 0:
                # No values at all
                return [[], []]
            else:
                # Single unique value - create a simple representation
                single_val = float(unique_int_values[0])
                # Create a small time range around the single value (1 hour range)
                time_eps = 3600  # 1 hour in seconds
                xs = np.linspace(
                    single_val - time_eps, single_val + time_eps, graph_points
                )
                xs_transf = pd.to_datetime(xs * 10**9, unit="ns")
                # Create a spike at the single value
                ys = np.zeros(graph_points)
                mid_idx = graph_points // 2
                ys[mid_idx] = 1.0
                return [xs_transf, ys.tolist()]

        density = gaussian_kde(int_values)
        xs = np.linspace(min(int_values), max(int_values), graph_points)
        xs_transf = pd.to_datetime(xs * 10**9, unit="ns")

        return [xs_transf, density(xs)]
    else:
        msg = "scipy and/or numpy are not available. graphs cannot be built!"
        logging.error(msg)
        raise Exception(msg)

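A sketch with timezone-aware datetimes; the timestamps are fabricated, and any tzinfo is stripped by the function before the KDE is computed:

from datetime import datetime, timezone

timestamps = [
    datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc),
    datetime(2024, 1, 2, 14, 15, tzinfo=timezone.utc),
    datetime(2024, 1, 3, 10, 45, tzinfo=timezone.utc),
]
x, y = get_kde_date_attribute(timestamps, parameters={Parameters.GRAPH_POINTS: 50})
# x is a pandas DatetimeIndex, y the corresponding density estimates
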
def get_kde_date_attribute_json(values, parameters=None):
    """
    Gets the KDE estimation for the distribution of a date attribute's values
    (expressed as JSON)

    Parameters
    --------------
    values
        Values of the date attribute
    parameters
        Possible parameters of the algorithm, including:
            graph_points: number of points to include in the graph

    Returns
    --------------
    json
        JSON representing the graph points
    """
    x, y = get_kde_date_attribute(values, parameters=parameters)

    ret = []
    for i in range(len(x)):
        ret.append((x[i].replace(tzinfo=None).timestamp(), y[i]))

    return json.dumps(ret)