Source code for pm4py.algo.clustering.trace_attribute_driven.variants.logslice_dist

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import numpy as np
from scipy.spatial.distance import pdist
from pm4py.statistics.attributes.log import get as attributes_filter
from pm4py.util import constants, pandas_utils
from pm4py.algo.discovery.dfg.variants import native
import pandas as pd
from pm4py.algo.clustering.trace_attribute_driven.util import filter_subsets
from pm4py.algo.clustering.trace_attribute_driven.variants import act_dist_calc


[docs] def log2sublog(log, str): tracefilter_log = filter_subsets.apply_trace_attributes( log, [str], parameters={ constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY: "AMOUNT_REQ", "positive": True, }, ) return tracefilter_log
[docs] def slice_dist_suc(log_1, log_2, unit): (log1_list, freq1_list) = filter_subsets.logslice_percent(log_1, unit) (log2_list, freq2_list) = filter_subsets.logslice_percent(log_2, unit) if len(freq1_list) >= len(freq2_list): max_len = len(freq1_list) min_len = len(freq2_list) max_log = log1_list min_log = log2_list var_count_max = freq1_list var_count_min = freq2_list else: max_len = len(freq2_list) min_len = len(freq1_list) max_log = log2_list min_log = log1_list var_count_max = freq2_list var_count_min = freq1_list dist_matrix = np.zeros((max_len, min_len)) max_per_var = np.zeros(max_len) max_freq = np.zeros(max_len) min_freq = np.zeros(min_len) min_per_var = np.zeros(min_len) index_rec = set(list(range(min_len))) if log1_list == log2_list: print("Please give different variant lists!") dist = 0 else: for i in range(max_len): dist_vec = np.zeros(min_len) dfg1 = native.apply(max_log[i]) df1_dfg = act_dist_calc.occu_var_act(dfg1) for j in range(min_len): dfg2 = native.apply(min_log[j]) df2_dfg = act_dist_calc.occu_var_act(dfg2) df_dfg = pandas_utils.merge( df1_dfg, df2_dfg, how="outer", on="var" ).fillna(0) dist_vec[j] = pdist( np.array( [df_dfg["freq_x"].values, df_dfg["freq_y"].values] ), "cosine", )[0] dist_matrix[i][j] = dist_vec[j] if j == (min_len - 1): max_loc_col = np.argmin(dist_vec) if abs(dist_vec[max_loc_col]) <= 1e-8: index_rec.discard(max_loc_col) max_freq[i] = ( var_count_max[i] * var_count_min[max_loc_col] * 2 ) max_per_var[i] = ( dist_vec[max_loc_col] * max_freq[i] * 2 ) else: max_freq[i] = ( var_count_max[i] * var_count_min[max_loc_col] ) max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] if len(index_rec) != 0: for i in list(index_rec): min_loc_row = np.argmin(dist_matrix[:, i]) min_freq[i] = var_count_max[min_loc_row] * var_count_min[i] min_per_var[i] = dist_matrix[min_loc_row, i] * min_freq[i] dist = (np.sum(max_per_var) + np.sum(min_per_var)) / ( np.sum(max_freq) + np.sum(min_freq) ) return dist
[docs] def slice_dist_act(log_1, log_2, unit, parameters=None): (log1_list, freq1_list) = filter_subsets.logslice_percent(log_1, unit) (log2_list, freq2_list) = filter_subsets.logslice_percent(log_2, unit) if len(freq1_list) >= len(freq2_list): max_len = len(freq1_list) min_len = len(freq2_list) max_log = log1_list min_log = log2_list var_count_max = freq1_list var_count_min = freq2_list else: max_len = len(freq2_list) min_len = len(freq1_list) max_log = log2_list min_log = log1_list var_count_max = freq2_list var_count_min = freq1_list dist_matrix = np.zeros((max_len, min_len)) max_per_var = np.zeros(max_len) max_freq = np.zeros(max_len) min_freq = np.zeros(min_len) min_per_var = np.zeros(min_len) index_rec = set(list(range(min_len))) if log1_list == log2_list: print("Please give different variant lists!") dist = 0 else: for i in range(max_len): dist_vec = np.zeros(min_len) act1 = attributes_filter.get_attribute_values( max_log[i], "concept:name" ) df1_act = act_dist_calc.occu_var_act(act1) for j in range(min_len): act2 = attributes_filter.get_attribute_values( min_log[j], "concept:name" ) df2_act = act_dist_calc.occu_var_act(act2) df_act = pandas_utils.merge( df1_act, df2_act, how="outer", on="var" ).fillna(0) dist_vec[j] = pdist( np.array( [df_act["freq_x"].values, df_act["freq_y"].values] ), "cosine", )[0] dist_matrix[i][j] = dist_vec[j] if j == (min_len - 1): max_loc_col = np.argmin(dist_vec) if abs(dist_vec[max_loc_col]) <= 1e-8: index_rec.discard(max_loc_col) max_freq[i] = ( var_count_max[i] * var_count_min[max_loc_col] * 2 ) max_per_var[i] = ( dist_vec[max_loc_col] * max_freq[i] * 2 ) else: max_freq[i] = ( var_count_max[i] * var_count_min[max_loc_col] ) max_per_var[i] = dist_vec[max_loc_col] * max_freq[i] if len(index_rec) != 0: for i in list(index_rec): min_loc_row = np.argmin(dist_matrix[:, i]) min_freq[i] = var_count_max[min_loc_row] * var_count_min[i] min_per_var[i] = dist_matrix[min_loc_row, i] * min_freq[i] dist = (np.sum(max_per_var) + np.sum(min_per_var)) / ( np.sum(max_freq) + np.sum(min_freq) ) return dist