'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from copy import copy
from enum import Enum
from typing import Optional, Dict, Any, Tuple
import pandas as pd
from pm4py.algo.discovery.dfg import algorithm as dfg_alg
from pm4py.algo.discovery.dfg.adapters.pandas import df_statistics
from pm4py.objects.conversion.heuristics_net import converter as hn_conv_alg
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.heuristics_net import defaults
from pm4py.objects.heuristics_net.obj import HeuristicsNet
from pm4py.objects.heuristics_net.node import Node
from pm4py.objects.log.obj import EventLog
from pm4py.objects.log.util import interval_lifecycle
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.statistics.attributes.log import get as log_attributes
from pm4py.statistics.attributes.pandas import get as pd_attributes
from pm4py.statistics.concurrent_activities.log import get as conc_act_get
from pm4py.statistics.concurrent_activities.pandas import get as pd_conc_act
from pm4py.statistics.end_activities.log import get as log_ea
from pm4py.statistics.end_activities.pandas import get as pd_ea
from pm4py.statistics.eventually_follows.log import get as efg_get
from pm4py.statistics.eventually_follows.pandas import get as pd_efg
from pm4py.statistics.service_time.log import get as soj_get
from pm4py.statistics.service_time.pandas import get as pd_soj_time
from pm4py.statistics.start_activities.log import get as log_sa
from pm4py.statistics.start_activities.pandas import get as pd_sa
from pm4py.util import exec_utils, constants, xes_constants as xes
[docs]
class Parameters(Enum):
    """
    Keys of the options that the functions of this module look up in their
    ``parameters`` dictionary (through ``exec_utils.get_param_value``)
    """

    # names of the log attributes / dataframe columns; the values come from
    # the shared constants module
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY

    # thresholds and filters read by discover_heu_net_plus_plus
    DEPENDENCY_THRESH = "dependency_thresh"
    AND_MEASURE_THRESH = "and_measure_thresh"
    MIN_ACT_COUNT = "min_act_count"
    MIN_DFG_OCCURRENCES = "min_dfg_occurrences"

    # decoration of the heuristics net; the code compares it with "frequency"
    HEU_NET_DECORATION = "heu_net_decoration"
[docs]
def apply(
    log: EventLog, parameters: Optional[Dict[Any, Any]] = None
) -> Tuple[PetriNet, Marking, Marking]:
    """
    Mines a Petri net from an event log with the Heuristics Miner ++ algorithm

    The heuristics net is first discovered with apply_heu and then handed to
    the heuristics net converter, using the same parameters for both steps.

    Reference:
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.
    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    net
        Petri net
    im
        Initial marking
    fm
        Final marking
    """
    heuristics_net = apply_heu(log, parameters=parameters)
    petri_net, initial_marking, final_marking = hn_conv_alg.apply(
        heuristics_net, parameters=parameters
    )
    return petri_net, initial_marking, final_marking
[docs]
def apply_pandas(
    df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None
) -> Tuple[PetriNet, Marking, Marking]:
    """
    Mines a Petri net from a dataframe with the Heuristics Miner ++ algorithm

    The heuristics net is first discovered with apply_heu_pandas and then
    handed to the heuristics net converter, using the same parameters for
    both steps.

    Reference:
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.
    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    net
        Petri net
    im
        Initial marking
    fm
        Final marking
    """
    heuristics_net = apply_heu_pandas(df, parameters=parameters)
    petri_net, initial_marking, final_marking = hn_conv_alg.apply(
        heuristics_net, parameters=parameters
    )
    return petri_net, initial_marking, final_marking
[docs]
def apply_heu(
    log: EventLog, parameters: Optional[Dict[Any, Any]] = None
) -> HeuristicsNet:
    """
    Builds a heuristics net from an event log with the Heuristics Miner ++ algorithm

    The log is passed through the log converter (TO_EVENT_LOG variant) and
    through interval_lifecycle.to_interval; the abstraction computed by
    discover_abstraction_log is then given to discover_heu_net_plus_plus.

    Reference:
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.
    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    heu_net
        Heuristics net
    """
    if parameters is None:
        parameters = {}

    interval_log = log_converter.apply(
        log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=parameters
    )
    interval_log = interval_lifecycle.to_interval(
        interval_log, parameters=parameters
    )

    configured_start_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY, parameters, None
    )
    if configured_start_key is None:
        # no start timestamp configured: fall back to the XES default and
        # record it on a copy, so that the caller's dictionary is not mutated
        parameters = copy(parameters)
        parameters[Parameters.START_TIMESTAMP_KEY] = (
            xes.DEFAULT_START_TIMESTAMP_KEY
        )

    # the abstraction is a 7-tuple whose order matches the positional
    # arguments of discover_heu_net_plus_plus
    abstraction = discover_abstraction_log(interval_log, parameters=parameters)
    return discover_heu_net_plus_plus(*abstraction, parameters=parameters)
[docs]
def discover_abstraction_log(
    log: EventLog, parameters: Optional[Dict[Any, Any]] = None
) -> Tuple[Any, Any, Any, Any, Any, Any, Any]:
    """
    Computes, from an event log, the abstraction consumed by the Heuristics Miner ++ algorithm

    Parameters
    --------------
    log
        Event log
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY

    Returns
    --------------
    start_activities
        Start activities
    end_activities
        End activities
    activities_occurrences
        Activities along with their number of occurrences
    dfg
        Directly-follows graph (obtained from the eventually-follows
        statistic with the KEEP_FIRST_FOLLOWING option enabled)
    performance_dfg
        (Performance) Directly-follows graph
    sojourn_time
        Sojourn time for each activity
    concurrent_activities
        Concurrent activities
    """
    if parameters is None:
        parameters = {}

    act_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY
    )

    starts = log_sa.get_start_activities(log, parameters=parameters)
    ends = log_ea.get_end_activities(log, parameters=parameters)
    occurrences = log_attributes.get_attribute_values(
        log, act_key, parameters=parameters
    )

    # the extra option is set on a copy, so the other statistics still
    # receive the unmodified parameters
    follows_parameters = copy(parameters)
    follows_parameters[efg_get.Parameters.KEEP_FIRST_FOLLOWING] = True
    follows_graph = efg_get.apply(log, parameters=follows_parameters)

    perf_graph = dfg_alg.apply(
        log, variant=dfg_alg.Variants.PERFORMANCE, parameters=parameters
    )
    sojourn = soj_get.apply(log, parameters=parameters)
    concurrent = conc_act_get.apply(log, parameters=parameters)

    return (
        starts,
        ends,
        occurrences,
        follows_graph,
        perf_graph,
        sojourn,
        concurrent,
    )
[docs]
def apply_heu_pandas(
    df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None
) -> HeuristicsNet:
    """
    Builds a heuristics net from a dataframe with the Heuristics Miner ++ algorithm

    The abstraction computed by discover_abstraction_dataframe is given,
    together with the parameters, to discover_heu_net_plus_plus.

    Reference:
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.
    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    heu_net
        Heuristics net
    """
    if parameters is None:
        parameters = {}

    # the abstraction is a 7-tuple whose order matches the positional
    # arguments of discover_heu_net_plus_plus
    abstraction = discover_abstraction_dataframe(df, parameters=parameters)
    return discover_heu_net_plus_plus(*abstraction, parameters=parameters)
[docs]
def discover_abstraction_dataframe(
    df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None
) -> Tuple[Any, Any, Any, Any, Any, Any, Any]:
    """
    Computes, from a dataframe, the abstraction consumed by the Heuristics Miner ++ algorithm

    Parameters
    --------------
    df
        Dataframe
    parameters
        Parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.START_TIMESTAMP_KEY
        - Parameters.TIMESTAMP_KEY
        - Parameters.CASE_ID_KEY

    Returns
    --------------
    start_activities
        Start activities
    end_activities
        End activities
    activities_occurrences
        Activities along with their number of occurrences
    dfg
        Directly-follows graph (obtained from the eventually-follows
        statistic with the KEEP_FIRST_FOLLOWING option enabled)
    performance_dfg
        (Performance) Directly-follows graph
    sojourn_time
        Sojourn time for each activity
    concurrent_activities
        Concurrent activities
    """
    if parameters is None:
        parameters = {}

    act_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY
    )

    start_ts_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY, parameters, None
    )
    if start_ts_key is None:
        # no start timestamp configured: fall back to the XES default and
        # record it on a copy, so that the caller's dictionary is not mutated
        start_ts_key = xes.DEFAULT_START_TIMESTAMP_KEY
        parameters = copy(parameters)
        parameters[Parameters.START_TIMESTAMP_KEY] = start_ts_key

    ts_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY
    )
    case_key = exec_utils.get_param_value(
        Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
    )

    starts = pd_sa.get_start_activities(df, parameters=parameters)
    ends = pd_ea.get_end_activities(df, parameters=parameters)
    occurrences = pd_attributes.get_attribute_values(
        df, act_key, parameters=parameters
    )

    # the extra option is set on a copy, so the other statistics still
    # receive the unmodified parameters
    follows_parameters = copy(parameters)
    follows_parameters[pd_efg.Parameters.KEEP_FIRST_FOLLOWING] = True
    follows_graph = pd_efg.apply(df, parameters=follows_parameters)

    # this helper takes the column names as explicit keyword arguments
    # instead of the parameters dictionary
    perf_graph = df_statistics.get_dfg_graph(
        df,
        case_id_glue=case_key,
        activity_key=act_key,
        timestamp_key=ts_key,
        start_timestamp_key=start_ts_key,
        measure="performance",
    )
    sojourn = pd_soj_time.apply(df, parameters=parameters)
    concurrent = pd_conc_act.apply(df, parameters=parameters)

    return (
        starts,
        ends,
        occurrences,
        follows_graph,
        perf_graph,
        sojourn,
        concurrent,
    )
[docs]
def discover_heu_net_plus_plus(
    start_activities,
    end_activities,
    activities_occurrences,
    dfg,
    performance_dfg,
    sojourn_time,
    concurrent_activities,
    parameters: Optional[Dict[Any, Any]] = None,
):
    """
    Builds a heuristics net from a precomputed abstraction with the Heuristics Miner ++ algorithm

    Activities and paths below the configured minimum counts are dropped,
    a HeuristicsNet is created on what remains, and the dependency / AND
    measures are filled in by calculate.

    Reference:
    Burattin, Andrea, and Alessandro Sperduti. "Heuristics Miner for Time Intervals." ESANN. 2010.
    https://andrea.burattin.net/public-files/publications/2010-esann-slides.pdf

    Parameters
    --------------
    start_activities
        Start activities
    end_activities
        End activities
    activities_occurrences
        Activities along with their number of occurrences
    dfg
        Directly-follows graph
    performance_dfg
        (Performance) Directly-follows graph
    sojourn_time
        Sojourn time for each activity
    concurrent_activities
        Concurrent activities
    parameters
        Parameters of the algorithm, including:
        - Parameters.DEPENDENCY_THRESH
        - Parameters.AND_MEASURE_THRESH
        - Parameters.MIN_ACT_COUNT
        - Parameters.MIN_DFG_OCCURRENCES
        - Parameters.HEU_NET_DECORATION

    Returns
    --------------
    heu_net
        Heuristics net
    """
    if parameters is None:
        parameters = {}

    def _option(key, fallback):
        # shorthand for the lookup in the parameters dictionary
        return exec_utils.get_param_value(key, parameters, fallback)

    dependency_thresh = _option(
        Parameters.DEPENDENCY_THRESH, defaults.DEFAULT_DEPENDENCY_THRESH
    )
    and_measure_thresh = _option(
        Parameters.AND_MEASURE_THRESH, defaults.DEFAULT_AND_MEASURE_THRESH
    )
    min_act_count = _option(
        Parameters.MIN_ACT_COUNT, defaults.DEFAULT_MIN_ACT_COUNT
    )
    min_dfg_occurrences = _option(
        Parameters.MIN_DFG_OCCURRENCES, defaults.DEFAULT_MIN_DFG_OCCURRENCES
    )
    heu_net_decoration = _option(Parameters.HEU_NET_DECORATION, "frequency")

    # keep only the activities that occur often enough ...
    kept_activities = {
        act: count
        for act, count in activities_occurrences.items()
        if count >= min_act_count
    }
    # ... and only the frequent paths whose two endpoints were both kept
    kept_dfg = {
        edge: count
        for edge, count in dfg.items()
        if count >= min_dfg_occurrences
        and edge[0] in kept_activities
        and edge[1] in kept_activities
    }
    kept_performance = {
        edge: value
        for edge, value in performance_dfg.items()
        if edge in kept_dfg
    }
    kept_starts = {
        act: count
        for act, count in start_activities.items()
        if act in kept_activities
    }
    kept_ends = {
        act: count
        for act, count in end_activities.items()
        if act in kept_activities
    }

    net_kwargs = {
        "activities": list(kept_activities),
        "activities_occurrences": kept_activities,
        "start_activities": kept_starts,
        "end_activities": kept_ends,
    }
    if heu_net_decoration != "frequency":
        # the performance graph is only handed over when the decoration is
        # not the frequency one
        net_kwargs["performance_dfg"] = kept_performance
    heu_net = HeuristicsNet(kept_dfg, **net_kwargs)

    heu_net.min_dfg_occurrences = min_dfg_occurrences
    heu_net.sojourn_times = sojourn_time
    heu_net.concurrent_activities = concurrent_activities

    return calculate(
        heu_net, dependency_thresh, and_measure_thresh, heu_net_decoration
    )
[docs]
def calculate(
    heu_net: HeuristicsNet,
    dependency_thresh: float,
    and_measure_thresh: float,
    heu_net_decoration: str,
) -> HeuristicsNet:
    """
    Fills the matrices and the nodes of the heuristics net, then computes the
    dependency matrix and the AND measures using the Heuristics Miner ++ formulas

    Parameters
    ----------------
    heu_net
        Heuristics net
    dependency_thresh
        Dependency threshold
    and_measure_thresh
        AND measure threshold
    heu_net_decoration
        Decoration to use (frequency/performance)

    Returns
    ----------------
    heu_net
        Heuristics net
    """
    heu_net.performance_matrix = {}
    heu_net.dependency_matrix = {}
    heu_net.dfg_matrix = {}

    # turn the edge-keyed graph into nested {source: {target: value}} matrices
    for edge in heu_net.dfg:
        source, target = edge[0], edge[1]

        heu_net.dfg_matrix.setdefault(source, {})[target] = heu_net.dfg[edge]
        # placeholder, overwritten by calculate_dependency
        heu_net.dependency_matrix.setdefault(source, {})[target] = -1

        perf_dfg = heu_net.performance_dfg
        if perf_dfg and edge in perf_dfg:
            edge_performance = perf_dfg[edge]
        else:
            # no performance information available for this edge
            edge_performance = 0.0
        heu_net.performance_matrix.setdefault(source, {})[target] = (
            edge_performance
        )

    # one node per activity of the net
    for activity in heu_net.activities:
        heu_net.nodes[activity] = Node(
            heu_net,
            activity,
            heu_net.activities_occurrences[activity],
            node_type=heu_net.node_type,
        )

    # dependencies between the activities
    heu_net = calculate_dependency(
        heu_net, dependency_thresh, heu_net_decoration
    )
    # AND measure for outgoing edges (e.g. which activities happen in
    # parallel after a given activity)
    heu_net = calculate_and_out_measure(heu_net, and_measure_thresh)
    # AND measure for ingoing edges (e.g. which activities happen in
    # parallel before a given activity)
    heu_net = calculate_and_in_measure(heu_net, and_measure_thresh)
    return heu_net
[docs]
def calculate_dependency(
    heu_net: HeuristicsNet, dependency_thresh: float, heu_net_decoration: str
) -> HeuristicsNet:
    """
    Computes the dependency matrix using the Heuristics Miner ++ formula

    For every pair (a, b) present in the DFG matrix the value is
    (|a>b| - |b>a|) / (|a>b| + |b>a| + conc(a, b)), where conc(a, b) is the
    entry of the concurrent activities for the sorted pair (0 if missing).
    When the value is strictly greater than the threshold, the two nodes are
    connected (output connection on a, input connection on b).

    Parameters
    --------------
    heu_net
        Heuristics net
    dependency_thresh
        Dependency threshold
    heu_net_decoration
        Decoration to include (frequency/performance)

    Returns
    ---------------
    heu_net
        Heuristics net (enriched)
    """
    counts = heu_net.dfg_matrix

    for source in heu_net.activities:
        if source not in counts:
            continue
        for target, forward in counts[source].items():
            backward = counts.get(target, {}).get(source, 0.0)
            pair = tuple(sorted((source, target)))
            # added term for Heuristics Miner ++
            overlapping = heu_net.concurrent_activities.get(pair, 0.0)

            dependency = (forward - backward) / (
                forward + backward + overlapping
            )
            heu_net.dependency_matrix[source][target] = dependency

            if dependency > dependency_thresh:
                if heu_net_decoration == "frequency":
                    shown_value = forward
                else:
                    shown_value = heu_net.performance_matrix[source][target]
                source_node = heu_net.nodes[source]
                target_node = heu_net.nodes[target]
                source_node.add_output_connection(
                    target_node, dependency, forward, repr_value=shown_value
                )
                target_node.add_input_connection(
                    source_node, dependency, forward, repr_value=shown_value
                )
    return heu_net
[docs]
def calculate_and_out_measure(
    heu_net: HeuristicsNet, and_measure_thresh: float
) -> HeuristicsNet:
    """
    Computes the AND measure for outgoing edges using the Heuristics Miner ++ formula

    For every node and every pair (n1, n2) of its output connections (names
    taken in sorted order, n1 before n2) the value is
    (|n1>n2| + |n2>n1| + conc(n1, n2)) / (|act>n1| + |act>n2|); values
    strictly greater than the threshold are stored in and_measures_out.

    Parameters
    ---------------
    heu_net
        Heuristics net
    and_measure_thresh
        And measure threshold

    Returns
    ---------------
    heu_net
        Heuristics net (enriched)
    """

    def _count(source, target):
        # DFG matrix entry, 0.0 when the edge is not present
        return heu_net.dfg_matrix.get(source, {}).get(target, 0.0)

    for act in heu_net.nodes:
        node = heu_net.nodes[act]
        successors = sorted(x.node_name for x in node.output_connections)

        for position, first in enumerate(successors):
            act_to_first = _count(act, first)
            for second in successors[position + 1:]:
                pair = tuple(sorted((first, second)))
                first_to_second = _count(first, second)
                second_to_first = _count(second, first)
                act_to_second = _count(act, second)
                # added term for Heuristics Miner ++
                overlapping = heu_net.concurrent_activities.get(pair, 0.0)

                measure = (first_to_second + second_to_first + overlapping) / (
                    act_to_first + act_to_second
                )
                if measure > and_measure_thresh:
                    node.and_measures_out.setdefault(first, {})[second] = (
                        measure
                    )
    return heu_net
[docs]
def calculate_and_in_measure(
    heu_net: HeuristicsNet, and_measure_thresh: float
) -> HeuristicsNet:
    """
    Computes the AND measure for incoming edges using the Heuristics Miner ++ formula

    For every node and every pair (n1, n2) of its input connections (names
    taken in sorted order, n1 before n2) the value is
    (|n1>n2| + |n2>n1| + conc(n1, n2)) / (|n1>act| + |n2>act|); values
    strictly greater than the threshold are stored in and_measures_in.

    Parameters
    ---------------
    heu_net
        Heuristics net
    and_measure_thresh
        And measure threshold

    Returns
    ---------------
    heu_net
        Heuristics net (enriched)
    """

    def _count(source, target):
        # DFG matrix entry, 0.0 when the edge is not present
        return heu_net.dfg_matrix.get(source, {}).get(target, 0.0)

    for act in heu_net.nodes:
        node = heu_net.nodes[act]
        predecessors = sorted(x.node_name for x in node.input_connections)

        for position, first in enumerate(predecessors):
            first_to_act = _count(first, act)
            for second in predecessors[position + 1:]:
                pair = tuple(sorted((first, second)))
                first_to_second = _count(first, second)
                second_to_first = _count(second, first)
                second_to_act = _count(second, act)
                # added term for Heuristics Miner ++
                overlapping = heu_net.concurrent_activities.get(pair, 0.0)

                measure = (first_to_second + second_to_first + overlapping) / (
                    first_to_act + second_to_act
                )
                if measure > and_measure_thresh:
                    node.and_measures_in.setdefault(first, {})[second] = (
                        measure
                    )
    return heu_net
[docs]
def apply_dfg(
    dfg,
    activities=None,
    activities_occurrences=None,
    start_activities=None,
    end_activities=None,
    parameters=None,
):
    """
    DFG-based discovery entry point, not available for the plusplus version

    All the arguments are accepted but ignored: the function always raises.

    Parameters
    --------------
    dfg
        Directly-follows graph (unused)
    activities
        Activities (unused)
    activities_occurrences
        Activities along with their number of occurrences (unused)
    start_activities
        Start activities (unused)
    end_activities
        End activities (unused)
    parameters
        Parameters of the algorithm (unused)

    Raises
    --------------
    NotImplementedError
        Always. It is a subclass of Exception, so callers that catch
        Exception keep working.
    """
    raise NotImplementedError("not implemented for plusplus version")
[docs]
def apply_heu_dfg(
    dfg,
    activities=None,
    activities_occurrences=None,
    start_activities=None,
    end_activities=None,
    dfg_window_2=None,
    freq_triples=None,
    parameters=None,
):
    """
    DFG-based heuristics net discovery entry point, not available for the
    plusplus version

    All the arguments are accepted but ignored: the function always raises.

    Parameters
    --------------
    dfg
        Directly-follows graph (unused)
    activities
        Activities (unused)
    activities_occurrences
        Activities along with their number of occurrences (unused)
    start_activities
        Start activities (unused)
    end_activities
        End activities (unused)
    dfg_window_2
        Unused
    freq_triples
        Unused
    parameters
        Parameters of the algorithm (unused)

    Raises
    --------------
    NotImplementedError
        Always. It is a subclass of Exception, so callers that catch
        Exception keep working.
    """
    raise NotImplementedError("not implemented for plusplus version")