'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.objects import log as log_lib
from pm4py.algo.evaluation.precision import utils as precision_utils
from pm4py.objects.petri_net.utils import align_utils as utils, check_soundness
from pm4py.objects.petri_net.utils.petri_utils import construct_trace_net
from pm4py.objects.petri_net.utils.synchronous_product import construct
from pm4py.statistics.start_activities.log.get import get_start_activities
from pm4py.objects.petri_net.utils.align_utils import (
get_visible_transitions_eventually_enabled_by_marking,
)
from pm4py.util import exec_utils
from pm4py.util import xes_constants
import importlib.util
from enum import Enum
from pm4py.util import constants
from typing import Optional, Dict, Any, Union
from pm4py.objects.log.obj import EventLog, EventStream
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.conversion.log import converter as log_converter
import pandas as pd
[docs]
class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
TOKEN_REPLAY_VARIANT = "token_replay_variant"
CLEANING_TOKEN_FLOOD = "cleaning_token_flood"
SHOW_PROGRESS_BAR = "show_progress_bar"
MULTIPROCESSING = "multiprocessing"
CORES = "cores"
[docs]
def apply(
log: Union[EventLog, EventStream, pd.DataFrame],
net: PetriNet,
marking: Marking,
final_marking: Marking,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> float:
"""
Get Align-ET Conformance precision
Parameters
----------
log
Trace log
net
Petri net
marking
Initial marking
final_marking
Final marking
parameters
Parameters of the algorithm, including:
Parameters.ACTIVITY_KEY -> Activity key
"""
if parameters is None:
parameters = {}
debug_level = (
parameters["debug_level"] if "debug_level" in parameters else 0
)
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, log_lib.util.xes.DEFAULT_NAME_KEY
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
# default value for precision, when no activated transitions (not even by
# looking at the initial marking) are found
precision = 1.0
sum_ee = 0
sum_at = 0
unfit = 0
if not check_soundness.check_easy_soundness_net_in_fin_marking(
net, marking, final_marking
):
raise Exception(
"trying to apply Align-ETConformance on a Petri net that is not a easy sound net!!"
)
if type(log) is not pd.DataFrame:
log = log_converter.apply(
log,
variant=log_converter.Variants.TO_EVENT_LOG,
parameters=parameters,
)
prefixes, prefix_count = precision_utils.get_log_prefixes(
log, activity_key=activity_key, case_id_key=case_id_key
)
prefixes_keys = list(prefixes.keys())
fake_log = precision_utils.form_fake_log(
prefixes_keys, activity_key=activity_key
)
align_stop_marking = align_fake_log_stop_marking(
fake_log, net, marking, final_marking, parameters=parameters
)
all_markings = transform_markings_from_sync_to_original_net(
align_stop_marking, net, parameters=parameters
)
for i in range(len(prefixes)):
markings = all_markings[i]
if markings is not None:
log_transitions = set(prefixes[prefixes_keys[i]])
activated_transitions_labels = set()
for m in markings:
# add to the set of activated transitions in the model the activated transitions
# for each prefix
activated_transitions_labels = activated_transitions_labels.union(
x.label
for x in utils.get_visible_transitions_eventually_enabled_by_marking(
net, m
)
if x.label is not None
)
escaping_edges = activated_transitions_labels.difference(
log_transitions
)
sum_at += (
len(activated_transitions_labels)
* prefix_count[prefixes_keys[i]]
)
sum_ee += len(escaping_edges) * prefix_count[prefixes_keys[i]]
if debug_level > 1:
print("")
print("prefix=", prefixes_keys[i])
print("log_transitions=", log_transitions)
print("activated_transitions=", activated_transitions_labels)
print("escaping_edges=", escaping_edges)
else:
unfit += prefix_count[prefixes_keys[i]]
if debug_level > 0:
print("\n")
print("overall unfit", unfit)
print("overall activated transitions", sum_at)
print("overall escaping edges", sum_ee)
# fix: also the empty prefix should be counted!
start_activities = set(get_start_activities(log, parameters=parameters))
trans_en_ini_marking = set(
[
x.label
for x in get_visible_transitions_eventually_enabled_by_marking(
net, marking
)
]
)
diff = trans_en_ini_marking.difference(start_activities)
if type(log) is EventLog:
sum_at += len(log) * len(trans_en_ini_marking)
sum_ee += len(log) * len(diff)
else:
sum_at += log[case_id_key].nunique() * len(trans_en_ini_marking)
sum_ee += log[case_id_key].nunique() * len(diff)
# end fix
if sum_at > 0:
precision = 1 - float(sum_ee) / float(sum_at)
return precision
[docs]
def align_fake_log_stop_marking(
fake_log, net, marking, final_marking, parameters=None
):
"""
Align the 'fake' log with all the prefixes in order to get the markings in which
the alignment stops
Parameters
-------------
fake_log
Fake log
net
Petri net
marking
Marking
final_marking
Final marking
parameters
Parameters of the algorithm
Returns
-------------
alignment
For each trace in the log, return the marking in which the alignment stops (expressed as place name with count)
"""
if parameters is None:
parameters = {}
show_progress_bar = exec_utils.get_param_value(
Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
)
multiprocessing = exec_utils.get_param_value(
Parameters.MULTIPROCESSING,
parameters,
constants.ENABLE_MULTIPROCESSING_DEFAULT,
)
progress = None
if (
importlib.util.find_spec("tqdm")
and show_progress_bar
and len(fake_log) > 1
):
from tqdm.auto import tqdm
progress = tqdm(
total=len(fake_log),
desc="computing precision with alignments, completed variants :: ",
)
if multiprocessing:
align_intermediate_result = (
__align_log_with_multiprocessing_stop_marking(
fake_log,
net,
marking,
final_marking,
progress,
parameters=parameters,
)
)
else:
align_intermediate_result = (
__align_log_wo_multiprocessing_stop_marking(
fake_log,
net,
marking,
final_marking,
progress,
parameters=parameters,
)
)
align_result = []
for i in range(len(align_intermediate_result)):
res = align_intermediate_result[i]
if res is not None:
align_result.append([])
for mark in res:
res2 = {}
for pl in mark:
# transforms the markings for easier correspondence at the end
# (distributed engine friendly!)
res2[(pl.name[0], pl.name[1])] = mark[pl]
align_result[-1].append(res2)
else:
# if there is no path from the initial marking
# replaying the given prefix, then add None
align_result.append(None)
# gracefully close progress bar
if progress is not None:
progress.close()
del progress
return align_result
def __align_log_wo_multiprocessing_stop_marking(
fake_log, net, marking, final_marking, progress, parameters=None
):
align_intermediate_result = []
for i in range(len(fake_log)):
res = __align_trace_stop_marking(
fake_log[i], net, marking, final_marking, parameters=parameters
)
align_intermediate_result.append(res)
if progress is not None:
progress.update()
return align_intermediate_result
def __align_log_with_multiprocessing_stop_marking(
fake_log, net, marking, final_marking, progress, parameters=None
):
if parameters is not None:
parameters = {}
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
num_cores = exec_utils.get_param_value(
Parameters.CORES, parameters, multiprocessing.cpu_count() - 2
)
align_intermediate_result = []
with ProcessPoolExecutor(max_workers=num_cores) as executor:
futures = []
for i in range(len(fake_log)):
futures.append(
executor.submit(
__align_trace_stop_marking,
fake_log[i],
net,
marking,
final_marking,
parameters,
)
)
if progress is not None:
alignments_ready = 0
while alignments_ready != len(futures):
current = 0
for index, variant in enumerate(futures):
current = current + 1 if futures[index].done() else current
if current > alignments_ready:
for i in range(0, current - alignments_ready):
progress.update()
alignments_ready = current
for index, variant in enumerate(futures):
align_intermediate_result.append(futures[index].result())
return align_intermediate_result
def __align_trace_stop_marking(
trace, net, marking, final_marking, parameters=None
):
sync_net, sync_initial_marking, sync_final_marking = build_sync_net(
trace, net, marking, final_marking, parameters=parameters
)
stop_marking = Marking()
for pl, count in sync_final_marking.items():
if pl.name[1] == utils.SKIP:
stop_marking[pl] = count
cost_function = utils.construct_standard_cost_function(
sync_net, utils.SKIP
)
# perform the alignment of the prefix
res = precision_utils.__search(
sync_net,
sync_initial_marking,
sync_final_marking,
stop_marking,
cost_function,
utils.SKIP,
)
return res
[docs]
def build_sync_net(
trace, petri_net, initial_marking, final_marking, parameters=None
):
"""
Build the sync product net between the Petri net and the trace prefix
Parameters
---------------
trace
Trace prefix
petri_net
Petri net
initial_marking
Initial marking
final_marking
Final marking
parameters
Possible parameters of the algorithm
"""
if parameters is None:
parameters = {}
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
)
trace_net, trace_im, trace_fm = construct_trace_net(
trace, activity_key=activity_key
)
sync_prod, sync_initial_marking, sync_final_marking = construct(
trace_net,
trace_im,
trace_fm,
petri_net,
initial_marking,
final_marking,
utils.SKIP,
)
return sync_prod, sync_initial_marking, sync_final_marking