Source code for pm4py.algo.discovery.ocel.ocpn.variants.classic

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''

from pm4py.objects.ocel.obj import OCEL
from pm4py.algo.discovery.ocel.ocdfg.variants import classic as ocdfg_discovery
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from enum import Enum
from pm4py.util import exec_utils
from pm4py.objects.ocel import constants as ocel_constants
from collections import Counter
from typing import Optional, Dict, Any
from pm4py.objects.dfg.obj import DFG
from pm4py.objects.conversion.process_tree import converter as tree_converter
from pm4py.algo.conformance.tokenreplay import algorithm as token_based_replay
from pm4py.objects.ocel.util import flattening
from copy import copy


class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of :func:`apply`."""

    # Activity attribute to be used; the key string is defined in
    # ocel_constants. Not read directly by apply() here — presumably consumed
    # by the helpers that receive the parameters dict (TODO confirm).
    EVENT_ACTIVITY = ocel_constants.PARAM_EVENT_ACTIVITY
    # Object type attribute to be used; same remark as EVENT_ACTIVITY.
    OBJECT_TYPE = ocel_constants.PARAM_OBJECT_TYPE
    # Which inductive miner to run; apply() compares the value against the
    # strings "im" and "imd" and defaults to "im".
    INDUCTIVE_MINER_VARIANT = "inductive_miner_variant"
    # Threshold for the attribution of the "double arc"; apply() defaults
    # it to 0.8.
    DOUBLE_ARC_THRESHOLD = "double_arc_threshold"
    # When true, apply() also runs token-based replay per object type and
    # stores the diagnostics in the result; defaults to False.
    DIAGNOSTICS_WITH_TBR = "diagnostics_with_token_based_replay"
def _double_arc_flags(
    activities_eo: Dict[Any, Any], double_arc_threshold: float
) -> Dict[Any, bool]:
    """Flag, for each activity of one object type, whether it gets double arcs."""
    flags = {}
    for act, event_object_pairs in activities_eo.items():
        # x[0] is the event identifier of each (event, object) tuple, so the
        # counter holds how many objects of this type each event touches.
        ev_obj_count = Counter(x[0] for x in event_object_pairs)
        single_events = sum(1 for n in ev_obj_count.values() if n == 1)
        # share of events of this activity that involve exactly one object
        single_ratio = single_events / len(ev_obj_count)
        flags[act] = bool(single_ratio <= double_arc_threshold)
    return flags


def _tbr_diagnostics(
    flat_log: Any, petri_net: Any, parameters: Dict[Any, Any]
) -> tuple:
    """Replay ``flat_log`` on ``petri_net`` and collect token and firing counts."""
    tbr_parameters = copy(parameters)
    tbr_parameters["enable_pltr_fitness"] = True
    tbr_parameters["show_progress_bar"] = False

    (
        replayed_traces,
        place_fitness_per_trace,
        _transition_fitness_per_trace,
        _notexisting_activities_in_model,
    ) = token_based_replay.apply(
        flat_log,
        petri_net[0],
        petri_net[1],
        petri_net[2],
        parameters=tbr_parameters,
    )

    # missing, remaining, consumed, and produced tokens per place
    token_keys = ("m", "r", "c", "p")
    place_diagnostics = {
        place: dict.fromkeys(token_keys, 0)
        for place in place_fitness_per_trace
    }
    for place, res in place_fitness_per_trace.items():
        for key in token_keys:
            place_diagnostics[place][key] += res[key]

    # number of times each transition has been fired during the replay
    trans_count = {trans: 0 for trans in petri_net[0].transitions}
    for trace in replayed_traces:
        for trans in trace["activated_transitions"]:
            trans_count[trans] += 1

    return place_diagnostics, trans_count


def apply(
    ocel: OCEL, parameters: Optional[Dict[Any, Any]] = None
) -> Dict[str, Any]:
    """
    Discovers an object-centric Petri net (without annotation) from the given
    object-centric event log, using the Inductive Miner as process discovery
    algorithm.

    Reference paper: van der Aalst, Wil MP, and Alessandro Berti.
    "Discovering object-centric Petri nets." Fundamenta informaticae
    175.1-4 (2020): 1-40.

    Parameters
    -----------------
    ocel
        Object-centric event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.EVENT_ACTIVITY => the activity attribute to be used
        - Parameters.OBJECT_TYPE => the object type attribute to be used
        - Parameters.INDUCTIVE_MINER_VARIANT => "im" (default; discovery on
          the flattened log of each object type) or "imd" (discovery on the
          directly-follows graph of each object type)
        - Parameters.DOUBLE_ARC_THRESHOLD => the threshold for the attribution
          of the "double arc", as described in the paper (default: 0.8)
        - Parameters.DIAGNOSTICS_WITH_TBR => performs token-based replay and
          stores the result in the return dict (default: False)

    Returns
    -----------------
    ocpn
        Object-centric Petri net model. It is the dictionary returned by the
        object-centric DFG discovery, extended in place with three keys.
        Properties (the nesting below is the one used by this function for
        the entries it reads; the other entries are reported as documented
        upstream):
        - activities: complete set of activities derived from the
          object-centric event log
        - object_types: complete set of object types derived from the
          object-centric event log
        - edges: directly-follows arcs between activities (expressed as
          tuples, e.g., (act1, act2)). edges["event_couples"][ot] links every
          arc of the object type ot to the tuples of event identifiers where
          the arc occurs. Other measures (e.g. total_objects: tuples of two
          event and one object identifier) are expected alongside it.
        - activities_indep: dictionary linking each activity, regardless of
          the object type, to some sets:
            - events: the event identifiers where the activity occurs
            - unique_objects: the object identifiers where the activity occurs
            - total_objects: the tuples of event and object identifiers where
              the activity occurs.
        - activities_ot: activities_ot["total_objects"][ot] links every
          activity of the object type ot to the tuples of event and object
          identifiers where the activity occurs. The measures events and
          unique_objects are expected alongside it.
        - start_activities: start_activities["events"][ot] links every start
          activity of the object type ot to the event identifiers where it
          occurs. The measures unique_objects and total_objects are expected
          alongside it.
        - end_activities: same structure as start_activities, for the end
          activities.
        - petri_nets: for each object type, the accepted Petri net (Petri net
          + initial marking + final marking) discovered by the process
          discovery algorithm
        - double_arcs_on_activity: dictionary linking each object type to
          another dictionary, where each activity is linked to a boolean
          (True if its arcs are double arcs)
        - tbr_results: for each object type, a tuple (place diagnostics,
          transition firing counts) from the token-based replay; empty
          dictionary if the replay was not requested

    Raises
    -----------------
    ValueError
        If the requested inductive miner variant is neither "im" nor "imd"
        (and the log contains at least one object type).
    """
    if parameters is None:
        parameters = {}

    double_arc_threshold = exec_utils.get_param_value(
        Parameters.DOUBLE_ARC_THRESHOLD, parameters, 0.8
    )
    inductive_miner_variant = exec_utils.get_param_value(
        Parameters.INDUCTIVE_MINER_VARIANT, parameters, "im"
    )
    diagnostics_with_tbr = exec_utils.get_param_value(
        Parameters.DIAGNOSTICS_WITH_TBR, parameters, False
    )

    ocdfg_parameters = copy(parameters)
    ocdfg_parameters["compute_edges_performance"] = False
    ocpn = ocdfg_discovery.apply(ocel, parameters=ocdfg_parameters)

    petri_nets = {}
    double_arcs_on_activity = {}
    tbr_results = {}

    for ot in ocpn["object_types"]:
        activities_eo = ocpn["activities_ot"]["total_objects"][ot]

        start_activities = {
            x: len(y)
            for x, y in ocpn["start_activities"]["events"][ot].items()
        }
        end_activities = {
            x: len(y)
            for x, y in ocpn["end_activities"]["events"][ot].items()
        }
        dfg = {}
        if ot in ocpn["edges"]["event_couples"]:
            dfg = {
                x: len(y)
                for x, y in ocpn["edges"]["event_couples"][ot].items()
            }

        double_arcs_on_activity[ot] = _double_arc_flags(
            activities_eo, double_arc_threshold
        )

        im_parameters = copy(parameters)
        # disables the fallthroughs, as computing the model on a myriad of
        # different object types could be really expensive
        im_parameters["disable_fallthroughs"] = True
        # for performance reasons, also disable the strict sequence cut (use
        # the normal sequence cut)
        im_parameters["disable_strict_sequence_cut"] = True

        flat_log = None
        if inductive_miner_variant == "im" or diagnostics_with_tbr:
            # do the flattening only if it is required
            flat_log = flattening.flatten(ocel, ot, parameters=parameters)

        if inductive_miner_variant == "imd":
            obj = DFG()
            obj._graph = Counter(dfg)
            obj._start_activities = Counter(start_activities)
            obj._end_activities = Counter(end_activities)
            process_tree = inductive_miner.apply(
                obj,
                variant=inductive_miner.Variants.IMd,
                parameters=im_parameters,
            )
        elif inductive_miner_variant == "im":
            process_tree = inductive_miner.apply(
                flat_log, parameters=im_parameters
            )
        else:
            # Previously an unsupported value left the process tree as None
            # and handed it to the tree converter; fail with a clear message.
            raise ValueError(
                "unsupported inductive miner variant: "
                + repr(inductive_miner_variant)
                + " (expected 'im' or 'imd')"
            )

        petri_net = tree_converter.apply(process_tree, parameters=parameters)

        if diagnostics_with_tbr:
            tbr_results[ot] = _tbr_diagnostics(flat_log, petri_net, parameters)

        petri_nets[ot] = petri_net

    ocpn["petri_nets"] = petri_nets
    ocpn["double_arcs_on_activity"] = double_arcs_on_activity
    ocpn["tbr_results"] = tbr_results

    return ocpn