Source code for pm4py.algo.discovery.footprints.tree.variants.bottomup

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.objects.process_tree.obj import Operator
from pm4py.objects.process_tree.utils import bottomup as bottomup_disc
from copy import copy

from enum import Enum
from typing import Optional, Dict, Any
from pm4py.objects.process_tree.obj import ProcessTree


[docs] class Outputs(Enum): DFG = "dfg" SEQUENCE = "sequence" PARALLEL = "parallel" START_ACTIVITIES = "start_activities" END_ACTIVITIES = "end_activities" ACTIVITIES = "activities" SKIPPABLE = "skippable" ACTIVITIES_ALWAYS_HAPPENING = "activities_always_happening" MIN_TRACE_LENGTH = "min_trace_length" MAX_TRACE_LENGTH = "max_trace_length_wo_loops" TRACE = "trace"
START_ACTIVITIES = Outputs.START_ACTIVITIES.value END_ACTIVITIES = Outputs.END_ACTIVITIES.value ACTIVITIES = Outputs.ACTIVITIES.value SKIPPABLE = Outputs.SKIPPABLE.value SEQUENCE = Outputs.SEQUENCE.value PARALLEL = Outputs.PARALLEL.value ACTIVITIES_ALWAYS_HAPPENING = Outputs.ACTIVITIES_ALWAYS_HAPPENING.value
[docs] def fix_fp(sequence, parallel): """ Fix footprints Parameters -------------- sequence Sequence parallel Parallel Returns ------------- sequence Sequence parallel Parallel """ sequence = sequence.difference(parallel) par_els = {(x[0], x[1]) for x in sequence if (x[1], x[0]) in sequence} for el in par_els: parallel.add(el) sequence.remove(el) return sequence, parallel
[docs] def get_footprints_leaf(node, footprints_dictio): """ Gets the footprints for a leaf node Parameters --------------- node Node footprints_dictio Dictionary of footprints of the process tree Returns --------------- footprints Footprints of the leaf node """ if node.label is None: return { START_ACTIVITIES: set(), END_ACTIVITIES: set(), ACTIVITIES: set(), SKIPPABLE: True, SEQUENCE: set(), PARALLEL: set(), ACTIVITIES_ALWAYS_HAPPENING: set(), } else: return { START_ACTIVITIES: set([node.label]), END_ACTIVITIES: set([node.label]), ACTIVITIES: set([node.label]), SKIPPABLE: False, SEQUENCE: set(), PARALLEL: set(), ACTIVITIES_ALWAYS_HAPPENING: set([node.label]), }
[docs] def get_footprints_xor(node, footprints_dictio): """ Gets the footprints for the XOR node Parameters --------------- node Node footprints_dictio Dictionary of footprints of the process tree Returns --------------- footprints Footprints of the XOR node """ start_activities = set() end_activities = set() activities = set() skippable = False sequence = set() parallel = set() activities_always_happening = set() if node.children: activities_always_happening = copy( footprints_dictio[node.children[0]][ACTIVITIES_ALWAYS_HAPPENING] ) for n0 in node.children: n = footprints_dictio[n0] start_activities = start_activities.union(n[START_ACTIVITIES]) end_activities = end_activities.union(n[END_ACTIVITIES]) activities = activities.union(n[ACTIVITIES]) skippable = skippable or n[SKIPPABLE] sequence = sequence.union(n[SEQUENCE]) parallel = parallel.union(n[PARALLEL]) if not n[SKIPPABLE]: activities_always_happening = ( activities_always_happening.intersection( n[ACTIVITIES_ALWAYS_HAPPENING] ) ) else: activities_always_happening = set() sequence, parallel = fix_fp(sequence, parallel) return { START_ACTIVITIES: start_activities, END_ACTIVITIES: end_activities, ACTIVITIES: activities, SKIPPABLE: skippable, SEQUENCE: sequence, PARALLEL: parallel, ACTIVITIES_ALWAYS_HAPPENING: activities_always_happening, }
[docs] def get_footprints_parallel(node, footprints_dictio): """ Gets the footprints for the parallel node Parameters --------------- node Node footprints_dictio Dictionary of footprints of the process tree Returns --------------- footprints Footprints of the parallel node """ start_activities = set() end_activities = set() activities = set() skippable = True sequence = set() parallel = set() activities_always_happening = set() for n0 in node.children: n = footprints_dictio[n0] start_activities = start_activities.union(n[START_ACTIVITIES]) end_activities = end_activities.union(n[END_ACTIVITIES]) activities = activities.union(n[ACTIVITIES]) skippable = skippable and n[SKIPPABLE] sequence = sequence.union(n[SEQUENCE]) parallel = parallel.union(n[PARALLEL]) if not n[SKIPPABLE]: activities_always_happening = activities_always_happening.union( n[ACTIVITIES_ALWAYS_HAPPENING] ) i = 0 while i < len(node.children): acti_i = list(footprints_dictio[node.children[i]][ACTIVITIES]) j = i + 1 while j < len(node.children): acti_j = list(footprints_dictio[node.children[j]][ACTIVITIES]) for a1 in acti_i: for a2 in acti_j: parallel.add((a1, a2)) parallel.add((a2, a1)) j = j + 1 i = i + 1 sequence, parallel = fix_fp(sequence, parallel) return { START_ACTIVITIES: start_activities, END_ACTIVITIES: end_activities, ACTIVITIES: activities, SKIPPABLE: skippable, SEQUENCE: sequence, PARALLEL: parallel, ACTIVITIES_ALWAYS_HAPPENING: activities_always_happening, }
[docs] def get_footprints_sequence(node, footprints_dictio): """ Gets the footprints for the sequence Parameters --------------- node Node footprints_dictio Dictionary of footprints of the process tree Returns --------------- footprints Footprints of the sequence node """ start_activities = set() end_activities = set() activities = set() skippable = True sequence = set() parallel = set() activities_always_happening = set() for n0 in node.children: n = footprints_dictio[n0] skippable = skippable and n[SKIPPABLE] sequence = sequence.union(n[SEQUENCE]) parallel = parallel.union(n[PARALLEL]) activities = activities.union(n[ACTIVITIES]) if not n[SKIPPABLE]: activities_always_happening = activities_always_happening.union( n[ACTIVITIES_ALWAYS_HAPPENING] ) # adds the footprints i = 0 while i < len(node.children) - 1: n0 = footprints_dictio[node.children[i]] j = i + 1 while j < len(node.children): n1 = footprints_dictio[node.children[j]] n0_end_act = n0[END_ACTIVITIES] n1_start_act = n1[START_ACTIVITIES] for a1 in n0_end_act: for a2 in n1_start_act: sequence.add((a1, a2)) if not n1[SKIPPABLE]: break j = j + 1 i = i + 1 # adds the start activities i = 0 while i < len(node.children): n = footprints_dictio[node.children[i]] start_activities = start_activities.union(n[START_ACTIVITIES]) if not n[SKIPPABLE]: break i = i + 1 # adds the end activities i = len(node.children) - 1 while i >= 0: n = footprints_dictio[node.children[i]] end_activities = end_activities.union(n[END_ACTIVITIES]) if not n[SKIPPABLE]: break i = i - 1 sequence, parallel = fix_fp(sequence, parallel) return { START_ACTIVITIES: start_activities, END_ACTIVITIES: end_activities, ACTIVITIES: activities, SKIPPABLE: skippable, SEQUENCE: sequence, PARALLEL: parallel, ACTIVITIES_ALWAYS_HAPPENING: activities_always_happening, }
[docs] def get_footprints_loop(node, footprints_dictio): """ Gets the footprints for the loop Parameters --------------- node Node footprints_dictio Dictionary of footprints of the process tree Returns --------------- footprints Footprints of the loop node """ start_activities = set() end_activities = set() activities = set() sequence = set() parallel = set() activities_always_happening = set() for n0 in node.children: n = footprints_dictio[n0] sequence = sequence.union(n[SEQUENCE]) parallel = parallel.union(n[PARALLEL]) activities = activities.union(n[ACTIVITIES]) do = footprints_dictio[node.children[0]] redo = footprints_dictio[node.children[1]] skippable = do[SKIPPABLE] if not do[SKIPPABLE]: activities_always_happening = copy(do[ACTIVITIES_ALWAYS_HAPPENING]) start_activities = start_activities.union(do[START_ACTIVITIES]) if do[SKIPPABLE]: start_activities = start_activities.union(redo[START_ACTIVITIES]) end_activities = end_activities.union(do[END_ACTIVITIES]) if do[SKIPPABLE]: end_activities = end_activities.union(redo[END_ACTIVITIES]) for a1 in do[END_ACTIVITIES]: for a2 in redo[START_ACTIVITIES]: sequence.add((a1, a2)) for a1 in redo[END_ACTIVITIES]: for a2 in do[START_ACTIVITIES]: sequence.add((a1, a2)) if do[SKIPPABLE]: for a1 in redo[END_ACTIVITIES]: for a2 in redo[START_ACTIVITIES]: sequence.add((a1, a2)) if redo[SKIPPABLE]: for a1 in do[END_ACTIVITIES]: for a2 in do[START_ACTIVITIES]: sequence.add((a1, a2)) sequence, parallel = fix_fp(sequence, parallel) return { START_ACTIVITIES: start_activities, END_ACTIVITIES: end_activities, ACTIVITIES: activities, SKIPPABLE: skippable, SEQUENCE: sequence, PARALLEL: parallel, ACTIVITIES_ALWAYS_HAPPENING: activities_always_happening, }
[docs] def get_footprints(node, footprints_dictio): """ Gets the footprints for a node (having the history of the child nodes) Parameters -------------- node Node of the tree footprints_dictio Dictionary of footprints of the process tree Returns -------------- footprints Footprints of the node (having the history of the child nodes) """ if len(node.children) == 0: return get_footprints_leaf(node, footprints_dictio) elif node.operator == Operator.XOR: return get_footprints_xor(node, footprints_dictio) elif node.operator == Operator.PARALLEL or node.operator == Operator.OR: return get_footprints_parallel(node, footprints_dictio) elif node.operator == Operator.SEQUENCE: return get_footprints_sequence(node, footprints_dictio) elif node.operator == Operator.LOOP: return get_footprints_loop(node, footprints_dictio)
[docs] def get_all_footprints(tree, parameters=None): """ Gets all the footprints for the nodes of the tree Parameters ----------------- tree Process tree parameters Parameters of the algorithm Returns ---------------- dictio Dictionary that associates a footprint to each node of the tree """ if parameters is None: parameters = {} # for each node of the bottom up, proceed to getting the footprints bottomup = bottomup_disc.get_bottomup_nodes(tree, parameters=parameters) footprints_dictio = {} for i in range(len(bottomup)): footprints_dictio[bottomup[i]] = get_footprints( bottomup[i], footprints_dictio ) return footprints_dictio
[docs] def apply( tree: ProcessTree, parameters: Optional[Dict[Any, Any]] = None ) -> Dict[str, Any]: """ Footprints detection on process tree Parameters ----------------- tree Process tree parameters Parameters of the algorithm Returns ----------------- footprints Footprints """ if parameters is None: parameters = {} all_footprints = get_all_footprints(tree, parameters=parameters) root_node_footprints = all_footprints[tree] this_parameters = copy(parameters) this_parameters["avoid_loops"] = True min_trace_length = bottomup_disc.get_min_trace_length(tree, parameters=this_parameters) max_trace_length = bottomup_disc.get_max_trace_length(tree, parameters=this_parameters) root_node_footprints[Outputs.MIN_TRACE_LENGTH.value] = min_trace_length root_node_footprints[Outputs.MAX_TRACE_LENGTH.value] = max_trace_length return root_node_footprints