'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from pm4py.objects.log.obj import EventLog
from pm4py.algo.discovery.declare.templates import *
import pandas as pd
from typing import Union, Dict, Optional, Any, Tuple, Collection, Set, List
from typing import Counter as TCounter
from pm4py.util import exec_utils, constants, xes_constants, pandas_utils
from collections import Counter
import numpy as np
[docs]
class Parameters(Enum):
    """Keys of the ``parameters`` dictionary understood by this module."""

    # Both keys below take their values from pm4py's shared constants;
    # presumably they name the event attribute holding the activity and the
    # case identifier -- confirm against pm4py.util.constants.
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
    # Listed among the parameters of ``apply``; not read by the code visible
    # in this part of the file.
    CONSIDERED_ACTIVITIES = "considered_activities"
    # A column is kept by ``get_rules_from_rules_df`` only if its number of
    # non-zero entries is at least ``len(rules_df) * min_support_ratio``.
    MIN_SUPPORT_RATIO = "min_support_ratio"
    # ... and only if its number of entries equal to 1 is at least
    # ``support * min_confidence_ratio``.
    MIN_CONFIDENCE_RATIO = "min_confidence_ratio"
    # Factor (default 0.8) applied to the ratios of the best column when the
    # two thresholds above are determined automatically.
    AUTO_SELECTION_MULTIPLIER = "auto_selection_multiplier"
    # Collection of template names to consider.
    ALLOWED_TEMPLATES = "allowed_templates"
def __rule_existence_column(act: str) -> Tuple[str, str]:
    """Return the rules-table key ``(EXISTENCE, act)``."""
    return EXISTENCE, act
def __rule_exactly_one_column(act: str) -> Tuple[str, str]:
    """Return the rules-table key ``(EXACTLY_ONE, act)``."""
    return EXACTLY_ONE, act
def __rule_init_column(act: str) -> Tuple[str, str]:
    """Return the rules-table key ``(INIT, act)``."""
    return INIT, act
def __rule_responded_existence_column(
    act: str, act2: str
) -> Tuple[str, str, str]:
    """Return the rules-table key ``(RESPONDED_EXISTENCE, act, act2)``."""
    return RESPONDED_EXISTENCE, act, act2
def __rule_response(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(RESPONSE, act, act2)``."""
    return RESPONSE, act, act2
def __rule_precedence(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(PRECEDENCE, act, act2)``."""
    return PRECEDENCE, act, act2
def __rule_succession(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(SUCCESSION, act, act2)``."""
    return SUCCESSION, act, act2
def __rule_alternate_response(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(ALTRESPONSE, act, act2)``."""
    return ALTRESPONSE, act, act2
def __rule_alternate_precedence(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(ALTPRECEDENCE, act, act2)``."""
    return ALTPRECEDENCE, act, act2
def __rule_alternate_succession(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(ALTSUCCESSION, act, act2)``."""
    return ALTSUCCESSION, act, act2
def __rule_chain_response(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(CHAINRESPONSE, act, act2)``."""
    return CHAINRESPONSE, act, act2
def __rule_chain_precedence(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(CHAINPRECEDENCE, act, act2)``."""
    return CHAINPRECEDENCE, act, act2
def __rule_chain_succession(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(CHAINSUCCESSION, act, act2)``."""
    return CHAINSUCCESSION, act, act2
def __rule_absence_act(act: str) -> Tuple[str, str]:
    """Return the rules-table key ``(ABSENCE, act)``."""
    return ABSENCE, act
def __rule_coexistence(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(COEXISTENCE, act, act2)``."""
    return COEXISTENCE, act, act2
def __rule_non_coexistence(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(NONCOEXISTENCE, act, act2)``."""
    return NONCOEXISTENCE, act, act2
def __rule_non_succession(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(NONSUCCESSION, act, act2)``."""
    return NONSUCCESSION, act, act2
def __rule_non_chain_succession(act: str, act2: str) -> Tuple[str, str, str]:
    """Return the rules-table key ``(NONCHAINSUCCESSION, act, act2)``."""
    return NONCHAINSUCCESSION, act, act2
def __col_to_dict_rule(
    col_name: Union[Tuple[str, str], Tuple[str, str, str]]
) -> Tuple[str, Any]:
    """
    Split a rules-table column key into ``(template, rule key)``.

    Parameters
    ---------------
    col_name
        Column key: ``(template, act)`` or ``(template, act, act2)``.

    Returns
    -------------
    template, key
        ``key`` is ``act`` for a two-element column, and also for a longer
        column whose third element is None, NaN or otherwise falsy;
        in every other case it is the pair ``(act, act2)``.
    """
    if len(col_name) != 2:
        second_act = col_name[2]
        third_is_blank = (
            second_act is None or pd.isna(second_act) or not second_act
        )
        if not third_is_blank:
            return col_name[0], (col_name[1], second_act)
    return col_name[0], col_name[1]
[docs]
def existence_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the EXISTENCE entries of ``rules`` for a single trace.

    Nothing happens unless EXISTENCE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; every activity of ``activities`` gets
        the value 1 if it is a key of ``act_counter`` and -1 otherwise.
    trace
        Not used by this function.
    activities
        Activities for which an entry is written.
    act_counter
        Counter whose keys are tested for membership.
    act_idxs
        Not used by this function.
    allowed_templates
        Templates that are enabled.
    """
    if EXISTENCE not in allowed_templates:
        return
    for activity in activities:
        outcome = 1 if activity in act_counter else -1
        rules[__rule_existence_column(activity)] = outcome
[docs]
def exactly_one_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the EXACTLY_ONE entries of ``rules`` for a single trace.

    Nothing happens unless EXACTLY_ONE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; an activity that is a key of
        ``act_counter`` gets 1 if its count equals 1 and -1 otherwise.
        Activities missing from ``act_counter`` get no entry.
    trace
        Not used by this function.
    activities
        Activities that are inspected.
    act_counter
        Counter providing the number of occurrences per activity.
    act_idxs
        Not used by this function.
    allowed_templates
        Templates that are enabled.
    """
    if EXACTLY_ONE not in allowed_templates:
        return
    for activity in activities:
        if activity not in act_counter:
            # no entry is written for an activity without a count
            continue
        occurs_once = act_counter[activity] == 1
        rules[__rule_exactly_one_column(activity)] = 1 if occurs_once else -1
[docs]
def init_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the INIT entries of ``rules`` for a single trace.

    Nothing happens unless INIT is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; every activity of ``activities`` gets
        1 if it equals the first element of ``trace`` and -1 otherwise.
        For an empty trace every activity gets -1.
    trace
        Sequence of activities; only its first element is read.
    activities
        Activities for which an entry is written.
    act_counter
        Not used by this function.
    act_idxs
        Not used by this function.
    allowed_templates
        Templates that are enabled.
    """
    if INIT not in allowed_templates:
        return
    # An empty trace has no first activity: guard the index access instead
    # of letting ``trace[0]`` raise IndexError.
    has_first = len(trace) > 0
    for act in activities:
        starts_trace = has_first and act == trace[0]
        rules[__rule_init_column(act)] = 1 if starts_trace else -1
[docs]
def responded_existence_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the RESPONDED_EXISTENCE entries of ``rules`` for a single trace.

    Nothing happens unless RESPONDED_EXISTENCE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` the
        entry ``(act, act2)`` is 1 if ``act2`` is also a key of
        ``act_counter`` and -1 otherwise.
    trace
        Not used by this function.
    activities
        Candidate second activities.
    act_counter
        Counter whose keys are the first activities and the membership test.
    act_idxs
        Not used by this function.
    allowed_templates
        Templates that are enabled.
    """
    if RESPONDED_EXISTENCE not in allowed_templates:
        return
    for source in act_counter:
        for target in activities:
            if target != source:
                outcome = 1 if target in act_counter else -1
                rules[__rule_responded_existence_column(source, target)] = outcome
[docs]
def response_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the RESPONSE entries of ``rules`` for a single trace.

    Nothing happens unless RESPONSE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` the
        entry ``(act, act2)`` is 1 only if ``act2`` is a key of
        ``act_counter`` and the last index of ``act`` is smaller than the
        last index of ``act2``; otherwise it is -1.
    trace
        Not used by this function.
    activities
        Candidate responding activities.
    act_counter
        Counter whose keys are the triggering activities.
    act_idxs
        Index lists per activity (presumably the positions of the activity
        in the trace, ascending -- confirm against the caller).
    allowed_templates
        Templates that are enabled.
    """
    if RESPONSE not in allowed_templates:
        return
    for source in act_counter:
        for target in activities:
            if target != source:
                # the index lists are only touched when the target occurs
                answered = (
                    target in act_counter
                    and act_idxs[source][-1] < act_idxs[target][-1]
                )
                rules[__rule_response(source, target)] = 1 if answered else -1
[docs]
def precedence_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the PRECEDENCE entries of ``rules`` for a single trace.

    Nothing happens unless PRECEDENCE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` that
        is also a key of ``act_counter``, the entry ``(act2, act)`` is 1 if
        the first index of ``act2`` is smaller than the first index of
        ``act`` and -1 otherwise.
    trace
        Not used by this function.
    activities
        Candidate preceding activities.
    act_counter
        Counter whose keys are the activities present.
    act_idxs
        Index lists per activity (presumably the positions of the activity
        in the trace, ascending -- confirm against the caller).
    allowed_templates
        Templates that are enabled.
    """
    if PRECEDENCE not in allowed_templates:
        return
    for later in act_counter:
        for earlier in activities:
            if earlier != later and earlier in act_counter:
                # NOTE(review): when ``earlier`` is not in act_counter no
                # entry is written at all -- confirm this is intended.
                comes_first = act_idxs[earlier][0] < act_idxs[later][0]
                rules[__rule_precedence(earlier, later)] = (
                    1 if comes_first else -1
                )
[docs]
def altresponse_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the ALTRESPONSE entries of ``rules`` for a single trace.

    Nothing happens unless ALTRESPONSE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` the
        entry ``(act, act2)`` is 1 only if ``act2`` is a key of
        ``act_counter``, both activities have index lists of equal length,
        and for every position ``i`` the i-th index of ``act`` is not
        greater than the i-th index of ``act2`` while the (i+1)-th index of
        ``act`` (if any) is not smaller than it. Otherwise it is -1.
    trace
        Not used by this function.
    activities
        Candidate responding activities.
    act_counter
        Counter whose keys are the triggering activities.
    act_idxs
        Index lists per activity (presumably the positions of the activity
        in the trace, ascending -- confirm against the caller).
    allowed_templates
        Templates that are enabled.
    """
    if ALTRESPONSE not in allowed_templates:
        return
    for act in act_counter:
        for act2 in activities:
            if act2 == act:
                continue
            key = __rule_alternate_response(act, act2)
            if act2 not in act_counter:
                rules[key] = -1
                continue
            src_idxs = act_idxs[act]
            tgt_idxs = act_idxs[act2]
            n = len(src_idxs)
            # The former no-op ``elif ...: pass`` branch (a leftover of the
            # chain variant) is dropped; only the interleaving is checked.
            alternates = n == len(tgt_idxs) and all(
                src_idxs[i] <= tgt_idxs[i]
                and (i >= n - 1 or src_idxs[i + 1] >= tgt_idxs[i])
                for i in range(n)
            )
            rules[key] = 1 if alternates else -1
[docs]
def chainresponse_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the CHAINRESPONSE entries of ``rules`` for a single trace.

    Nothing happens unless CHAINRESPONSE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` the
        entry ``(act, act2)`` is 1 only if ``act2`` is a key of
        ``act_counter``, both index lists have equal length, the lists
        interleave (i-th index of ``act`` not greater than the i-th index
        of ``act2``, next index of ``act`` not smaller than it) and every
        i-th index of ``act2`` equals the i-th index of ``act`` plus one.
        Otherwise it is -1.
    trace
        Not used by this function.
    activities
        Candidate responding activities.
    act_counter
        Counter whose keys are the triggering activities.
    act_idxs
        Index lists per activity (presumably the positions of the activity
        in the trace, ascending -- confirm against the caller).
    allowed_templates
        Templates that are enabled.
    """
    if CHAINRESPONSE not in allowed_templates:
        return
    for source in act_counter:
        for target in activities:
            if target == source:
                continue
            key = __rule_chain_response(source, target)
            if target not in act_counter:
                rules[key] = -1
                continue
            src_idxs = act_idxs[source]
            tgt_idxs = act_idxs[target]
            n = len(src_idxs)
            chained = n == len(tgt_idxs) and all(
                src_idxs[i] <= tgt_idxs[i]
                and (i >= n - 1 or src_idxs[i + 1] >= tgt_idxs[i])
                and src_idxs[i] + 1 == tgt_idxs[i]
                for i in range(n)
            )
            rules[key] = 1 if chained else -1
[docs]
def altprecedence_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the ALTPRECEDENCE entries of ``rules`` for a single trace.

    Nothing happens unless ALTPRECEDENCE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` that
        is also a key of ``act_counter``, the entry ``(act2, act)`` is 1
        only if both index lists have equal length and for every position
        ``i`` the i-th index of ``act2`` is not greater than the i-th index
        of ``act`` while the (i+1)-th index of ``act2`` (if any) is not
        smaller than it. Otherwise it is -1. No entry is written when
        ``act2`` is not a key of ``act_counter``.
    trace
        Not used by this function.
    activities
        Candidate preceding activities.
    act_counter
        Counter whose keys are the activities present.
    act_idxs
        Index lists per activity (presumably the positions of the activity
        in the trace, ascending -- confirm against the caller).
    allowed_templates
        Templates that are enabled.
    """
    if ALTPRECEDENCE not in allowed_templates:
        return
    for act in act_counter:
        for act2 in activities:
            if act2 == act or act2 not in act_counter:
                continue
            later_idxs = act_idxs[act]
            earlier_idxs = act_idxs[act2]
            n = len(later_idxs)
            # The former no-op ``elif ...: pass`` branch (a leftover of the
            # chain variant) is dropped; only the interleaving is checked.
            alternates = n == len(earlier_idxs) and all(
                earlier_idxs[i] <= later_idxs[i]
                and (i >= n - 1 or earlier_idxs[i + 1] >= later_idxs[i])
                for i in range(n)
            )
            rules[__rule_alternate_precedence(act2, act)] = (
                1 if alternates else -1
            )
[docs]
def chainprecedence_template_step1(
    rules: Dict[Union[Tuple[str, str], Tuple[str, str, str]], int],
    trace: Collection[str],
    activities: Set[str],
    act_counter: TCounter[str],
    act_idxs: Dict[str, List[int]],
    allowed_templates: Collection[str],
):
    """
    Fill the CHAINPRECEDENCE entries of ``rules`` for a single trace.

    Nothing happens unless CHAINPRECEDENCE is in ``allowed_templates``.

    Parameters
    ---------------
    rules
        Dictionary updated in place; for every key ``act`` of
        ``act_counter`` and every different ``act2`` of ``activities`` that
        is also a key of ``act_counter``, the entry ``(act2, act)`` is 1
        only if both index lists have equal length, the lists interleave
        (i-th index of ``act2`` not greater than the i-th index of ``act``,
        next index of ``act2`` not smaller than it) and every i-th index of
        ``act`` equals the i-th index of ``act2`` plus one. Otherwise it is
        -1. No entry is written when ``act2`` is not a key of
        ``act_counter``.
    trace
        Not used by this function.
    activities
        Candidate preceding activities.
    act_counter
        Counter whose keys are the activities present.
    act_idxs
        Index lists per activity (presumably the positions of the activity
        in the trace, ascending -- confirm against the caller).
    allowed_templates
        Templates that are enabled.
    """
    if CHAINPRECEDENCE not in allowed_templates:
        return
    for later in act_counter:
        for earlier in activities:
            if earlier == later or earlier not in act_counter:
                continue
            later_idxs = act_idxs[later]
            earlier_idxs = act_idxs[earlier]
            n = len(later_idxs)
            # alternate-precedence condition plus direct adjacency
            chained = n == len(earlier_idxs) and all(
                earlier_idxs[i] <= later_idxs[i]
                and (i >= n - 1 or earlier_idxs[i + 1] >= later_idxs[i])
                and earlier_idxs[i] + 1 == later_idxs[i]
                for i in range(n)
            )
            rules[__rule_chain_precedence(earlier, later)] = (
                1 if chained else -1
            )
[docs]
def absence_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive the ABSENCE columns as the negated EXISTENCE columns.

    The table is returned untouched unless both ABSENCE and EXISTENCE are
    in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the EXISTENCE
        column of every activity.
    columns
        Not used by this function.
    activities
        Activities for which a column is derived.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if ABSENCE not in allowed_templates or EXISTENCE not in allowed_templates:
        return table
    for activity in activities:
        existence_values = table[__rule_existence_column(activity)]
        table[__rule_absence_act(activity)] = -1 * existence_values
    return table
[docs]
def exactly_one_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero EXACTLY_ONE column for every activity lacking one.

    The table is returned untouched unless EXACTLY_ONE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities that must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if EXACTLY_ONE not in allowed_templates:
        return table
    for activity in activities:
        column = __rule_exactly_one_column(activity)
        if column in columns:
            continue
        # the length of the first existing column gives the number of rows
        first_key = list(table.keys())[0]
        table[column] = [0] * len(table[first_key])
    return table
[docs]
def responded_existence_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero RESPONDED_EXISTENCE column for every ordered pair of
    different activities lacking one.

    The table is returned untouched unless RESPONDED_EXISTENCE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if RESPONDED_EXISTENCE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_responded_existence_column(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def response_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero RESPONSE column for every ordered pair of different
    activities lacking one.

    The table is returned untouched unless RESPONSE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if RESPONSE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_response(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def precedence_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero PRECEDENCE column for every ordered pair of different
    activities lacking one.

    The table is returned untouched unless PRECEDENCE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if PRECEDENCE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_precedence(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def altresponse_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero ALTRESPONSE column for every ordered pair of different
    activities lacking one.

    The table is returned untouched unless ALTRESPONSE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if ALTRESPONSE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_alternate_response(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def chainresponse_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero CHAINRESPONSE column for every ordered pair of
    different activities lacking one.

    The table is returned untouched unless CHAINRESPONSE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if CHAINRESPONSE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_chain_response(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def altprecedence_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero ALTPRECEDENCE column for every ordered pair of
    different activities lacking one.

    The table is returned untouched unless ALTPRECEDENCE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if ALTPRECEDENCE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_alternate_precedence(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def chainprecedence_template_step2(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Add an all-zero CHAINPRECEDENCE column for every ordered pair of
    different activities lacking one.

    The table is returned untouched unless CHAINPRECEDENCE is in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place.
    columns
        Column keys regarded as already present.
    activities
        Activities whose ordered pairs must have a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    if CHAINPRECEDENCE not in allowed_templates:
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            column = __rule_chain_precedence(source, target)
            if column not in columns:
                # the first existing column gives the number of rows
                first_key = list(table.keys())[0]
                table[column] = [0] * len(table[first_key])
    return table
[docs]
def succession_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each SUCCESSION column as the element-wise minimum of the
    RESPONSE and PRECEDENCE columns of the same ordered pair.

    The table is returned untouched unless SUCCESSION, RESPONSE and
    PRECEDENCE are all in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the RESPONSE and
        PRECEDENCE columns of every ordered pair of different activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (SUCCESSION, RESPONSE, PRECEDENCE)
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            response_values = table[__rule_response(source, target)]
            precedence_values = table[__rule_precedence(source, target)]
            table[__rule_succession(source, target)] = np.minimum(
                response_values, precedence_values
            )
    return table
[docs]
def altsuccession_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each ALTSUCCESSION column as the element-wise minimum of the
    ALTRESPONSE and ALTPRECEDENCE columns of the same ordered pair.

    The table is returned untouched unless ALTSUCCESSION, ALTRESPONSE and
    ALTPRECEDENCE are all in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the ALTRESPONSE
        and ALTPRECEDENCE columns of every ordered pair of different
        activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (ALTSUCCESSION, ALTRESPONSE, ALTPRECEDENCE)
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            response_values = table[__rule_alternate_response(source, target)]
            precedence_values = table[
                __rule_alternate_precedence(source, target)
            ]
            table[__rule_alternate_succession(source, target)] = np.minimum(
                response_values, precedence_values
            )
    return table
[docs]
def chainsuccession_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each CHAINSUCCESSION column as the element-wise minimum of the
    CHAINRESPONSE and CHAINPRECEDENCE columns of the same ordered pair.

    The table is returned untouched unless CHAINSUCCESSION, CHAINRESPONSE
    and CHAINPRECEDENCE are all in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the CHAINRESPONSE
        and CHAINPRECEDENCE columns of every ordered pair of different
        activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (CHAINSUCCESSION, CHAINRESPONSE, CHAINPRECEDENCE)
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            response_values = table[__rule_chain_response(source, target)]
            precedence_values = table[__rule_chain_precedence(source, target)]
            table[__rule_chain_succession(source, target)] = np.minimum(
                response_values, precedence_values
            )
    return table
[docs]
def coexistence_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each COEXISTENCE column as the element-wise minimum of the two
    RESPONDED_EXISTENCE columns of the pair (both directions).

    The table is returned untouched unless COEXISTENCE and
    RESPONDED_EXISTENCE are both in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the
        RESPONDED_EXISTENCE columns of every ordered pair of different
        activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (COEXISTENCE, RESPONDED_EXISTENCE)
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            forward = table[__rule_responded_existence_column(source, target)]
            backward = table[__rule_responded_existence_column(target, source)]
            table[__rule_coexistence(source, target)] = np.minimum(
                forward, backward
            )
    return table
[docs]
def noncoexistence_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each NONCOEXISTENCE column as the negated COEXISTENCE column of
    the same ordered pair.

    The table is returned untouched unless NONCOEXISTENCE, COEXISTENCE and
    RESPONDED_EXISTENCE are all in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the COEXISTENCE
        column of every ordered pair of different activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (NONCOEXISTENCE, COEXISTENCE, RESPONDED_EXISTENCE)
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            coexistence_values = table[__rule_coexistence(source, target)]
            table[__rule_non_coexistence(source, target)] = (
                -1 * coexistence_values
            )
    return table
[docs]
def nonsuccession_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each NONSUCCESSION column as the negated SUCCESSION column of
    the same ordered pair.

    The table is returned untouched unless NONSUCCESSION, SUCCESSION,
    RESPONSE and PRECEDENCE are all in ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the SUCCESSION
        column of every ordered pair of different activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (NONSUCCESSION, SUCCESSION, RESPONSE, PRECEDENCE)
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            succession_values = table[__rule_succession(source, target)]
            table[__rule_non_succession(source, target)] = (
                -1 * succession_values
            )
    return table
[docs]
def nonchainsuccession_template(
    table,
    columns: Collection[str],
    activities: Set[str],
    allowed_templates: Collection[str],
) -> pd.DataFrame:
    """
    Derive each NONCHAINSUCCESSION column as the negated CHAINSUCCESSION
    column of the same ordered pair.

    The table is returned untouched unless NONCHAINSUCCESSION,
    CHAINSUCCESSION, CHAINRESPONSE and CHAINPRECEDENCE are all in
    ``allowed_templates``.

    Parameters
    ---------------
    table
        Rules table, updated in place; must already hold the
        CHAINSUCCESSION column of every ordered pair of different
        activities.
    columns
        Not used by this function.
    activities
        Activities whose ordered pairs get a column.
    allowed_templates
        Templates that are enabled.

    Returns
    -------------
    table
        The same table object.
    """
    needed = (
        NONCHAINSUCCESSION,
        CHAINSUCCESSION,
        CHAINRESPONSE,
        CHAINPRECEDENCE,
    )
    if not all(template in allowed_templates for template in needed):
        return table
    for source in activities:
        for target in activities:
            if target == source:
                continue
            chain_values = table[__rule_chain_succession(source, target)]
            table[__rule_non_chain_succession(source, target)] = (
                -1 * chain_values
            )
    return table
[docs]
def get_rules_from_rules_df(
    rules_df, parameters: Optional[Dict[Any, Any]] = None
) -> Dict[str, Dict[Any, Dict[str, int]]]:
    """
    Select the rules of a rules table that reach the support and confidence
    thresholds.

    For every column, the support is the number of entries different from 0
    and the confidence is the number of entries equal to 1. A column is
    kept when ``support >= len(rules_df) * min_support_ratio`` and
    ``confidence >= support * min_confidence_ratio``.

    Parameters
    ---------------
    rules_df
        Rules table with one column per rule; None or an empty table
        yields an empty result.
    parameters
        Possible parameters of the algorithm, including:
        - Parameters.MIN_SUPPORT_RATIO
        - Parameters.MIN_CONFIDENCE_RATIO
        - Parameters.AUTO_SELECTION_MULTIPLIER (default 0.8)
        A ratio that is not provided is derived from the column with the
        highest product of support ratio and confidence ratio, multiplied
        by the auto-selection multiplier. If no column has any support in
        that situation, an empty result is returned.

    Returns
    -------------
    rules
        Dictionary mapping each template to a dictionary that maps the rule
        key (an activity or a pair of activities) to
        ``{"support": ..., "confidence": ...}``.
    """
    if parameters is None:
        parameters = {}

    min_support_ratio = exec_utils.get_param_value(
        Parameters.MIN_SUPPORT_RATIO, parameters, None
    )
    min_confidence_ratio = exec_utils.get_param_value(
        Parameters.MIN_CONFIDENCE_RATIO, parameters, None
    )

    rules = {}

    # Check for a missing/empty table first: the automatic threshold
    # selection below divides by the number of rows.
    if rules_df is None or len(rules_df) == 0:
        return rules

    num_rows = len(rules_df)

    if min_support_ratio is None or min_confidence_ratio is None:
        # auto determine the missing ratio(s) by identifying the values
        # for the best feature
        auto_selection_multiplier = exec_utils.get_param_value(
            Parameters.AUTO_SELECTION_MULTIPLIER, parameters, 0.8
        )
        best = None  # ((product, column name), support ratio, confidence ratio)
        for col_name in rules_df:
            col = rules_df[col_name]
            supp = len(col[col != 0])
            if supp > 0:
                supp_ratio = float(supp) / float(num_rows)
                conf_ratio = float(len(col[col == 1])) / float(supp)
                rank = (supp_ratio * conf_ratio, col_name)
                # ties on the product are broken by the larger column name
                if best is None or rank > best[0]:
                    best = (rank, supp_ratio, conf_ratio)

        if best is None:
            # no column has any support: there is nothing to base the
            # thresholds on and no meaningful rule to return
            return rules

        if min_support_ratio is None:
            min_support_ratio = best[1] * auto_selection_multiplier
        if min_confidence_ratio is None:
            min_confidence_ratio = best[2] * auto_selection_multiplier

    for col_name in rules_df:
        col = rules_df[col_name]
        supp = len(col[col != 0])
        if supp >= num_rows * min_support_ratio:
            conf = len(col[col == 1])
            if conf >= supp * min_confidence_ratio:
                rule, key = __col_to_dict_rule(col_name)
                rules.setdefault(rule, {})[key] = {
                    "support": supp,
                    "confidence": conf,
                }

    return rules
[docs]
def apply(
    log: Union[EventLog, pd.DataFrame],
    parameters: Optional[Dict[Any, Any]] = None,
) -> Dict[str, Dict[Any, Dict[str, int]]]:
    """
    Discovers a DECLARE model from the provided event log

    Paper:
    F. M. Maggi, A. J. Mooij and W. M. P. van der Aalst, "User-guided discovery of declarative process models," 2011 IEEE Symposium on Computational Intelligence and Data Mining (CIDM), Paris, France, 2011, pp. 192-199, doi: 10.1109/CIDM.2011.5949297.

    Parameters
    ---------------
    log
        Log object (EventLog, Pandas table)
    parameters
        Possible parameters of the algorithm, including:
        - Parameters.ACTIVITY_KEY
        - Parameters.CONSIDERED_ACTIVITIES
        - Parameters.MIN_SUPPORT_RATIO
        - Parameters.MIN_CONFIDENCE_RATIO
        - Parameters.AUTO_SELECTION_MULTIPLIER
        - Parameters.ALLOWED_TEMPLATES: collection of templates to consider, including:
            * existence
            * exactly_one
            * init
            * responded_existence
            * response
            * precedence
            * succession
            * altresponse
            * altprecedence
            * altsuccession
            * chainresponse
            * chainprecedence
            * chainsuccession
            * absence
            * coexistence
            * noncoexistence
            * nonsuccession
            * nonchainsuccession

    Returns
    -------------
    declare_model
        DECLARE model (as Python dictionary), where each template is associated with its own rules
    """
    effective_parameters = {} if parameters is None else parameters
    # first build the rules table, then keep the rules passing the thresholds
    rules_table = form_rules_table(log, parameters=effective_parameters)
    return get_rules_from_rules_df(rules_table, parameters=effective_parameters)