'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from copy import deepcopy
from enum import Enum
from pm4py.algo.discovery.dfg import algorithm as dfg_alg
from pm4py.algo.filtering.dfg.dfg_filtering import (
clean_dfg_based_on_noise_thresh,
)
from pm4py.objects.conversion.heuristics_net import converter as hn_conv_alg
from pm4py.objects.heuristics_net import defaults
from pm4py.objects.heuristics_net.node import Node
from pm4py.statistics.attributes.log import get as log_attributes
from pm4py.statistics.end_activities.log import get as log_ea_filter
from pm4py.statistics.start_activities.log import get as log_sa_filter
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants as xes
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
import pandas as pd
from pm4py.objects.heuristics_net.obj import HeuristicsNet
[docs]
class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
DEPENDENCY_THRESH = "dependency_thresh"
AND_MEASURE_THRESH = "and_measure_thresh"
MIN_ACT_COUNT = "min_act_count"
MIN_DFG_OCCURRENCES = "min_dfg_occurrences"
DFG_PRE_CLEANING_NOISE_THRESH = "dfg_pre_cleaning_noise_thresh"
LOOP_LENGTH_TWO_THRESH = "loop_length_two_thresh"
HEU_NET_DECORATION = "heu_net_decoration"
[docs]
def apply(
log: EventLog,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Tuple[PetriNet, Marking, Marking]:
"""
Discovers a Petri net using Heuristics Miner
Parameters
------------
log
Event log
parameters
Possible parameters of the algorithm,
including:
- Parameters.ACTIVITY_KEY
- Parameters.TIMESTAMP_KEY
- Parameters.CASE_ID_KEY
- Parameters.DEPENDENCY_THRESH
- Parameters.AND_MEASURE_THRESH
- Parameters.MIN_ACT_COUNT
- Parameters.MIN_DFG_OCCURRENCES
- Parameters.DFG_PRE_CLEANING_NOISE_THRESH
- Parameters.LOOP_LENGTH_TWO_THRESH
Returns
------------
net
Petri net
im
Initial marking
fm
Final marking
"""
if parameters is None:
parameters = {}
heu_net = apply_heu(log, parameters=parameters)
net, im, fm = hn_conv_alg.apply(heu_net, parameters=parameters)
return net, im, fm
[docs]
def apply_pandas(
df: pd.DataFrame,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Tuple[PetriNet, Marking, Marking]:
"""
Discovers a Petri net using Heuristics Miner
Parameters
------------
df
Pandas dataframe
parameters
Possible parameters of the algorithm,
including: activity_key, case_id_glue, timestamp_key,
dependency_thresh, and_measure_thresh, min_act_count, min_dfg_occurrences, dfg_pre_cleaning_noise_thresh,
loops_length_two_thresh
Returns
------------
net
Petri net
im
Initial marking
fm
Final marking
"""
if parameters is None:
parameters = {}
heu_net = apply_heu_pandas(df, parameters=parameters)
return hn_conv_alg.apply(heu_net, parameters=parameters)
[docs]
def apply_dfg(
dfg: Dict[Tuple[str, str], int],
activities=None,
activities_occurrences=None,
start_activities=None,
end_activities=None,
parameters: Optional[Dict[Any, Any]] = None,
) -> Tuple[PetriNet, Marking, Marking]:
"""
Discovers a Petri net using Heuristics Miner
Parameters
------------
dfg
Directly-Follows Graph
activities
(If provided) list of activities of the log
activities_occurrences
(If provided) dictionary of activities occurrences
start_activities
(If provided) dictionary of start activities occurrences
end_activities
(If provided) dictionary of end activities occurrences
parameters
Possible parameters of the algorithm,
including:
- Parameters.ACTIVITY_KEY
- Parameters.TIMESTAMP_KEY
- Parameters.CASE_ID_KEY
- Parameters.DEPENDENCY_THRESH
- Parameters.AND_MEASURE_THRESH
- Parameters.MIN_ACT_COUNT
- Parameters.MIN_DFG_OCCURRENCES
- Parameters.DFG_PRE_CLEANING_NOISE_THRESH
- Parameters.LOOP_LENGTH_TWO_THRESH
Returns
------------
net
Petri net
im
Initial marking
fm
Final marking
"""
if parameters is None:
parameters = {}
heu_net = apply_heu_dfg(
dfg,
activities=activities,
activities_occurrences=activities_occurrences,
start_activities=start_activities,
end_activities=end_activities,
parameters=parameters,
)
net, im, fm = hn_conv_alg.apply(heu_net, parameters=parameters)
return net, im, fm
[docs]
def apply_heu(
log: EventLog, parameters: Optional[Dict[Any, Any]] = None
) -> HeuristicsNet:
"""
Discovers an Heuristics Net using Heuristics Miner
Parameters
------------
log
Event log
parameters
Possible parameters of the algorithm,
including:
- Parameters.ACTIVITY_KEY
- Parameters.TIMESTAMP_KEY
- Parameters.CASE_ID_KEY
- Parameters.DEPENDENCY_THRESH
- Parameters.AND_MEASURE_THRESH
- Parameters.MIN_ACT_COUNT
- Parameters.MIN_DFG_OCCURRENCES
- Parameters.DFG_PRE_CLEANING_NOISE_THRESH
- Parameters.LOOP_LENGTH_TWO_THRESH
Returns
------------
heu
Heuristics Net
"""
if parameters is None:
parameters = {}
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY
)
heu_net_decoration = exec_utils.get_param_value(
Parameters.HEU_NET_DECORATION, parameters, "frequency"
)
start_activities = log_sa_filter.get_start_activities(
log, parameters=parameters
)
end_activities = log_ea_filter.get_end_activities(
log, parameters=parameters
)
activities_occurrences = log_attributes.get_attribute_values(
log, activity_key, parameters=parameters
)
activities = list(activities_occurrences.keys())
dfg = dfg_alg.apply(log, parameters=parameters)
parameters_w2 = deepcopy(parameters)
parameters_w2["window"] = 2
dfg_window_2 = dfg_alg.apply(log, parameters=parameters_w2)
freq_triples = dfg_alg.apply(
log, parameters=parameters, variant=dfg_alg.Variants.FREQ_TRIPLES
)
performance_dfg = None
if heu_net_decoration == "performance":
performance_dfg = dfg_alg.apply(
log, variant=dfg_alg.Variants.PERFORMANCE, parameters=parameters
)
return apply_heu_dfg(
dfg,
activities=activities,
activities_occurrences=activities_occurrences,
start_activities=start_activities,
end_activities=end_activities,
dfg_window_2=dfg_window_2,
freq_triples=freq_triples,
performance_dfg=performance_dfg,
parameters=parameters,
)
[docs]
def apply_heu_pandas(
df: pd.DataFrame,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> HeuristicsNet:
"""
Discovers an Heuristics Net using Heuristics Miner
Parameters
------------
df
Pandas dataframe
parameters
Possible parameters of the algorithm,
including:
- Parameters.ACTIVITY_KEY
- Parameters.TIMESTAMP_KEY
- Parameters.CASE_ID_KEY
- Parameters.DEPENDENCY_THRESH
- Parameters.AND_MEASURE_THRESH
- Parameters.MIN_ACT_COUNT
- Parameters.MIN_DFG_OCCURRENCES
- Parameters.DFG_PRE_CLEANING_NOISE_THRESH
- Parameters.LOOP_LENGTH_TWO_THRESH
Returns
------------
heu
Heuristics Net
"""
if parameters is None:
parameters = {}
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY
)
case_id_glue = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
start_timestamp_key = exec_utils.get_param_value(
Parameters.START_TIMESTAMP_KEY, parameters, None
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY
)
from pm4py.algo.discovery.dfg.adapters.pandas import (
df_statistics,
freq_triples as get_freq_triples,
)
from pm4py.statistics.attributes.pandas import get as pd_attributes
from pm4py.statistics.start_activities.pandas import get as pd_sa_filter
from pm4py.statistics.end_activities.pandas import get as pd_ea_filter
start_activities = pd_sa_filter.get_start_activities(
df, parameters=parameters
)
end_activities = pd_ea_filter.get_end_activities(df, parameters=parameters)
activities_occurrences = pd_attributes.get_attribute_values(
df, activity_key, parameters=parameters
)
activities = list(activities_occurrences.keys())
heu_net_decoration = exec_utils.get_param_value(
Parameters.HEU_NET_DECORATION, parameters, "frequency"
)
if timestamp_key in df:
dfg = df_statistics.get_dfg_graph(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
timestamp_key=timestamp_key,
start_timestamp_key=start_timestamp_key,
)
dfg_window_2 = df_statistics.get_dfg_graph(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
timestamp_key=timestamp_key,
window=2,
start_timestamp_key=start_timestamp_key,
)
frequency_triples = get_freq_triples.get_freq_triples(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
timestamp_key=timestamp_key,
)
else:
dfg = df_statistics.get_dfg_graph(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
sort_timestamp_along_case_id=False,
)
dfg_window_2 = df_statistics.get_dfg_graph(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
sort_timestamp_along_case_id=False,
window=2,
)
frequency_triples = get_freq_triples.get_freq_triples(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
timestamp_key=timestamp_key,
sort_timestamp_along_case_id=False,
)
performance_dfg = None
if heu_net_decoration == "performance":
performance_dfg = df_statistics.get_dfg_graph(
df,
case_id_glue=case_id_glue,
activity_key=activity_key,
timestamp_key=timestamp_key,
start_timestamp_key=start_timestamp_key,
measure="performance",
)
heu_net = apply_heu_dfg(
dfg,
activities=activities,
activities_occurrences=activities_occurrences,
start_activities=start_activities,
end_activities=end_activities,
dfg_window_2=dfg_window_2,
freq_triples=frequency_triples,
performance_dfg=performance_dfg,
parameters=parameters,
)
return heu_net
[docs]
def apply_heu_dfg(
dfg,
activities=None,
activities_occurrences=None,
start_activities=None,
end_activities=None,
dfg_window_2=None,
freq_triples=None,
performance_dfg=None,
parameters=None,
) -> HeuristicsNet:
"""
Discovers an Heuristics Net using Heuristics Miner
Parameters
------------
dfg
Directly-Follows Graph
activities
(If provided) list of activities of the log
activities_occurrences
(If provided) dictionary of activities occurrences
start_activities
(If provided) dictionary of start activities occurrences
end_activities
(If provided) dictionary of end activities occurrences
dfg_window_2
(If provided) DFG of window 2
freq_triples
(If provided) Frequency triples
performance_dfg
(If provided) Performance DFG
parameters
Possible parameters of the algorithm,
including:
- Parameters.ACTIVITY_KEY
- Parameters.TIMESTAMP_KEY
- Parameters.CASE_ID_KEY
- Parameters.DEPENDENCY_THRESH
- Parameters.AND_MEASURE_THRESH
- Parameters.MIN_ACT_COUNT
- Parameters.MIN_DFG_OCCURRENCES
- Parameters.DFG_PRE_CLEANING_NOISE_THRESH
- Parameters.LOOP_LENGTH_TWO_THRESH
Returns
------------
heu
Heuristics Net
"""
if parameters is None:
parameters = {}
dependency_thresh = exec_utils.get_param_value(
Parameters.DEPENDENCY_THRESH,
parameters,
defaults.DEFAULT_DEPENDENCY_THRESH,
)
and_measure_thresh = exec_utils.get_param_value(
Parameters.AND_MEASURE_THRESH,
parameters,
defaults.DEFAULT_AND_MEASURE_THRESH,
)
min_act_count = exec_utils.get_param_value(
Parameters.MIN_ACT_COUNT, parameters, defaults.DEFAULT_MIN_ACT_COUNT
)
min_dfg_occurrences = exec_utils.get_param_value(
Parameters.MIN_DFG_OCCURRENCES,
parameters,
defaults.DEFAULT_MIN_DFG_OCCURRENCES,
)
dfg_pre_cleaning_noise_thresh = exec_utils.get_param_value(
Parameters.DFG_PRE_CLEANING_NOISE_THRESH,
parameters,
defaults.DEFAULT_DFG_PRE_CLEANING_NOISE_THRESH,
)
loops_length_two_thresh = exec_utils.get_param_value(
Parameters.LOOP_LENGTH_TWO_THRESH,
parameters,
defaults.DEFAULT_LOOP_LENGTH_TWO_THRESH,
)
heu_net = HeuristicsNet(
dfg,
activities=activities,
activities_occurrences=activities_occurrences,
start_activities=start_activities,
end_activities=end_activities,
dfg_window_2=dfg_window_2,
freq_triples=freq_triples,
performance_dfg=performance_dfg,
)
heu_net = calculate(
heu_net,
dependency_thresh=dependency_thresh,
and_measure_thresh=and_measure_thresh,
min_act_count=min_act_count,
min_dfg_occurrences=min_dfg_occurrences,
dfg_pre_cleaning_noise_thresh=dfg_pre_cleaning_noise_thresh,
loops_length_two_thresh=loops_length_two_thresh,
)
return heu_net
[docs]
def calculate(
heu_net,
dependency_thresh=defaults.DEFAULT_DEPENDENCY_THRESH,
and_measure_thresh=defaults.DEFAULT_AND_MEASURE_THRESH,
min_act_count=defaults.DEFAULT_MIN_ACT_COUNT,
min_dfg_occurrences=defaults.DEFAULT_MIN_DFG_OCCURRENCES,
dfg_pre_cleaning_noise_thresh=defaults.DEFAULT_DFG_PRE_CLEANING_NOISE_THRESH,
loops_length_two_thresh=defaults.DEFAULT_LOOP_LENGTH_TWO_THRESH,
parameters=None,
):
"""
Calculate the dependency matrix, populate the nodes
Parameters
-------------
dependency_thresh
(Optional) dependency threshold
and_measure_thresh
(Optional) AND measure threshold
min_act_count
(Optional) minimum number of occurrences of an activity
min_dfg_occurrences
(Optional) minimum dfg occurrences
dfg_pre_cleaning_noise_thresh
(Optional) DFG pre cleaning noise threshold
loops_length_two_thresh
(Optional) loops length two threshold
parameters
Other parameters of the algorithm
"""
if parameters is None:
parameters = {}
heu_net.min_dfg_occurrences = min_dfg_occurrences
heu_net.dependency_matrix = None
heu_net.dependency_matrix = {}
heu_net.dfg_matrix = None
heu_net.dfg_matrix = {}
heu_net.performance_matrix = None
heu_net.performance_matrix = {}
if dfg_pre_cleaning_noise_thresh > 0.0:
heu_net.dfg = clean_dfg_based_on_noise_thresh(
heu_net.dfg,
heu_net.activities,
dfg_pre_cleaning_noise_thresh,
parameters=parameters,
)
if heu_net.dfg_window_2 is not None:
for el in heu_net.dfg_window_2:
act1 = el[0]
act2 = el[1]
value = heu_net.dfg_window_2[el]
if act1 not in heu_net.dfg_window_2_matrix:
heu_net.dfg_window_2_matrix[act1] = {}
heu_net.dfg_window_2_matrix[act1][act2] = value
if heu_net.freq_triples is not None:
for el in heu_net.freq_triples:
act1 = el[0]
act2 = el[1]
act3 = el[2]
value = heu_net.freq_triples[el]
# avoid to consider self-loops
if act1 == act3 and not act1 == act2:
if act1 not in heu_net.freq_triples_matrix:
heu_net.freq_triples_matrix[act1] = {}
heu_net.freq_triples_matrix[act1][act2] = value
for el in heu_net.dfg:
act1 = el[0]
act2 = el[1]
value = heu_net.dfg[el]
perf_value = (
heu_net.performance_dfg[el]
if heu_net.performance_dfg is not None
else heu_net.dfg[el]
)
if act1 not in heu_net.dependency_matrix:
heu_net.dependency_matrix[act1] = {}
heu_net.dfg_matrix[act1] = {}
heu_net.performance_matrix[act1] = {}
heu_net.dfg_matrix[act1][act2] = value
heu_net.performance_matrix[act1][act2] = perf_value
if not act1 == act2:
inv_couple = (act2, act1)
c1 = value
if inv_couple in heu_net.dfg:
c2 = heu_net.dfg[inv_couple]
dep = (c1 - c2) / (c1 + c2 + 1)
else:
dep = c1 / (c1 + 1)
else:
dep = value / (value + 1)
heu_net.dependency_matrix[act1][act2] = dep
for n1 in heu_net.dependency_matrix:
for n2 in heu_net.dependency_matrix[n1]:
condition1 = (
n1 in heu_net.activities_occurrences
and heu_net.activities_occurrences[n1] >= min_act_count
)
condition2 = (
n2 in heu_net.activities_occurrences
and heu_net.activities_occurrences[n2] >= min_act_count
)
condition3 = heu_net.dfg_matrix[n1][n2] >= min_dfg_occurrences
condition4 = heu_net.dependency_matrix[n1][n2] >= dependency_thresh
condition = condition1 and condition2 and condition3 and condition4
if condition:
if n1 not in heu_net.nodes:
heu_net.nodes[n1] = Node(
heu_net,
n1,
heu_net.activities_occurrences[n1],
is_start_node=(n1 in heu_net.start_activities),
is_end_node=(n1 in heu_net.end_activities),
default_edges_color=heu_net.default_edges_color[0],
node_type=heu_net.node_type,
net_name=heu_net.net_name[0],
nodes_dictionary=heu_net.nodes,
)
if n2 not in heu_net.nodes:
heu_net.nodes[n2] = Node(
heu_net,
n2,
heu_net.activities_occurrences[n2],
is_start_node=(n2 in heu_net.start_activities),
is_end_node=(n2 in heu_net.end_activities),
default_edges_color=heu_net.default_edges_color[0],
node_type=heu_net.node_type,
net_name=heu_net.net_name[0],
nodes_dictionary=heu_net.nodes,
)
repr_value = heu_net.performance_matrix[n1][n2]
heu_net.nodes[n1].add_output_connection(
heu_net.nodes[n2],
heu_net.dependency_matrix[n1][n2],
heu_net.dfg_matrix[n1][n2],
repr_value=repr_value,
)
heu_net.nodes[n2].add_input_connection(
heu_net.nodes[n1],
heu_net.dependency_matrix[n1][n2],
heu_net.dfg_matrix[n1][n2],
repr_value=repr_value,
)
for node in heu_net.nodes:
heu_net.nodes[node].calculate_and_measure_out(
and_measure_thresh=and_measure_thresh
)
heu_net.nodes[node].calculate_and_measure_in(
and_measure_thresh=and_measure_thresh
)
heu_net.nodes[node].calculate_loops_length_two(
heu_net.dfg_matrix,
heu_net.freq_triples_matrix,
loops_length_two_thresh=loops_length_two_thresh,
)
nodes = list(heu_net.nodes.keys())
added_loops = set()
for n1 in nodes:
for n2 in heu_net.nodes[n1].loop_length_two:
if (
n1 in heu_net.dfg_matrix
and n2 in heu_net.dfg_matrix[n1]
and heu_net.dfg_matrix[n1][n2] >= min_dfg_occurrences
and n1 in heu_net.activities_occurrences
and heu_net.activities_occurrences[n1] >= min_act_count
and n2 in heu_net.activities_occurrences
and heu_net.activities_occurrences[n2] >= min_act_count
):
if not (
(
n1 in heu_net.dependency_matrix
and n2 in heu_net.dependency_matrix[n1]
and heu_net.dependency_matrix[n1][n2]
>= dependency_thresh
)
or (
n2 in heu_net.dependency_matrix
and n1 in heu_net.dependency_matrix[n2]
and heu_net.dependency_matrix[n2][n1]
>= dependency_thresh
)
):
if n2 not in heu_net.nodes:
heu_net.nodes[n2] = Node(
heu_net,
n2,
heu_net.activities_occurrences[n2],
is_start_node=(n2 in heu_net.start_activities),
is_end_node=(n2 in heu_net.end_activities),
default_edges_color=heu_net.default_edges_color[0],
node_type=heu_net.node_type,
net_name=heu_net.net_name[0],
nodes_dictionary=heu_net.nodes,
)
v_n1_n2 = (
heu_net.dfg_matrix[n1][n2]
if n1 in heu_net.dfg_matrix
and n2 in heu_net.dfg_matrix[n1]
else 0
)
v_n2_n1 = (
heu_net.dfg_matrix[n2][n1]
if n2 in heu_net.dfg_matrix
and n1 in heu_net.dfg_matrix[n2]
else 0
)
if (n1, n2) not in added_loops:
repr_value = (
heu_net.performance_matrix[n1][n2]
if n1 in heu_net.performance_matrix
and n2 in heu_net.performance_matrix[n1]
else 0
)
added_loops.add((n1, n2))
heu_net.nodes[n1].add_output_connection(
heu_net.nodes[n2],
0,
v_n1_n2,
repr_value=repr_value,
)
heu_net.nodes[n2].add_input_connection(
heu_net.nodes[n1],
0,
v_n2_n1,
repr_value=repr_value,
)
if (n2, n1) not in added_loops:
repr_value = (
heu_net.performance_matrix[n2][n1]
if n2 in heu_net.performance_matrix
and n1 in heu_net.performance_matrix[n2]
else 0
)
added_loops.add((n2, n1))
heu_net.nodes[n2].add_output_connection(
heu_net.nodes[n1],
0,
v_n2_n1,
repr_value=repr_value,
)
heu_net.nodes[n1].add_input_connection(
heu_net.nodes[n2],
0,
v_n1_n2,
repr_value=repr_value,
)
if len(heu_net.nodes) == 0:
for act in heu_net.activities:
heu_net.nodes[act] = Node(
heu_net,
act,
heu_net.activities_occurrences[act],
is_start_node=(act in heu_net.start_activities),
is_end_node=(act in heu_net.end_activities),
default_edges_color=heu_net.default_edges_color[0],
node_type=heu_net.node_type,
net_name=heu_net.net_name[0],
nodes_dictionary=heu_net.nodes,
)
return heu_net