'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import heapq
from pm4py.objects.petri_net.utils.align_utils import levenshtein, discountedEditDistance
from pm4py.objects.petri_net.utils.petri_utils import decorate_places_preset_trans, decorate_transitions_prepostset
from pm4py.objects.petri_net.utils import align_utils as utils
from pm4py.util import exec_utils
from copy import copy
from enum import Enum
from pm4py.statistics.variants.log import get as variants_module
'''
This algorithm computes discounted multi-alignment.
More details in Boltenhagen's thesis
Author: Author: Boltenhagen Mathilde, Thomas Chatain, Josep Carmona
Date: Jan. 2021
'''
[docs]
class Parameters(Enum):
MARKING_LIMIT = "marking_limit"
EXPONENT = "exponent"
[docs]
def apply(log, petri_net, ini, fin, parameters=None):
"""
This function is a preliminary function for __search of the dijkstra_exponential_heuristic for multi-alignment.
The function gets the parameters and launches the algorithm on the variants (all traces aren't usefull for MA, see Boltenhagen's thesis).
"""
# get variants
variants_idxs = variants_module.get_variants_from_log_trace_idx(log, parameters=parameters)
variants = []
for index_variant, var in enumerate(variants_idxs):
variants.append(var.split(","))
# get the number of times we can reach the same marking, very usefull for concurrency and loops
marking_limit = exec_utils.get_param_value(Parameters.MARKING_LIMIT, parameters, None)
if marking_limit is None:
marking_limit=100000
# the discounted parameter
exponent = exec_utils.get_param_value(Parameters.EXPONENT, parameters, None)
if exponent is None:
exponent=2
return __search(petri_net, ini, fin, variants, marking_limit=marking_limit, exponent=exponent)
def __search(net, ini, fin, variants, exponent=2, marking_limit=1000):
'''
This function is the A* algorithm for multi-alignments.
'''
decorate_transitions_prepostset(net)
decorate_places_preset_trans(net)
mymemory={}
closed = {}
ini_state = utils.DijkstraSearchTupleForAntiAndMulti(0, ini, [])
open_set = [ini_state]
heapq.heapify(open_set)
visited = 0
queued = 0
traversed = 0
best, sizeAA = None, None
distanceTime = 0
trans_empty_preset = set(t for t in net.transitions if len(t.in_arcs) == 0)
def costfunction(t, curr_ma, withFrac=True):
'''
Compute the maximal distance of the current multi-alignment + t and the variants.
:param t: transition that we want to add in the current multi-alignment
:param curr_ma: current prefix of multi-alignment
:param withFrac: the fraction that prevents the best suffice, this parameter is false at the end of the algorithm.
'''
if t is None :
str_aa = [a.label for a in curr_ma]
elif len(curr_ma)==0 :
str_aa = [t.label]
else :
str_aa = [a.label for a in curr_ma] + [t.label]
all= []
for v in variants:
if str(v+str_aa) not in mymemory.keys() or not withFrac:
size_of_alignment, distance = discountedEditDistance(str_aa,v, exponent)
if withFrac:
mymemory[str(v+str_aa)] = distance - (exponent**(-len(str_aa)+1)-exponent**(-(len(str_aa)+len(v))))/(exponent-1)
else :
mymemory[str(v+str_aa)] = distance
all.append(mymemory[str(v+str_aa)])
return max(all)
# ----------------------------
# a* algorithm starts here
while not len(open_set) == 0:
curr = heapq.heappop(open_set)
if best and best.g < curr.g:
continue
if curr.m == fin:
curr.g = costfunction(None, curr.r, withFrac=False)
# in case there only one run
if not best and (len(heapq.nsmallest(1,open_set))==0 or heapq.nsmallest(1,open_set)[0].g > curr.g):
return {'multi-alignment': curr.r, 'cost': curr.g, 'visited_states': visited, 'queued_states': queued,
'traversed_arcs': traversed, "max_distance_to_log":getMaxDist(curr.r,variants)}
# other runs might be interesting
elif not best or (best and best.g > curr.g):
best = curr
continue
closed[curr.m]= 1 if (curr.m) not in closed.keys() else closed[curr.m]+1
visited += 1
possible_enabling_transitions = copy(trans_empty_preset)
for p in curr.m:
for t in p.ass_trans:
possible_enabling_transitions.add(t)
enabled_trans = [t for t in possible_enabling_transitions if t.sub_marking <= curr.m]
trans_to_visit_with_cost = [(t, costfunction(t,curr.r, withFrac=True)) for t in enabled_trans if t is not None]
for t, cost in trans_to_visit_with_cost:
traversed += 1
new_marking = utils.add_markings(curr.m, t.add_marking)
if ((new_marking) in closed.keys() and closed[new_marking]>marking_limit) or cost==curr.g:
continue
queued += 1
tp = utils.DijkstraSearchTupleForAntiAndMulti(cost, new_marking, curr.r + [t])
heapq.heappush(open_set, tp)
return {'multi-alignment': best.r, 'cost': best.g, 'visited_states': visited, 'queued_states': queued,
'traversed_arcs': traversed, "max_distance_to_log":getMaxDist(best.r,variants)}
[docs]
def getMaxDist(run, variants):
'''
We give the maximal levenshtein edit distance to the variants
'''
run = [a.label for a in run]
all= []
for v in variants:
all.append(levenshtein(run,v))
return max(all)