'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import copy
from pm4py.objects.petri_net import properties as petri_properties
from pm4py.objects.petri_net.sem_interface import Semantics
import re
security_pattern = re.compile(r"[.]|\\x[0-9a-fA-F]+")
[docs]
class DataPetriNetSemantics(Semantics):
[docs]
def is_enabled(self, t, pn, m, **kwargs):
"""
Verifies whether a given transition is enabled in a given Petri net and marking
Parameters
----------
:param t: transition to check
:param pn: Petri net
:param m: marking to check
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: true if enabled, false otherwise
"""
e = kwargs["e"] if "e" in kwargs else {}
return is_enabled(t, pn, m, e)
[docs]
def execute(self, t, pn, m, **kwargs):
"""
Executes a given transition in a given Petri net, the given data marking and the associated event
Parameters
----------
:param t: transition to execute
:param pn: Petri net
:param m: marking to use
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: newly reached marking if :param t: is enabled, None otherwise
"""
e = kwargs["e"] if "e" in kwargs else {}
return execute(t, pn, m, e)
[docs]
def weak_execute(self, t, pn, m, **kwargs):
"""
Executes a given transition in a given Petri net, the given data marking and the associated event,
even if not fully enabled
Parameters
----------
:param t: transition to execute
:param pn: Petri net
:param m: marking to use
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: newly reached marking
"""
e = kwargs["e"] if "e" in kwargs else {}
return weak_execute(t, m, e)
[docs]
def enabled_transitions(self, pn, m, **kwargs):
"""
Returns a set of enabled transitions in a Petri net, the given data marking and the associated event
Parameters
----------
:param pn: Petri net
:param m: marking of the pn
:param e: associated event (optional, as keyword argument)
Returns
-------
:return: set of enabled transitions
"""
e = kwargs["e"] if "e" in kwargs else {}
return enabled_transitions(pn, m, e)
[docs]
def check_guard_safety(guard):
"""
Checks the security of a guard before evaluating that
Parameters
----------------
guard
Guard
Returns
----------------
safety
True if the guard is safe to execute, False otherwise
"""
return not security_pattern.search(guard)
[docs]
def evaluate_guard(guard, read_variables, data):
"""
Evaluates a data Petri net guard given the current data
Parameters
------------------
guard
Guard
read_variables
Read variables
Returns
------------------
boolean
Boolean value
"""
guard = (
guard.replace("&&", " and ")
.replace("||", " or ")
.replace("true", "True")
.replace("false", "False")
)
try:
dct = {}
for k in read_variables:
dct[k] = None
for k, v in data.items():
dct[k] = v
if check_guard_safety(guard):
ret = eval(guard, dct)
return ret
return False
except BaseException:
# the guard could not be evaluated (for example, given missing data)
return False
# 29/08/2021: the following methods have been incapsulated in the DataPetriNetSemantics class.
# the long term idea is to remove them. However, first we need to adapt the existing code to the new
# structure. Moreover, for performance reason, it is better to leave the code here, without having
# to instantiate a DataPetriNetSemantics object.
[docs]
def is_enabled(t, pn, m, e):
if t not in pn.transitions:
return False
else:
for a in t.in_arcs:
if m[a.source] < a.weight:
return False
if petri_properties.TRANS_GUARD in t.properties:
guard = t.properties[petri_properties.TRANS_GUARD]
read_variables = (
t.properties[petri_properties.READ_VARIABLE]
if petri_properties.READ_VARIABLE in t.properties
else []
)
data = copy.copy(m.data_dict)
data.update(e)
evaluate_guard(guard, read_variables, data)
return True
[docs]
def execute(t, pn, m, e):
if not is_enabled(t, pn, m, e):
return None
m_out = copy.copy(m)
for a in t.in_arcs:
m_out[a.source] -= a.weight
if m_out[a.source] == 0:
del m_out[a.source]
for a in t.out_arcs:
m_out[a.target] += a.weight
m_out.data_dict.update(e)
return m_out
[docs]
def weak_execute(t, m, e):
m_out = copy.copy(m)
for a in t.in_arcs:
m_out[a.source] -= a.weight
if m_out[a.source] <= 0:
del m_out[a.source]
for a in t.out_arcs:
m_out[a.target] += a.weight
m_out.data_dict.update(e)
return m_out
[docs]
def enabled_transitions(pn, m, e):
enabled = set()
for t in pn.transitions:
if is_enabled(t, pn, m, e):
enabled.add(t)
return enabled