Source code for pm4py.algo.simulation.playout.dfg.variants.classic

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import datetime
import heapq
import math
import sys
import time
from copy import deepcopy
from enum import Enum
from typing import Optional, Dict, Any, Union, Tuple

from pm4py.objects.log.obj import EventLog
from pm4py.objects.log.obj import Trace, Event
from pm4py.util import exec_utils, constants, xes_constants
from pm4py.util.dt_parsing.variants import strpfromiso


class Parameters(Enum):
    MAX_NO_VARIANTS = "max_no_variants"
    MIN_WEIGHTED_PROBABILITY = "min_weighted_probability"
    MAX_NO_OCC_PER_ACTIVITY = "max_no_occ_per_activitiy"
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    INTERRUPT_SIMULATION_WHEN_DFG_COMPLETE = "interrupt_simulation_when_dfg_complete"
    ADD_TRACE_IF_TAKES_NEW_ELS_TO_DFG = "add_trace_if_takes_new_els_to_dfg"
    RETURN_VARIANTS = "return_variants"
    MAX_EXECUTION_TIME = "max_execution_time"
    RETURN_ONLY_IF_COMPLETE = "return_only_if_complete"
    MIN_VARIANT_OCC = "min_variant_occ"


def get_node_tr_probabilities(dfg, start_activities, end_activities):
    """
    Gets the transition probabilities between the nodes of a DFG

    Parameters
    --------------
    dfg
        DFG
    start_activities
        Start activities
    end_activities
        End activities

    Returns
    ---------------
    weighted_start_activities
        Start activities, each associated with the logarithm of its relative
        weight (the weights range from 0 to 1)
    node_transition_probabilities
        The log transition probabilities between the nodes of the DFG
        (the end node is None)
    """
    node_transition_probabilities = {}

    # this part gives to each edge a transition probability,
    # given the (integer) DFG and the (integer) count of the end activities
    for el in dfg:
        if el[0] not in node_transition_probabilities:
            node_transition_probabilities[el[0]] = {}
        node_transition_probabilities[el[0]][el[1]] = dfg[el]

    for ea in end_activities:
        if ea not in node_transition_probabilities:
            node_transition_probabilities[ea] = {}
        node_transition_probabilities[ea][None] = end_activities[ea]

    for source in node_transition_probabilities:
        sum_values = sum(node_transition_probabilities[source].values())
        for target in node_transition_probabilities[source]:
            # the logarithm gives numerical stability
            node_transition_probabilities[source][target] = math.log(
                float(node_transition_probabilities[source][target])
                / float(sum_values)
            )

    # this part gives to each start activity a probability, given its frequency
    sum_start_act = sum(start_activities.values())
    start_activities = deepcopy(start_activities)
    for sa in start_activities:
        start_activities[sa] = math.log(
            float(start_activities[sa]) / float(sum_start_act)
        )

    return start_activities, node_transition_probabilities
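

# Illustrative sketch (not part of the original module): a toy DFG, obtained by
# counting a log over the activities "a", "b", "c" and "d", showing the
# log-probabilities computed by get_node_tr_probabilities. The helper name and
# the toy counts are assumptions made only for this example.
def _example_node_tr_probabilities():
    dfg = {("a", "b"): 3, ("b", "c"): 2, ("b", "d"): 1}
    start_activities = {"a": 3}
    end_activities = {"c": 2, "d": 1}
    log_sa, log_tr = get_node_tr_probabilities(dfg, start_activities, end_activities)
    # "a" is the only start activity, so its relative weight is 1 (log = 0)
    assert math.isclose(math.exp(log_sa["a"]), 1.0)
    # from "b", two of the three observed continuations go to "c"
    assert math.isclose(math.exp(log_tr["b"]["c"]), 2.0 / 3.0)
    # "c" only ends traces, so the transition to the artificial end node (None)
    # has probability 1
    assert math.isclose(math.exp(log_tr["c"][None]), 1.0)
    return log_sa, log_tr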


def get_traces(dfg, start_activities, end_activities, parameters=None):
    """
    Gets the most probable traces from the DFG, one-by-one (iterator),
    until the least probable

    Parameters
    ---------------
    dfg
        *Complete* DFG
    start_activities
        Start activities
    end_activities
        End activities
    parameters
        Parameters of the algorithm, including:
        - Parameters.MAX_NO_OCC_PER_ACTIVITY => the maximum number of
          occurrences per activity in the traces of the log (default: 2)

    Returns
    ---------------
    yielded_trace
        Trace of the simulation, yielded as a tuple of activities together
        with its probability
    """
    if parameters is None:
        parameters = {}

    max_no_occ_per_activity = exec_utils.get_param_value(
        Parameters.MAX_NO_OCC_PER_ACTIVITY, parameters, 2
    )

    start_activities, node_transition_probabilities = get_node_tr_probabilities(
        dfg, start_activities, end_activities
    )

    # we start from the partial traces containing only the start activities,
    # along with their probability
    partial_traces = [(-start_activities[sa], (sa,)) for sa in start_activities]
    heapq.heapify(partial_traces)

    while partial_traces:
        item = heapq.heappop(partial_traces)
        prob = item[0]
        trace_tuple = item[1]

        # get the last activity directly from the tuple
        last_act = trace_tuple[-1]

        # count the activities efficiently, without using Counter
        trace_counter = {}
        for act in trace_tuple:
            trace_counter[act] = trace_counter.get(act, 0) + 1

        for new_act in node_transition_probabilities[last_act]:
            # use get() to avoid a KeyError for activities not yet in the trace
            current_count = trace_counter.get(new_act, 0)

            if current_count < max_no_occ_per_activity:
                prob_new_act = node_transition_probabilities[last_act][new_act]

                if new_act is None:
                    p = math.exp(-(prob - prob_new_act))
                    yield (trace_tuple, p)
                else:
                    # create the new trace directly, without list conversions
                    new_trace = trace_tuple + (new_act,)
                    heapq.heappush(
                        partial_traces, (prob - prob_new_act, new_trace)
                    )
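

# Illustrative sketch (not part of the original module): enumerating the traces
# of the toy DFG used above in decreasing order of probability. The helper name
# and the toy counts are assumptions made only for this example.
def _example_get_traces():
    dfg = {("a", "b"): 3, ("b", "c"): 2, ("b", "d"): 1}
    start_activities = {"a": 3}
    end_activities = {"c": 2, "d": 1}
    traces = list(get_traces(dfg, start_activities, end_activities))
    # <a, b, c> is expected first with probability 2/3, then <a, b, d> with 1/3
    assert traces[0][0] == ("a", "b", "c") and math.isclose(traces[0][1], 2.0 / 3.0)
    assert traces[1][0] == ("a", "b", "d") and math.isclose(traces[1][1], 1.0 / 3.0)
    return traces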


def apply(
    dfg: Dict[Tuple[str, str], int],
    start_activities: Dict[str, int],
    end_activities: Dict[str, int],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Union[EventLog, Dict[Tuple[str, str], int]]:
    """
    Applies the playout algorithm on a DFG, extracting the most likely traces
    according to the DFG

    Parameters
    ---------------
    dfg
        *Complete* DFG
    start_activities
        Start activities
    end_activities
        End activities
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY => the activity key of the simulated log
        - Parameters.TIMESTAMP_KEY => the timestamp key of the simulated log
        - Parameters.MAX_NO_VARIANTS => the maximum number of variants generated
          by the method (default: 3000)
        - Parameters.MIN_WEIGHTED_PROBABILITY => the minimum overall weighted
          probability after which the method stops (default: 1)
        - Parameters.MAX_NO_OCC_PER_ACTIVITY => the maximum number of occurrences
          per activity in the traces of the log (default: 2)
        - Parameters.INTERRUPT_SIMULATION_WHEN_DFG_COMPLETE => interrupts the
          simulation when the DFG of the simulated log has the same keys as the
          DFG of the original log (all behavior is contained) (default: False)
        - Parameters.ADD_TRACE_IF_TAKES_NEW_ELS_TO_DFG => adds a simulated trace
          to the simulated log only if it adds elements to the simulated DFG,
          i.e., it adds behavior; skips the insertion otherwise (default: False)
        - Parameters.RETURN_VARIANTS => returns the traces as variants with a
          likely number of occurrences (default: False)
        - Parameters.MAX_EXECUTION_TIME => the maximum execution time of the
          simulation, in seconds (default: unbounded)
        - Parameters.RETURN_ONLY_IF_COMPLETE => returns the simulated log/variants
          only if the simulation was not interrupted (default: False)
        - Parameters.MIN_VARIANT_OCC => stops the simulation when the estimated
          number of occurrences of a variant falls below this value (default: 1)

    Returns
    ---------------
    simulated_log
        Simulated log (or dictionary of variants, if RETURN_VARIANTS is set)
    """
    if parameters is None:
        parameters = {}

    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY
    )
    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
    )
    max_no_variants = exec_utils.get_param_value(
        Parameters.MAX_NO_VARIANTS, parameters, 3000
    )
    min_weighted_probability = exec_utils.get_param_value(
        Parameters.MIN_WEIGHTED_PROBABILITY, parameters, 1.0
    )
    interrupt_simulation_when_dfg_complete = exec_utils.get_param_value(
        Parameters.INTERRUPT_SIMULATION_WHEN_DFG_COMPLETE, parameters, False
    )
    add_trace_if_takes_new_els_to_dfg = exec_utils.get_param_value(
        Parameters.ADD_TRACE_IF_TAKES_NEW_ELS_TO_DFG, parameters, False
    )
    return_variants = exec_utils.get_param_value(
        Parameters.RETURN_VARIANTS, parameters, False
    )
    max_execution_time = exec_utils.get_param_value(
        Parameters.MAX_EXECUTION_TIME, parameters, sys.maxsize
    )
    return_only_if_complete = exec_utils.get_param_value(
        Parameters.RETURN_ONLY_IF_COMPLETE, parameters, False
    )
    min_variant_occ = exec_utils.get_param_value(
        Parameters.MIN_VARIANT_OCC, parameters, 1
    )

    # keep track of the DFG, start activities and end activities of the
    # (ongoing) simulation
    simulated_traces_dfg = set()
    simulated_traces_sa = set()
    simulated_traces_ea = set()
    interrupt_break_condition = False
    interrupted = False
    overall_probability = 0.0

    final_traces = []
    max_occ = 0

    start_time = time.time()

    for tr, p in get_traces(
        dfg, start_activities, end_activities, parameters=parameters
    ):
        if interrupt_simulation_when_dfg_complete and interrupt_break_condition:
            break
        if len(final_traces) >= max_no_variants:
            interrupted = True
            break
        if overall_probability > min_weighted_probability:
            interrupted = True
            break
        current_time = time.time()
        if (current_time - start_time) > max_execution_time:
            interrupted = True
            break

        overall_probability += p

        diff_sa = {tr[0]}.difference(simulated_traces_sa)
        diff_ea = {tr[-1]}.difference(simulated_traces_ea)
        diff_dfg = {(tr[i], tr[i + 1]) for i in range(len(tr) - 1)}.difference(
            simulated_traces_dfg
        )
        adds_something = len(diff_sa) > 0 or len(diff_ea) > 0 or len(diff_dfg) > 0

        if add_trace_if_takes_new_els_to_dfg and not adds_something:
            # skip the insertion if ADD_TRACE_IF_TAKES_NEW_ELS_TO_DFG is set to True
            # and the trace does not add any information on the DFG, the start
            # activities or the end activities
            continue

        # update the start activities, end activities and DFG of the simulated log
        simulated_traces_sa = simulated_traces_sa.union(diff_sa)
        simulated_traces_ea = simulated_traces_ea.union(diff_ea)
        simulated_traces_dfg = simulated_traces_dfg.union(diff_dfg)

        # memorize the difference between the original DFG and the DFG of the
        # simulated log
        diff_original_sa = set(start_activities).difference(simulated_traces_sa)
        diff_original_ea = set(end_activities).difference(simulated_traces_ea)
        diff_original_dfg = set(dfg).difference(simulated_traces_dfg)
        interrupt_break_condition = (
            len(diff_original_sa) == 0
            and len(diff_original_ea) == 0
            and len(diff_original_dfg) == 0
        )

        var_occ = math.ceil(p * max_no_variants)
        max_occ = max(max_occ, var_occ)

        if var_occ < min_variant_occ <= max_occ:
            break

        final_traces.append((-p, tr))

        if interrupt_simulation_when_dfg_complete and interrupt_break_condition:
            break

    # make sure that the traces are strictly ordered by their probability
    # (generally, the order is already pretty good, since the states are visited
    # in the queue based on their order, but not always 100% consistent)
    final_traces = sorted(final_traces)

    if return_variants:
        # returns the variants instead of the log
        variants = {}
        for p, tr in final_traces:
            var_occ = math.ceil(-p * max_no_variants)
            variants[tr] = var_occ
        if not (interrupted and return_only_if_complete):
            return variants
    else:
        event_log = EventLog()
        # assigns to each event an increasing timestamp, starting from a fixed
        # offset after the 1970 epoch
        curr_timestamp = 10000000
        for index, tr in enumerate(final_traces):
            log_trace = Trace(
                attributes={
                    xes_constants.DEFAULT_TRACEID_KEY: str(index),
                    "probability": -tr[0],
                }
            )
            for act in tr[1]:
                log_trace.append(
                    Event(
                        {
                            activity_key: act,
                            timestamp_key: strpfromiso.fix_naivety(
                                datetime.datetime.fromtimestamp(curr_timestamp)
                            ),
                        }
                    )
                )
                # increases by 1 second
                curr_timestamp += 1
            event_log.append(log_trace)
        if not (interrupted and return_only_if_complete):
            return event_log
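

# Illustrative sketch (not part of the original module): running the playout on
# the toy DFG used in the examples above and asking for variants instead of an
# EventLog. The helper name, the toy counts and the chosen parameter values are
# assumptions made only for this example.
def _example_apply():
    dfg = {("a", "b"): 3, ("b", "c"): 2, ("b", "d"): 1}
    start_activities = {"a": 3}
    end_activities = {"c": 2, "d": 1}
    variants = apply(
        dfg,
        start_activities,
        end_activities,
        parameters={
            Parameters.RETURN_VARIANTS: True,
            Parameters.MAX_NO_VARIANTS: 100,
        },
    )
    # expected result: {("a", "b", "c"): 67, ("a", "b", "d"): 34}, i.e.
    # occurrence estimates roughly proportional to the trace probabilities
    # 2/3 and 1/3
    return variants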


def get_trace_probability(trace, dfg, start_activities, end_activities, parameters=None):
    """
    Given a trace of a log, gets its probability given the complete DFG

    Parameters
    ----------------
    trace
        Trace of a log
    dfg
        *Complete* DFG
    start_activities
        Start activities of the model
    end_activities
        End activities of the model
    parameters
        Parameters of the algorithm:
        - Parameters.ACTIVITY_KEY => activity key

    Returns
    ----------------
    prob
        The probability of the trace according to the DFG
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
    )

    trace_act = tuple(x[activity_key] for x in trace)

    start_activities, node_transition_probabilities = get_node_tr_probabilities(
        dfg, start_activities, end_activities
    )

    try:
        # the following code would not crash, assuming that the trace is fully
        # replayable on the model
        sum_prob = 0.0
        sum_prob += start_activities[trace_act[0]]
        for i in range(len(trace_act)):
            this_act = trace_act[i]
            next_act = trace_act[i + 1] if i < len(trace_act) - 1 else None
            lpt = node_transition_probabilities[this_act][next_act]
            sum_prob += lpt
        return math.exp(sum_prob)
    except BaseException:
        # if it crashes, the trace is not replayable on the model,
        # so return probability 0
        return 0.0
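

# Illustrative sketch (not part of the original module): computing the
# probability of a single trace of the toy model used above. The event
# dictionaries only need to carry the activity key (here the default
# "concept:name"); the helper name and the toy counts are assumptions made
# only for this example.
def _example_get_trace_probability():
    dfg = {("a", "b"): 3, ("b", "c"): 2, ("b", "d"): 1}
    start_activities = {"a": 3}
    end_activities = {"c": 2, "d": 1}
    trace = [{"concept:name": act} for act in ("a", "b", "c")]
    p = get_trace_probability(trace, dfg, start_activities, end_activities)
    # expected: 1.0 (start at a) * 1.0 (a -> b) * 2/3 (b -> c) * 1.0 (c -> end)
    assert math.isclose(p, 2.0 / 3.0)
    # a trace that cannot be replayed on the DFG gets probability 0
    assert get_trace_probability(
        [{"concept:name": "z"}], dfg, start_activities, end_activities
    ) == 0.0
    return p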