'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from copy import deepcopy
from enum import Enum
import numpy as np
from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.objects.log.util import basic_filter
from pm4py.util import exec_utils
from pm4py.objects.conversion.log import converter as log_converter
class Parameters(Enum):
    """
    Keys of the ``parameters`` dictionary that are read by the root cause
    analysis functions of this module.
    """

    # List of string event attributes to consider in building the decision tree
    STRING_ATTRIBUTES = "string_attributes"
    # List of numeric event attributes to consider in building the decision tree
    NUMERIC_ATTRIBUTES = "numeric_attributes"
    # Boolean flag (False when not provided): when set, the labels of each
    # class are repeated according to the ratio between the sizes of the two
    # classes before the decision tree is fitted
    ENABLE_MULTIPLIER = "enable_multiplier"
def diagnose_from_trans_fitness(log, trans_fitness, parameters=None):
    """
    Perform root cause analysis starting from transition fitness knowledge

    For every transition having at least one underfed trace, the traces of
    the log are split into "fit" and "underfed" executions and a decision
    tree (maximum depth 7) is trained to tell the two groups apart.
    Transitions for which one of the two groups is empty are skipped.

    Parameters
    -------------
    log
        Trace log object (converted to an event log before the analysis)
    trans_fitness
        Transition fitness object. For each transition it is accessed
        through the "underfed_traces" and "fit_traces" keys; both are
        indexed by trace and the first element of the associated value is
        used as the representation of the case
    parameters
        Possible parameters of the algorithm, including:
            Parameters.STRING_ATTRIBUTES -> List of string event attributes
            to consider in building the decision tree
            Parameters.NUMERIC_ATTRIBUTES -> List of numeric event attributes
            to consider in building the decision tree
            Parameters.ENABLE_MULTIPLIER -> Boolean (default False); when
            set, the labels of each class are repeated according to the
            ratio between the sizes of the two classes

    Returns
    -----------
    diagnostics
        Dictionary associating each problematic transition to a dictionary
        with the keys:
            - "clf": the fitted decision tree comparing fit and underfed
              executions
            - "data": the feature matrix
            - "feature_names": the names of the features
            - "target": the numeric labels (0 = fit, 1 = underfed)
            - "classes": the textual label ("fit" / "underfed") of each
              entry of the target
    """
    from pm4py.util import ml_utils

    if parameters is None:
        parameters = {}

    log = log_converter.apply(
        log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters
    )

    result = {}

    str_attrs = exec_utils.get_param_value(
        Parameters.STRING_ATTRIBUTES, parameters, []
    )
    num_attrs = exec_utils.get_param_value(
        Parameters.NUMERIC_ATTRIBUTES, parameters, []
    )
    use_multiplier = exec_utils.get_param_value(
        Parameters.ENABLE_MULTIPLIER, parameters, False
    )

    for trans in trans_fitness:
        trans_info = trans_fitness[trans]
        underfed_traces = trans_info["underfed_traces"]

        # only transitions with underfed executions are problematic
        if len(underfed_traces) == 0:
            continue

        fit_repr = []
        underfed_repr = []
        for trace in log:
            if trace in underfed_traces:
                underfed_repr.append(underfed_traces[trace][0])
            elif trace in trans_info["fit_traces"]:
                fit_repr.append(trans_info["fit_traces"][trace][0])

        # a classifier needs examples of both groups
        if not (fit_repr and underfed_repr):
            continue

        data, feature_names = form_representation_from_dictio_couple(
            fit_repr,
            underfed_repr,
            str_attrs,
            num_attrs,
            enable_multiplier=use_multiplier,
        )

        fit_count = len(fit_repr)
        underfed_count = len(underfed_repr)

        if use_multiplier:
            # each class is repeated by the (truncated) size ratio with
            # respect to the other class, never less than once
            fit_copies = int(max(underfed_count / fit_count, 1))
            underfed_copies = int(max(fit_count / underfed_count, 1))
        else:
            fit_copies = 1
            underfed_copies = 1

        fit_total = fit_copies * fit_count
        underfed_total = underfed_copies * underfed_count

        # labels: all the fit entries first, then the underfed ones
        target = [0] * fit_total + [1] * underfed_total
        classes = ["fit"] * fit_total + ["underfed"] * underfed_total

        data = np.array([np.array(row) for row in data])
        target = np.asarray(target)

        clf = ml_utils.DecisionTreeClassifier(max_depth=7)
        clf.fit(data, target)

        result[trans] = {
            "clf": clf,
            "data": data,
            "feature_names": feature_names,
            "target": target,
            "classes": classes,
        }

    return result
def diagnose_from_notexisting_activities(
    log, notexisting_activities_in_model, parameters=None
):
    """
    Perform root cause analysis related to activities that are not present in the model

    For every activity that does not exist in the model, the traces of the
    log are split into the ones "containing" the activity and the "fit" ones
    (traces kept by the negative filter on all the not-existing activities).
    A decision tree (maximum depth 7) is trained to tell the two groups
    apart. Activities for which one of the two groups is empty are skipped.

    Parameters
    -------------
    log
        Trace log object (converted to an event log before the analysis)
    notexisting_activities_in_model
        Not existing activities in the model: dictionary associating each
        activity to a dictionary indexed by trace, whose values are the
        representations of the cases containing the activity
    parameters
        Possible parameters of the algorithm, including:
            Parameters.STRING_ATTRIBUTES -> List of string event attributes
            to consider in building the decision tree
            Parameters.NUMERIC_ATTRIBUTES -> List of numeric event attributes
            to consider in building the decision tree
            Parameters.ENABLE_MULTIPLIER -> Boolean (default False); when
            set, the labels of each class are repeated according to the
            ratio between the sizes of the two classes

    Returns
    -----------
    diagnostics
        Dictionary associating each problematic activity to a dictionary
        with the keys:
            - "clf": the fitted decision tree comparing fit executions and
              executions containing the activity
            - "data": the feature matrix
            - "feature_names": the names of the features
            - "target": the numeric labels (0 = fit, 1 = containing)
            - "classes": the textual label ("fit" / "containing") of each
              entry of the target
    """
    from pm4py.util import ml_utils

    if parameters is None:
        parameters = {}

    log = log_converter.apply(
        log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters
    )

    diagnostics = {}

    string_attributes = exec_utils.get_param_value(
        Parameters.STRING_ATTRIBUTES, parameters, []
    )
    numeric_attributes = exec_utils.get_param_value(
        Parameters.NUMERIC_ATTRIBUTES, parameters, []
    )
    enable_multiplier = exec_utils.get_param_value(
        Parameters.ENABLE_MULTIPLIER, parameters, False
    )

    # negative filter: work on a copy so the caller's parameters are not
    # modified by the "positive" setting
    parameters_filtering = deepcopy(parameters)
    parameters_filtering["positive"] = False
    values = list(notexisting_activities_in_model.keys())
    filtered_log = basic_filter.filter_log_traces_attr(
        log, values, parameters=parameters_filtering
    )

    for act in notexisting_activities_in_model:
        containing_traces = notexisting_activities_in_model[act]

        fit_cases_repr = []
        containing_cases_repr = []
        for trace in log:
            if trace in containing_traces:
                containing_cases_repr.append(containing_traces[trace])
            elif len(trace) > 0 and trace in filtered_log:
                # a fit case is represented by the attributes of its last
                # event; traces without events have no last event and are
                # skipped instead of raising an IndexError
                fit_cases_repr.append(dict(trace[-1]))

        # a classifier needs examples of both groups
        if not (fit_cases_repr and containing_cases_repr):
            continue

        data, feature_names = form_representation_from_dictio_couple(
            fit_cases_repr,
            containing_cases_repr,
            string_attributes,
            numeric_attributes,
            enable_multiplier=enable_multiplier,
        )

        fit_count = len(fit_cases_repr)
        containing_count = len(containing_cases_repr)

        if enable_multiplier:
            # each class is repeated by the (truncated) size ratio with
            # respect to the other class, never less than once
            multiplier_first = int(max(containing_count / fit_count, 1))
            multiplier_second = int(max(fit_count / containing_count, 1))
        else:
            multiplier_first = 1
            multiplier_second = 1

        fit_total = multiplier_first * fit_count
        containing_total = multiplier_second * containing_count

        # labels: all the fit entries first, then the containing ones
        target = [0] * fit_total + [1] * containing_total
        classes = ["fit"] * fit_total + ["containing"] * containing_total

        data = np.array([np.array(x) for x in data])
        target = np.asarray(target)

        clf = ml_utils.DecisionTreeClassifier(max_depth=7)
        clf.fit(data, target)

        diagnostics[act] = {
            "clf": clf,
            "data": data,
            "feature_names": feature_names,
            "target": target,
            "classes": classes,
        }

    return diagnostics