Source code for pm4py.algo.discovery.log_skeleton.variants.classic

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from collections import Counter
from enum import Enum

from pm4py.algo.discovery.log_skeleton import trace_skel
from pm4py.objects.log.util import xes
from pm4py.util import exec_utils
from pm4py.util import variants_util, pandas_utils
from pm4py.util.constants import (
    PARAMETER_CONSTANT_ACTIVITY_KEY,
    PARAMETER_CONSTANT_CASEID_KEY,
    CASE_CONCEPT_NAME,
)
from typing import Optional, Dict, Any, Union
from pm4py.objects.log.obj import EventLog
import pandas as pd


class Parameters(Enum):
    """
    Keys of the parameters dictionary understood by the log skeleton code.
    """

    # parameter for the noise threshold
    NOISE_THRESHOLD = "noise_threshold"

    # considered constraints in conformance checking among: equivalence,
    # always_after, always_before, never_together, directly_follows,
    # activ_freq
    CONSIDERED_CONSTRAINTS = "considered_constraints"

    # default choice for conformance checking
    DEFAULT_CONSIDERED_CONSTRAINTS = [
        "equivalence",
        "always_after",
        "always_before",
        "never_together",
        "directly_follows",
        "activ_freq",
    ]

    # keys shared with the rest of the library (case identifier / activity)
    CASE_ID_KEY = PARAMETER_CONSTANT_CASEID_KEY
    ACTIVITY_KEY = PARAMETER_CONSTANT_ACTIVITY_KEY

    PARAMETER_VARIANT_DELIMITER = "variant_delimiter"
# Module-level shortcuts to the corresponding members of Parameters, so that
# they can be referenced without going through the enumeration.
NOISE_THRESHOLD = Parameters.NOISE_THRESHOLD
CONSIDERED_CONSTRAINTS = Parameters.CONSIDERED_CONSTRAINTS
DEFAULT_CONSIDERED_CONSTRAINTS = Parameters.DEFAULT_CONSIDERED_CONSTRAINTS
ACTIVITY_KEY = Parameters.ACTIVITY_KEY
PARAMETER_VARIANT_DELIMITER = Parameters.PARAMETER_VARIANT_DELIMITER
class Outputs(Enum):
    """
    Names of the entries of a log skeleton model.

    The value of each member is the dictionary key under which the
    corresponding set of constraints is stored in the model.
    """

    # relations between pairs of activities
    EQUIVALENCE = "equivalence"
    ALWAYS_AFTER = "always_after"
    ALWAYS_BEFORE = "always_before"
    NEVER_TOGETHER = "never_together"
    DIRECTLY_FOLLOWS = "directly_follows"

    # allowed number of occurrences per activity
    ACTIV_FREQ = "activ_freq"
def equivalence(logs_traces, all_activs, noise_threshold=0):
    """
    Gets the equivalence relations given the traces of the log

    Parameters
    -------------
    logs_traces
        Mapping from each trace (tuple of activities) to its frequency
    all_activs
        Mapping from each activity to its number of occurrences
    noise_threshold
        Noise threshold

    Returns
    --------------
    rel
        Set of the pairs whose weighted count reaches
        all_activs[first element of the pair] * (1 - noise_threshold)
    """
    pair_totals = Counter()
    for variant, frequency in logs_traces.items():
        # pairs reported for this variant, weighted by how often it occurs
        per_variant = Counter(trace_skel.equivalence(list(variant)))
        weighted = Counter(
            {
                pair: occurrences * frequency
                for pair, occurrences in per_variant.items()
            }
        )
        pair_totals += weighted

    keep_ratio = 1.0 - noise_threshold
    return {
        pair
        for pair, total in pair_totals.items()
        if total >= all_activs[pair[0]] * keep_ratio
    }
def always_after(logs_traces, all_activs, noise_threshold=0):
    """
    Gets the always-after relations given the traces of the log

    A pair (A, B) reported by trace_skel.after is kept when the number of
    traces exhibiting the pair is at least
    (number of traces containing A) * (1 - noise_threshold).

    Parameters
    -------------
    logs_traces
        Mapping from each trace (tuple of activities) to its frequency
    all_activs
        All the activities (not used by this function; kept for a signature
        parallel to the other relations)
    noise_threshold
        Noise threshold

    Returns
    --------------
    rel
        Set of relations in the log
    """
    # Number of traces in which each activity A appears at all.
    traces_with_A = Counter()
    # Number of traces in which the pair (A, B) is reported at least once.
    traces_with_A_then_B = Counter()

    for trace_variant, freq in logs_traces.items():
        # set(): a variant must contribute once per activity even when the
        # activity is repeated inside the trace, otherwise the denominator
        # counts events while the numerator below counts traces.
        for act in set(trace_variant):
            traces_with_A[act] += freq

        # set(): each pair is counted once per trace variant.
        for pair in set(trace_skel.after(list(trace_variant))):
            traces_with_A_then_B[pair] += freq

    min_ratio = 1 - noise_threshold
    return {
        (A, B)
        for (A, B), count_AB in traces_with_A_then_B.items()
        if count_AB >= traces_with_A[A] * min_ratio
    }
def always_before(logs_traces, all_activs, noise_threshold=0):
    """
    Gets the always-before relations given the traces of the log

    A pair (A, B) reported by trace_skel.before is kept when the number of
    traces exhibiting the pair is at least
    (number of traces containing A) * (1 - noise_threshold).

    Parameters
    -------------
    logs_traces
        Mapping from each trace (tuple of activities) to its frequency
    all_activs
        All the activities (not used by this function; kept for a signature
        parallel to the other relations)
    noise_threshold
        Noise threshold

    Returns
    --------------
    rel
        Set of relations in the log
    """
    # Number of traces in which each activity A appears at all.
    traces_with_A = Counter()
    # Number of traces in which the pair (A, B) is reported at least once.
    traces_with_pair = Counter()

    for trace_variant, freq in logs_traces.items():
        # set(): a variant must contribute once per activity even when the
        # activity is repeated inside the trace, otherwise the denominator
        # counts events while the numerator below counts traces.
        for act in set(trace_variant):
            traces_with_A[act] += freq

        # set(): each pair is counted once per trace variant.
        for pair in set(trace_skel.before(list(trace_variant))):
            traces_with_pair[pair] += freq

    min_ratio = 1 - noise_threshold
    return {
        (A, B)
        for (A, B), count_AB in traces_with_pair.items()
        if count_AB >= traces_with_A[A] * min_ratio
    }
def never_together(logs_traces, all_activs, len_log, noise_threshold=0):
    """
    Gets the never-together relations given the traces of the log

    Parameters
    -------------
    logs_traces
        Mapping from each trace (tuple of activities) to its frequency
    all_activs
        Mapping from each activity to its number of occurrences
    len_log
        Length of the log (not used by this function)
    noise_threshold
        Noise threshold

    Returns
    --------------
    rel
        Set of the pairs whose remaining count reaches
        all_activs[first element of the pair] * (1 - noise_threshold)
    """
    # Every ordered pair of distinct activities starts with the count of its
    # first activity ...
    remaining = Counter(
        {
            (first, second): all_activs[first]
            for first in all_activs
            for second in all_activs
            if first != second
        }
    )

    # ... from which the (frequency-weighted) combinations seen in the
    # traces are removed. In-place Counter subtraction also drops the
    # entries that become zero or negative.
    for variant, frequency in logs_traces.items():
        seen = Counter(trace_skel.combos(list(variant)))
        remaining -= Counter(
            {pair: occurrences * frequency for pair, occurrences in seen.items()}
        )

    keep_ratio = 1.0 - noise_threshold
    return {
        pair
        for pair, left in remaining.items()
        if left >= all_activs[pair[0]] * keep_ratio
    }
def directly_follows(logs_traces, all_activs, noise_threshold=0):
    """
    Gets the allowed directly-follows relations given the traces of the log

    Parameters
    -------------
    logs_traces
        Mapping from each trace (tuple of activities) to its frequency
    all_activs
        Mapping from each activity to its number of occurrences
    noise_threshold
        Noise threshold

    Returns
    --------------
    rel
        Set of the pairs whose weighted count reaches
        all_activs[first element of the pair] * (1 - noise_threshold)
    """
    pair_totals = Counter()
    for variant, frequency in logs_traces.items():
        # pairs reported for this variant, weighted by how often it occurs
        per_variant = Counter(trace_skel.directly_follows(list(variant)))
        weighted = Counter(
            {
                pair: occurrences * frequency
                for pair, occurrences in per_variant.items()
            }
        )
        pair_totals += weighted

    keep_ratio = 1.0 - noise_threshold
    return {
        pair
        for pair, total in pair_totals.items()
        if total >= all_activs[pair[0]] * keep_ratio
    }
def activ_freq(logs_traces, all_activs, len_log, noise_threshold=0):
    """
    Gets the allowed activities frequencies given the traces of the log

    Parameters
    -------------
    logs_traces
        Mapping from each trace (tuple of activities) to its frequency
    all_activs
        All the activities
    len_log
        Length of the log
    noise_threshold
        Noise threshold

    Returns
    --------------
    rel
        Dictionary associating to each activity the set of its allowed
        numbers of occurrences in a trace
    """
    # activity -> Counter(occurrences in a trace -> number of traces)
    histograms = {}
    for variant, frequency in logs_traces.items():
        occurrences = trace_skel.activ_freq(variant)
        # activities absent from the variant occur zero times in it
        for act in all_activs:
            if act not in occurrences:
                occurrences[act] = 0
        for act in occurrences:
            histograms.setdefault(act, Counter())[occurrences[act]] += frequency

    required = (1.0 - noise_threshold) * len_log
    allowed = {}
    for act, histogram in histograms.items():
        # most common occurrence counts first
        ranked = sorted(
            histogram.items(), key=lambda item: item[1], reverse=True
        )
        kept = ranked
        covered = 0
        for position, (_, weight) in enumerate(ranked):
            covered += weight
            if covered >= required:
                # enough traces are covered: discard the rarer counts
                kept = ranked[: position + 1]
                break
        allowed[act] = {count for count, _ in kept}
    return allowed
def apply(
    log: Union[EventLog, pd.DataFrame],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Dict[str, Any]:
    """
    Discover a log skeleton from an event log

    Parameters
    -------------
    log
        Event log (EventLog object or Pandas dataframe)
    parameters
        Parameters of the algorithm, including:
            - the activity key (Parameters.ACTIVITY_KEY)
            - the noise threshold (Parameters.NOISE_THRESHOLD)
            - the case identifier column, for dataframes only
              (Parameters.CASE_ID_KEY)

    Returns
    -------------
    model
        Log skeleton model: dictionary having the values of the Outputs
        enumeration as keys

    Raises
    -------------
    TypeError
        If the log is neither an EventLog nor a Pandas dataframe
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY
    )
    noise_threshold = exec_utils.get_param_value(
        Parameters.NOISE_THRESHOLD, parameters, 0.0
    )

    # logs_traces: trace (tuple of activities) -> number of cases
    # all_activs:  activity -> number of events
    if isinstance(log, EventLog):
        logs_traces = Counter(
            tuple(event[activity_key] for event in trace) for trace in log
        )
        all_activs = Counter(
            event[activity_key] for trace in log for event in trace
        )
    elif pandas_utils.check_is_pandas_dataframe(log):
        case_id_key = exec_utils.get_param_value(
            Parameters.CASE_ID_KEY, parameters, CASE_CONCEPT_NAME
        )
        all_activs = log[activity_key].value_counts().to_dict()
        logs_traces = Counter(
            tuple(activities)
            for activities in log.groupby(case_id_key)[activity_key]
            .agg(list)
            .to_dict()
            .values()
        )
    else:
        raise TypeError(
            "the log skeleton can be discovered only from an EventLog or a "
            "Pandas dataframe, got " + type(log).__name__
        )

    # Number of cases. len(log) cannot be used here: for a dataframe it is
    # the number of events (rows), which would make the noise filtering of
    # the activity frequencies ineffective. For an EventLog the two values
    # coincide.
    len_log = sum(logs_traces.values())

    ret = {}
    ret[Outputs.EQUIVALENCE.value] = equivalence(
        logs_traces, all_activs, noise_threshold=noise_threshold
    )
    ret[Outputs.ALWAYS_AFTER.value] = always_after(
        logs_traces, all_activs, noise_threshold=noise_threshold
    )
    ret[Outputs.ALWAYS_BEFORE.value] = always_before(
        logs_traces, all_activs, noise_threshold=noise_threshold
    )
    ret[Outputs.NEVER_TOGETHER.value] = never_together(
        logs_traces, all_activs, len_log, noise_threshold=noise_threshold
    )
    ret[Outputs.DIRECTLY_FOLLOWS.value] = directly_follows(
        logs_traces, all_activs, noise_threshold=noise_threshold
    )
    ret[Outputs.ACTIV_FREQ.value] = activ_freq(
        logs_traces, all_activs, len_log, noise_threshold=noise_threshold
    )
    return ret
def apply_from_variants_list(var_list, parameters=None):
    """
    Discovers the log skeleton from the variants list

    Parameters
    ---------------
    var_list
        Variants list (the first element of each entry is the variant)
    parameters
        Parameters

    Returns
    ---------------
    model
        Log skeleton model
    """
    parameters = {} if parameters is None else parameters

    # one trace per entry of the list, rebuilt from its variant
    log = EventLog()
    for entry in var_list:
        log.append(
            variants_util.variant_to_trace(entry[0], parameters=parameters)
        )

    return apply(log, parameters=parameters)
def prepare_encode(log_skeleton):
    """
    Prepares the log skeleton for encoding

    The model is modified in place and returned.

    Parameters
    --------------
    log_skeleton
        Log skeleton

    Returns
    --------------
    log_skeleton
        Log skeleton (with lists instead of sets)
    """
    # entries holding a collection of relations
    relation_outputs = (
        Outputs.EQUIVALENCE,
        Outputs.ALWAYS_AFTER,
        Outputs.ALWAYS_BEFORE,
        Outputs.NEVER_TOGETHER,
        Outputs.DIRECTLY_FOLLOWS,
    )
    for output in relation_outputs:
        log_skeleton[output.value] = list(log_skeleton[output.value])

    # the activity frequencies hold one collection per activity
    frequencies = log_skeleton[Outputs.ACTIV_FREQ.value]
    for act in frequencies:
        frequencies[act] = list(frequencies[act])

    return log_skeleton