Source code for pm4py.algo.conformance.alignments.petri_net.algorithm

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from copy import copy

from pm4py.algo.conformance.alignments.petri_net import variants
from pm4py.objects.petri_net.utils import align_utils, check_soundness
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.util.xes_constants import DEFAULT_NAME_KEY, DEFAULT_TRACEID_KEY
from pm4py.objects.log.obj import Trace, Event
import time
from pm4py.util.lp import solver
from pm4py.util import exec_utils
from enum import Enum
import sys
from pm4py.util.constants import (
    PARAMETER_CONSTANT_ACTIVITY_KEY,
    PARAMETER_CONSTANT_CASEID_KEY,
    CASE_CONCEPT_NAME,
)
import importlib.util
from typing import Optional, Dict, Any, Union
from pm4py.objects.log.obj import EventLog, EventStream, Trace
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import typing, constants, pandas_utils
import pandas as pd


[docs] class Variants(Enum): VERSION_STATE_EQUATION_A_STAR = variants.state_equation_a_star VERSION_DIJKSTRA_NO_HEURISTICS = variants.dijkstra_no_heuristics VERSION_DIJKSTRA_LESS_MEMORY = variants.dijkstra_less_memory VERSION_DISCOUNTED_A_STAR = variants.discounted_a_star
[docs] class Parameters(Enum): PARAM_TRACE_COST_FUNCTION = "trace_cost_function" PARAM_MODEL_COST_FUNCTION = "model_cost_function" PARAM_SYNC_COST_FUNCTION = "sync_cost_function" PARAM_ALIGNMENT_RESULT_IS_SYNC_PROD_AWARE = "ret_tuple_as_trans_desc" PARAM_TRACE_NET_COSTS = "trace_net_costs" TRACE_NET_CONSTR_FUNCTION = "trace_net_constr_function" TRACE_NET_COST_AWARE_CONSTR_FUNCTION = ( "trace_net_cost_aware_constr_function" ) PARAM_MAX_ALIGN_TIME_TRACE = "max_align_time_trace" PARAM_MAX_ALIGN_TIME = "max_align_time" PARAMETER_VARIANT_DELIMITER = "variant_delimiter" CASE_ID_KEY = PARAMETER_CONSTANT_CASEID_KEY ACTIVITY_KEY = PARAMETER_CONSTANT_ACTIVITY_KEY VARIANTS_IDX = "variants_idx" SHOW_PROGRESS_BAR = "show_progress_bar" CORES = "cores" BEST_WORST_COST_INTERNAL = "best_worst_cost_internal" FITNESS_ROUND_DIGITS = "fitness_round_digits" SYNCHRONOUS = "synchronous_dijkstra" EXPONENT="theta" ENABLE_BEST_WORST_COST = "enable_best_worst_cost"
def __variant_mapper(variant): if type(variant) is str: if variant == "Variants.VERSION_STATE_EQUATION_A_STAR": variant = Variants.VERSION_STATE_EQUATION_A_STAR elif variant == "Variants.VERSION_DIJKSTRA_NO_HEURISTICS": variant = Variants.VERSION_DIJKSTRA_NO_HEURISTICS elif variant == "Variants.VERSION_DIJKSTRA_LESS_MEMORY": variant = Variants.VERSION_DIJKSTRA_LESS_MEMORY return variant DEFAULT_VARIANT = Variants.VERSION_DIJKSTRA_LESS_MEMORY if solver.DEFAULT_LP_SOLVER_VARIANT is not None: DEFAULT_VARIANT = __variant_mapper(constants.DEFAULT_ALIGNMENTS_VARIANT) VERSION_STATE_EQUATION_A_STAR = Variants.VERSION_STATE_EQUATION_A_STAR VERSION_DIJKSTRA_NO_HEURISTICS = Variants.VERSION_DIJKSTRA_NO_HEURISTICS VERSION_DIJKSTRA_LESS_MEMORY = Variants.VERSION_DIJKSTRA_LESS_MEMORY VERSIONS = { Variants.VERSION_DIJKSTRA_NO_HEURISTICS, Variants.VERSION_DIJKSTRA_NO_HEURISTICS, Variants.VERSION_DIJKSTRA_LESS_MEMORY, }
[docs] def apply( obj: Union[EventLog, EventStream, pd.DataFrame, Trace], petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, parameters: Optional[Dict[Any, Any]] = None, variant=DEFAULT_VARIANT, ) -> Union[typing.AlignmentResult, typing.ListAlignments]: if parameters is None: parameters = {} if isinstance(obj, Trace): return apply_trace( obj, petri_net, initial_marking, final_marking, parameters=parameters, variant=variant, ) else: return apply_log( obj, petri_net, initial_marking, final_marking, parameters=parameters, variant=variant, )
[docs] def apply_trace( trace, petri_net, initial_marking, final_marking, parameters=None, variant=DEFAULT_VARIANT, ): """ apply alignments to a trace Parameters ----------- trace :class:`pm4py.log.log.Trace` trace of events petri_net :class:`pm4py.objects.petri.petrinet.PetriNet` the model to use for the alignment initial_marking :class:`pm4py.objects.petri.petrinet.Marking` initial marking of the net final_marking :class:`pm4py.objects.petri.petrinet.Marking` final marking of the net variant selected variant of the algorithm, possible values: {\'Variants.VERSION_STATE_EQUATION_A_STAR, Variants.VERSION_DIJKSTRA_NO_HEURISTICS \'} parameters :class:`dict` parameters of the algorithm, for key \'state_equation_a_star\': Parameters.ACTIVITY_KEY -> Attribute in the log that contains the activity Parameters.PARAM_MODEL_COST_FUNCTION -> mapping of each transition in the model to corresponding synchronous costs Parameters.PARAM_SYNC_COST_FUNCTION -> mapping of each transition in the model to corresponding model cost Parameters.PARAM_TRACE_COST_FUNCTION -> mapping of each index of the trace to a positive cost value Returns ----------- alignment :class:`dict` with keys **alignment**, **cost**, **visited_states**, **queued_states** and **traversed_arcs** The alignment is a sequence of labels of the form (a,t), (a,>>), or (>>,t) representing synchronous/log/model-moves. """ if parameters is None: parameters = copy({PARAMETER_CONSTANT_ACTIVITY_KEY: DEFAULT_NAME_KEY}) variant = __variant_mapper(variant) parameters = copy(parameters) enable_best_worst_cost = exec_utils.get_param_value( Parameters.ENABLE_BEST_WORST_COST, parameters, True ) ali = exec_utils.get_variant(variant).apply( trace, petri_net, initial_marking, final_marking, parameters=parameters ) trace_cost_function = exec_utils.get_param_value( Parameters.PARAM_TRACE_COST_FUNCTION, parameters, [] ) # Instead of using the length of the trace, use the sum of the trace cost # function trace_cost_function_sum = sum(trace_cost_function) if enable_best_worst_cost: best_worst_cost = exec_utils.get_param_value( Parameters.BEST_WORST_COST_INTERNAL, parameters, __get_best_worst_cost( petri_net, initial_marking, final_marking, variant, parameters ), ) if ali is not None and best_worst_cost is not None: ltrace_bwc = trace_cost_function_sum + best_worst_cost fitness_num = ali["cost"] // align_utils.STD_MODEL_LOG_MOVE_COST fitness_den = ltrace_bwc // align_utils.STD_MODEL_LOG_MOVE_COST fitness = 1 - fitness_num / fitness_den if fitness_den > 0 else 0 ali["fitness"] = fitness # returning also the best worst cost, for log fitness computation ali["bwc"] = ltrace_bwc return ali
[docs] def apply_log( log, petri_net, initial_marking, final_marking, parameters=None, variant=DEFAULT_VARIANT, ): """ apply alignments to a log Parameters ----------- log object of the form :class:`pm4py.log.log.EventLog` event log petri_net :class:`pm4py.objects.petri.petrinet.PetriNet` the model to use for the alignment initial_marking :class:`pm4py.objects.petri.petrinet.Marking` initial marking of the net final_marking :class:`pm4py.objects.petri.petrinet.Marking` final marking of the net variant selected variant of the algorithm, possible values: {\'Variants.VERSION_STATE_EQUATION_A_STAR, Variants.VERSION_DIJKSTRA_NO_HEURISTICS \'} parameters :class:`dict` parameters of the algorithm, Returns ----------- alignment :class:`list` of :class:`dict` with keys **alignment**, **cost**, **visited_states**, **queued_states** and **traversed_arcs** The alignment is a sequence of labels of the form (a,t), (a,>>), or (>>,t) representing synchronous/log/model-moves. """ if parameters is None: parameters = dict() if solver.DEFAULT_LP_SOLVER_VARIANT is not None: if not check_soundness.check_easy_soundness_net_in_fin_marking( petri_net, initial_marking, final_marking ): raise Exception( "trying to apply alignments on a Petri net that is not a easy sound net!!" ) enable_best_worst_cost = exec_utils.get_param_value( Parameters.ENABLE_BEST_WORST_COST, parameters, True ) variant = __variant_mapper(variant) start_time = time.time() max_align_time = exec_utils.get_param_value( Parameters.PARAM_MAX_ALIGN_TIME, parameters, sys.maxsize ) max_align_time_case = exec_utils.get_param_value( Parameters.PARAM_MAX_ALIGN_TIME_TRACE, parameters, sys.maxsize ) variants_idxs, one_tr_per_var = __get_variants_structure(log, parameters) progress = __get_progress_bar(len(one_tr_per_var), parameters) if enable_best_worst_cost: best_worst_cost = __get_best_worst_cost( petri_net, initial_marking, final_marking, variant, parameters ) parameters[Parameters.BEST_WORST_COST_INTERNAL] = best_worst_cost all_alignments = [] for trace in one_tr_per_var: this_max_align_time = min( max_align_time_case, (max_align_time - (time.time() - start_time)) * 0.5, ) parameters[Parameters.PARAM_MAX_ALIGN_TIME_TRACE] = this_max_align_time all_alignments.append( apply_trace( trace, petri_net, initial_marking, final_marking, parameters=copy(parameters), variant=variant, ) ) if progress is not None: progress.update() alignments = __form_alignments(variants_idxs, all_alignments) __close_progress_bar(progress) return alignments
[docs] def apply_multiprocessing( log, petri_net, initial_marking, final_marking, parameters=None, variant=DEFAULT_VARIANT, ): """ Applies the alignments using a process pool (multiprocessing) Parameters --------------- log Event log petri_net Petri net initial_marking Initial marking final_marking Final marking parameters Parameters of the algorithm Returns ---------------- aligned_traces Alignments """ if parameters is None: parameters = {} import multiprocessing variant = __variant_mapper(variant) num_cores = exec_utils.get_param_value( Parameters.CORES, parameters, multiprocessing.cpu_count() - 2 ) enable_best_worst_cost = exec_utils.get_param_value( Parameters.ENABLE_BEST_WORST_COST, parameters, True ) variants_idxs, one_tr_per_var = __get_variants_structure(log, parameters) if enable_best_worst_cost: best_worst_cost = __get_best_worst_cost( petri_net, initial_marking, final_marking, variant, parameters ) parameters[Parameters.BEST_WORST_COST_INTERNAL] = best_worst_cost all_alignments = [] from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor(max_workers=num_cores) as executor: futures = [] for trace in one_tr_per_var: futures.append( executor.submit( apply_trace, trace, petri_net, initial_marking, final_marking, parameters, str(variant), ) ) progress = __get_progress_bar(len(one_tr_per_var), parameters) if progress is not None: alignments_ready = 0 while alignments_ready != len(futures): current = 0 for index, variant in enumerate(futures): current = current + 1 if futures[index].done() else current if current > alignments_ready: for i in range(0, current - alignments_ready): progress.update() alignments_ready = current for index, variant in enumerate(futures): all_alignments.append(futures[index].result()) __close_progress_bar(progress) alignments = __form_alignments(variants_idxs, all_alignments) return alignments
def __get_best_worst_cost( petri_net, initial_marking, final_marking, variant, parameters ): parameters_best_worst = copy(parameters) best_worst_cost = exec_utils.get_variant(variant).get_best_worst_cost( petri_net, initial_marking, final_marking, parameters=parameters_best_worst, ) return best_worst_cost def __get_variants_structure(log, parameters): if parameters is None: parameters = {} activity_key = exec_utils.get_param_value( Parameters.ACTIVITY_KEY, parameters, DEFAULT_NAME_KEY ) variants_idxs = {} one_tr_per_var = [] if pandas_utils.check_is_pandas_dataframe(log): case_id_key = exec_utils.get_param_value( Parameters.CASE_ID_KEY, parameters, CASE_CONCEPT_NAME ) traces = [ tuple(x) for x in log.groupby(case_id_key)[activity_key] .agg(list) .to_dict() .values() ] for idx, trace in enumerate(traces): if trace not in variants_idxs: variants_idxs[trace] = [idx] case = Trace() for act in trace: case.append(Event({activity_key: act})) one_tr_per_var.append(case) else: variants_idxs[trace].append(idx) else: log = log_converter.apply( log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters, ) for idx, case in enumerate(log): trace = tuple(x[activity_key] for x in case) if trace not in variants_idxs: variants_idxs[trace] = [idx] one_tr_per_var.append(case) else: variants_idxs[trace].append(idx) return variants_idxs, one_tr_per_var def __get_progress_bar(num_variants, parameters): show_progress_bar = exec_utils.get_param_value( Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR ) progress = None if ( importlib.util.find_spec("tqdm") and show_progress_bar and num_variants > 1 ): from tqdm.auto import tqdm progress = tqdm( total=num_variants, desc="aligning log, completed variants :: " ) return progress def __form_alignments(variants_idxs, all_alignments): al_idx = {} for index_variant, variant in enumerate(variants_idxs): for trace_idx in variants_idxs[variant]: al_idx[trace_idx] = all_alignments[index_variant] alignments = [] for i in range(len(al_idx)): alignments.append(al_idx[i]) return alignments def __close_progress_bar(progress): if progress is not None: progress.close() del progress
[docs] def get_diagnostics_dataframe(log, align_output, parameters=None): """ Gets the diagnostics results of alignments (of a log) in a dataframe Parameters -------------- log Event log align_output Output of the alignments Returns -------------- dataframe Diagnostics dataframe """ if parameters is None: parameters = {} case_id_key = exec_utils.get_param_value( Parameters.CASE_ID_KEY, parameters, DEFAULT_TRACEID_KEY ) import pandas as pd diagn_stream = [] for index in range(len(log)): case_id = log[index].attributes[case_id_key] cost = align_output[index]["cost"] fitness = align_output[index]["fitness"] is_fit = fitness == 1.0 diagn_stream.append( { "case_id": case_id, "cost": cost, "fitness": fitness, "is_fit": is_fit, } ) return pandas_utils.instantiate_dataframe(diagn_stream)