Source code for pm4py.streaming.algo.conformance.declare.variants.automata
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import logging
from pm4py.streaming.algo.interface import StreamingAlgorithm
[docs]
class DeclareStreamingConformance(StreamingAlgorithm):
"""
Streaming Conformance Checking Algorithm for DECLARE models.
Attempts to implement state-based checks for all Declare constraint types.
When a violation occurs, prints out which constraints are violated.
Implementation of:
Maggi, Fabrizio Maria, et al. "Monitoring business constraints with linear temporal logic: An approach based on colored automata." Business Process Management: 9th International Conference, BPM 2011, Clermont-Ferrand, France, August 30-September 2, 2011. Proceedings 9. Springer Berlin Heidelberg, 2011.
"""
def __init__(self, declare_model, parameters=None):
super().__init__(parameters=parameters)
self.declare_model = declare_model
self._cases = {}
self._total_events = 0
self._total_deviations = 0
self._deviations_per_time = []
# Parse and build automata for all constraints
self._constraints = self._parse_declare_model(declare_model)
def _parse_declare_model(self, model):
constraints = {}
for template_type, activities_dict in model.items():
constraints[template_type] = {}
for activities, _params in activities_dict.items():
if isinstance(activities, str):
activities = (activities,)
constraints[template_type][activities] = (
self._create_automaton_for_constraint(
template_type, activities
)
)
return constraints
# Automaton construction methods start here
def _dummy_automaton(self):
return {
"initial_state": {"name": "init"},
"states": {"init": {"on_event": lambda a, e, s: ("init", False)}},
}
def _existence_automaton(self, A):
def on_event_init(a, e, s):
if e.get("concept:name") == A:
return ("seen", False)
return ("init", False)
def on_event_seen(a, e, s):
return ("seen", False)
return {
"initial_state": {"name": "init"},
"states": {
"init": {"on_event": on_event_init},
"seen": {"on_event": on_event_seen},
},
}
def _absence_automaton(self, A):
def on_event_init(a, e, s):
if e.get("concept:name") == A:
return ("violated", True)
return ("init", False)
return {
"initial_state": {"name": "init"},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _exactly_one_automaton(self, A):
def on_event_init(a, e, s):
if e.get("concept:name") == A:
return ("seen_once", False)
return ("init", False)
def on_event_seen_once(a, e, s):
if e.get("concept:name") == A:
return ("violated", True)
return ("seen_once", False)
return {
"initial_state": {"name": "init"},
"states": {
"init": {"on_event": on_event_init},
"seen_once": {"on_event": on_event_seen_once},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _init_automaton(self, A):
def on_event_init(a, e, s):
if s.get("received_first", False) is False:
s["received_first"] = True
if e.get("concept:name") != A:
return ("violated", True)
else:
return ("ok", False)
else:
return ("ok", False)
return {
"initial_state": {"name": "init", "received_first": False},
"states": {
"init": {"on_event": on_event_init},
"ok": {"on_event": lambda a, e, s: ("ok", False)},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _responded_existence_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == A:
s["A_seen"] = True
if act == B:
s["B_seen"] = True
# No immediate violation, since we can't confirm end of trace
return ("init", False)
return {
"initial_state": {
"name": "init",
"A_seen": False,
"B_seen": False,
},
"states": {"init": {"on_event": on_event_init}},
}
def _coexistence_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == A:
s["A_seen"] = True
if act == B:
s["B_seen"] = True
return ("init", False)
return {
"initial_state": {
"name": "init",
"A_seen": False,
"B_seen": False,
},
"states": {"init": {"on_event": on_event_init}},
}
def _response_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
pA = s.get("pending_As", 0)
if act == A:
pA += 1
if act == B and pA > 0:
pA -= 1
s["pending_As"] = pA
return ("init", False)
return {
"initial_state": {"name": "init", "pending_As": 0},
"states": {"init": {"on_event": on_event_init}},
}
def _precedence_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == B and not s.get("A_occurred", False):
return ("violated", True)
if act == A:
s["A_occurred"] = True
return ("init", False)
return {
"initial_state": {"name": "init", "A_occurred": False},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _succession_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
pA = s.get("pending_As", 0)
if act == B and not s.get("A_seen", False):
return ("violated", True)
if act == A:
s["A_seen"] = True
pA += 1
elif act == B and pA > 0:
pA -= 1
s["pending_As"] = pA
return ("init", False)
return {
"initial_state": {
"name": "init",
"A_seen": False,
"pending_As": 0,
},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _altresponse_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == A:
if s.get("waiting_for_B", False):
return ("violated", True)
s["waiting_for_B"] = True
if act == B and s.get("waiting_for_B", False):
s["waiting_for_B"] = False
return ("init", False)
return {
"initial_state": {"name": "init", "waiting_for_B": False},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _altprecedence_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == B:
if s.get("waiting_for_A", True):
return ("violated", True)
s["waiting_for_A"] = True
if act == A:
s["waiting_for_A"] = False
return ("init", False)
return {
"initial_state": {"name": "init", "waiting_for_A": True},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _altsuccession_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
wB = s.get("waiting_for_B", False)
wA = s.get("waiting_for_A", True)
if act == A:
if wB:
return ("violated", True)
wB = True
wA = False
elif act == B:
if wA:
return ("violated", True)
wA = True
wB = False
s["waiting_for_A"] = wA
s["waiting_for_B"] = wB
return ("init", False)
return {
"initial_state": {
"name": "init",
"waiting_for_B": False,
"waiting_for_A": True,
},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _chainresponse_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
expB = s.get("expecting_B", False)
if expB:
if act != B:
return ("violated", True)
s["expecting_B"] = False
if act == A:
s["expecting_B"] = True
return ("init", False)
return {
"initial_state": {"name": "init", "expecting_B": False},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _chainprecedence_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
last = s.get("last", None)
if act == B and last != A:
return ("violated", True)
s["last"] = act
return ("init", False)
return {
"initial_state": {"name": "init", "last": None},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _chainsuccession_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
expB = s.get("expecting_B", False)
last = s.get("last", None)
if expB:
if act != B:
return ("violated", True)
expB = False
if act == B and last != A:
return ("violated", True)
if act == A:
expB = True
s["expecting_B"] = expB
s["last"] = act
return ("init", False)
return {
"initial_state": {
"name": "init",
"expecting_B": False,
"last": None,
},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _noncoexistence_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == A:
s["A_seen"] = True
if s.get("B_seen", False):
return ("violated", True)
if act == B:
s["B_seen"] = True
if s.get("A_seen", False):
return ("violated", True)
return ("init", False)
return {
"initial_state": {
"name": "init",
"A_seen": False,
"B_seen": False,
},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _nonsuccession_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
if act == A:
s["A_occurred"] = True
if act == B and s.get("A_occurred", False):
return ("violated", True)
return ("init", False)
return {
"initial_state": {"name": "init", "A_occurred": False},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _nonchainsuccession_automaton(self, A, B):
def on_event_init(a, e, s):
act = e.get("concept:name")
lA = s.get("last_was_A", False)
if lA and act == B:
return ("violated", True)
lA = act == A
s["last_was_A"] = lA
return ("init", False)
return {
"initial_state": {"name": "init", "last_was_A": False},
"states": {
"init": {"on_event": on_event_init},
"violated": {"on_event": lambda a, e, s: ("violated", False)},
},
}
def _create_automaton_for_constraint(self, template_type, activities):
if template_type == "existence":
return self._existence_automaton(activities[0])
elif template_type == "absence":
return self._absence_automaton(activities[0])
elif template_type == "exactly_one":
return self._exactly_one_automaton(activities[0])
elif template_type == "init":
return self._init_automaton(activities[0])
elif template_type == "responded_existence":
return self._responded_existence_automaton(
activities[0], activities[1]
)
elif template_type == "coexistence":
return self._coexistence_automaton(activities[0], activities[1])
elif template_type == "response":
return self._response_automaton(activities[0], activities[1])
elif template_type == "precedence":
return self._precedence_automaton(activities[0], activities[1])
elif template_type == "succession":
return self._succession_automaton(activities[0], activities[1])
elif template_type == "altresponse":
return self._altresponse_automaton(activities[0], activities[1])
elif template_type == "altprecedence":
return self._altprecedence_automaton(activities[0], activities[1])
elif template_type == "altsuccession":
return self._altsuccession_automaton(activities[0], activities[1])
elif template_type == "chainresponse":
return self._chainresponse_automaton(activities[0], activities[1])
elif template_type == "chainprecedence":
return self._chainprecedence_automaton(
activities[0], activities[1]
)
elif template_type == "chainsuccession":
return self._chainsuccession_automaton(
activities[0], activities[1]
)
elif template_type == "noncoexistence":
return self._noncoexistence_automaton(activities[0], activities[1])
elif template_type == "nonsuccession":
return self._nonsuccession_automaton(activities[0], activities[1])
elif template_type == "nonchainsuccession":
return self._nonchainsuccession_automaton(
activities[0], activities[1]
)
else:
return self._dummy_automaton()
def _process(self, event):
case_id = event.get("case:concept:name", "undefined_case")
self._total_events += 1
if case_id not in self._cases:
case_data = {"constraints_state": {}, "deviations": 0, "events": 0}
for template_type, constraints_dict in self._constraints.items():
for activities, automaton in constraints_dict.items():
state_data = dict(automaton["initial_state"])
case_data["constraints_state"][
(template_type, activities)
] = (state_data["name"], state_data)
self._cases[case_id] = case_data
self._cases[case_id]["events"] += 1
current_case_data = self._cases[case_id]
deviations_in_this_event = 0
violated_constraints = []
for (template_type, activities), (
state_name,
state_data,
) in current_case_data["constraints_state"].items():
automaton = self._constraints[template_type][activities]
current_state = automaton["states"][state_name]
on_event = current_state["on_event"]
new_state_name, violated = on_event(automaton, event, state_data)
current_case_data["constraints_state"][
(template_type, activities)
] = (new_state_name, state_data)
if violated:
deviations_in_this_event += 1
violated_constraints.append((template_type, activities))
if deviations_in_this_event > 0:
current_case_data["deviations"] += deviations_in_this_event
self._total_deviations += deviations_in_this_event
# Print types of violated constraints
violated_types = [vt for vt, _acts in violated_constraints]
logging.error(
f"Case {case_id} - Deviations detected: {deviations_in_this_event}. Violated constraint types: {violated_types}")
timestamp = event.get("time:timestamp", self._total_events)
self._deviations_per_time.append((timestamp, deviations_in_this_event))
def _current_result(self):
result = {
"total_events_processed": self._total_events,
"total_deviations": self._total_deviations,
"deviations_per_time": self._deviations_per_time,
"cases": {},
}
for c_id, c_data in self._cases.items():
constraints_state = {}
for k, (st_name, st_data) in c_data["constraints_state"].items():
constraints_state[str(k)] = st_name
result["cases"][c_id] = {
"events": c_data["events"],
"deviations": c_data["deviations"],
"constraints_state": constraints_state,
}
return result
[docs]
def apply(declare_model, parameters=None):
return DeclareStreamingConformance(declare_model, parameters)