'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import copy
from pm4py.objects.bpmn.util.bpmn_utils import *
from pm4py.objects.bpmn.obj import BPMN
from itertools import combinations, chain
import operator
[docs]
def is_enabled(node, bpmn, m):
    """
    Check whether a node of the BPMN graph can fire under the given marking

    Parameters
    ----------
    :param node: node to check
    :param bpmn: BPMN graph; the node has to be among ``bpmn.get_nodes()``
    :param m: marking, indexed by node, holding the token count of each node

    Returns
    -------
    :return: True if the node carries enough tokens to fire, False otherwise
    """
    if node not in bpmn.get_nodes():
        return False
    # A converging parallel/inclusive gateway waits for one token per
    # incoming arc; every other node fires as soon as it holds one token.
    waits_for_all_inputs = (
        isinstance(node, (BPMN.ParallelGateway, BPMN.InclusiveGateway))
        and node.get_gateway_direction() == BPMN.Gateway.Direction.CONVERGING
    )
    needed_tokens = len(node.get_in_arcs()) if waits_for_all_inputs else 1
    return not (m[node] < needed_tokens)
[docs]
def execute(node, bpmn, m):
    """
    Fire a node of the BPMN graph, provided that it is enabled

    Parameters
    ----------
    :param node: node to execute
    :param bpmn: BPMN graph the node belongs to
    :param m: current marking

    Returns
    -------
    :return: the result of ``weak_execute`` (the markings reachable by
        firing the node), or None if the node is not enabled
    """
    if not is_enabled(node, bpmn, m):
        return None
    # weak_execute needs the graph to route the tokens, so it has to be
    # forwarded as third argument
    return weak_execute(node, m, bpmn)
[docs]
def try_to_execute(node, bpmn, m):
    """
    Fire a node if it is enabled under the given marking

    Parameters
    ----------
    :param node: node to execute
    :param bpmn: BPMN graph the node belongs to
    :param m: current marking

    Returns
    -------
    :return: the result of ``execute`` when the node is enabled, None otherwise
    """
    return execute(node, bpmn, m) if is_enabled(node, bpmn, m) else None
[docs]
def add_vector(a, b):
    """
    Element-wise sum of two sequences

    Parameters
    ----------
    :param a: first sequence
    :param b: second sequence

    Returns
    -------
    :return: list with ``a[i] + b[i]`` for every position present in both
        inputs (the longer input is truncated)
    """
    return [left + right for left, right in zip(a, b)]
[docs]
def sub_vector(a, b):
    """
    Element-wise difference of two sequences

    Parameters
    ----------
    :param a: sequence of minuends
    :param b: sequence of subtrahends

    Returns
    -------
    :return: list with ``a[i] - b[i]`` for every position present in both
        inputs (the longer input is truncated)
    """
    return [left - right for left, right in zip(a, b)]
[docs]
def power_set(iterable, min=1):
    """
    Enumerate the subsets of an iterable, ordered by increasing size

    Parameters
    ----------
    :param iterable: elements to combine (consumed once, up front)
    :param min: smallest subset size to produce; the default of 1 leaves
        out the empty subset

    Returns
    -------
    :return: lazy iterator over tuples, one per subset having at least
        ``min`` elements
    """
    pool = list(iterable)
    subset_sizes = range(min, len(pool) + 1)
    return chain.from_iterable(
        map(lambda size: combinations(pool, size), subset_sizes)
    )
[docs]
def weak_execute(node, m, bpmn_graph):
    """
    Execute a node even if it is not fully enabled

    The input marking is never changed: every successor marking is built
    on a copy of it.

    Parameters
    ----------
    :param node: node to fire
    :param m: current marking (indexed by node, holding token counts)
    :param bpmn_graph: BPMN graph the node belongs to

    Returns
    -------
    :return: list of successor markings. Events, activities, converging
        gateways and diverging parallel gateways yield exactly one marking;
        diverging exclusive gateways yield one marking per outgoing flow;
        diverging inclusive gateways yield one marking per non-empty subset
        of the outgoing flows. None is returned for a node that is none of
        the handled types.
    """
    # this first if is a workaround. end events inside subprocesses are
    # handled explicitly: tokens flow out to the first element outside the
    # subprocess and the activated boundary events become deactivated
    if (
        isinstance(node, (BPMN.NormalEndEvent, BPMN.TerminateEndEvent))
        and node.get_process() != bpmn_graph.get_process_id()
    ):
        m_out = copy.copy(m)
        # reset marking at current node
        del m_out[node]
        sub_process_node = get_node_by_id(node.get_process(), bpmn_graph)
        # delete markings inside subprocess and from other enabled message
        # transitions
        for key in get_all_nodes_inside_process(
            node.get_process(), bpmn_graph
        ):
            if key in m_out:
                del m_out[key]
        for key in get_boundary_events_of_activity(
            node.get_process(), bpmn_graph
        ):
            if key in m_out and key != node:
                del m_out[key]
        # add marking at outgoing flow of the subprocess itself
        if len(sub_process_node.get_out_arcs()) > 0:
            target_node = sub_process_node.get_out_arcs()[0].target
            execute_token_flow(target_node, m_out, bpmn_graph)
        return [m_out]
    elif isinstance(node, (BPMN.Event, BPMN.Activity)) or (
        isinstance(node, BPMN.Gateway)
        and node.get_gateway_direction() == BPMN.Gateway.Direction.CONVERGING
    ):
        m_out = copy.copy(m)
        # reset marking at current node
        del m_out[node]
        # add marking at outgoing flow (only the first outgoing arc is
        # followed)
        if len(node.get_out_arcs()) > 0:
            target_node = node.get_out_arcs()[0].target
            execute_token_flow(target_node, m_out, bpmn_graph)
        # external subprocess cancellation
        if isinstance(node, BPMN.MessageBoundaryEvent):
            # delete markings inside subprocess and from other enabled
            # message transitions
            for key in get_all_nodes_inside_process(
                node.get_activity(), bpmn_graph
            ):
                if key in m_out:
                    del m_out[key]
            for key in get_boundary_events_of_activity(
                node.get_activity(), bpmn_graph
            ):
                if key in m_out and key != node:
                    del m_out[key]
        return [m_out]
    elif isinstance(node, BPMN.Gateway):
        m_outs = []
        if node.get_gateway_direction() == BPMN.Gateway.Direction.DIVERGING:
            if isinstance(node, BPMN.ParallelGateway):
                # AND-split: a single successor with a token on every branch
                m_out = copy.copy(m)
                del m_out[node]
                for out_flow in node.get_out_arcs():
                    execute_token_flow(out_flow.target, m_out, bpmn_graph)
                m_outs.append(m_out)
            elif isinstance(node, BPMN.ExclusiveGateway):
                # XOR-split: one successor per branch
                for out_flow in node.get_out_arcs():
                    m_out = copy.copy(m)
                    del m_out[node]
                    execute_token_flow(out_flow.target, m_out, bpmn_graph)
                    m_outs.append(m_out)
            elif isinstance(node, BPMN.InclusiveGateway):
                # OR-split: one successor per non-empty subset of branches.
                # The copy is taken once per subset so that all the flows of
                # the subset put their token on the same marking.
                for out_flows in power_set(node.get_out_arcs()):
                    m_out = copy.copy(m)
                    del m_out[node]
                    for out_flow in out_flows:
                        execute_token_flow(out_flow.target, m_out, bpmn_graph)
                    m_outs.append(m_out)
        # a gateway that is neither converging nor diverging produces no
        # successor marking
        return m_outs
    # node type not handled by any of the branches above
    return None
[docs]
def execute_token_flow(target, marking, bpmn_graph):
    """
    Move a token onto ``target``, updating ``marking`` in place

    Parameters
    ----------
    :param target: node reached by the token
    :param marking: marking to update (indexed by node, holding token
        counts); it is modified in place
    :param bpmn_graph: BPMN graph the target belongs to

    Returns
    -------
    :return: None
    """
    # next node is a subprocess --> add tokens in its start events
    if isinstance(target, BPMN.SubProcess):
        start_events = get_start_events_of_subprocess(
            target.get_id(), bpmn_graph
        )
        for start_event in start_events:
            marking[start_event] += 1
        # handle external boundary events
        for boundary_event in get_external_boundary_events_of_activity(
            target.get_id(), bpmn_graph
        ):
            marking[boundary_event] += 1
    elif isinstance(target, BPMN.EndEvent):
        # end event globally
        if target.get_process() == bpmn_graph.get_process_id():
            if isinstance(target, BPMN.TerminateEndEvent):
                # a global terminate end event wipes every other token
                keys_to_delete = [key for key in marking if key != target]
                for key in keys_to_delete:
                    del marking[key]
            marking[target] += 1
        # end event is inside subprocess
        else:
            sub_process_node = get_node_by_id(target.get_process(), bpmn_graph)
            # end event is connected to boundary transition
            for boundary_event in get_boundary_events_of_activity(
                sub_process_node.get_id(), bpmn_graph
            ):
                # TODO: also type of event could be checked, e.g. error end
                # and boundary event
                if boundary_event.name == target.name:
                    # add token to boundary event
                    marking[boundary_event] += 1
                    # reset all tokens inside subprocess and on other
                    # boundary events
                    for key in get_all_nodes_inside_process(
                        target.get_process(), bpmn_graph
                    ):
                        if key in marking:
                            del marking[key]
                    for key in get_boundary_events_of_activity(
                        sub_process_node.get_id(), bpmn_graph
                    ):
                        if key in marking and key != boundary_event:
                            del marking[key]
                    return
            # internal subprocess termination event
            if isinstance(target, BPMN.TerminateEndEvent):
                # nodes whose tokens have to disappear; built once here
                # instead of once per key of the marking
                scope_nodes = get_all_nodes_inside_process(
                    target.get_process(), bpmn_graph
                ) + get_boundary_events_of_activity(
                    sub_process_node.get_id(), bpmn_graph
                )
                keys_to_delete = [
                    key
                    for key in marking
                    if key != target and key in scope_nodes
                ]
                for key in keys_to_delete:
                    del marking[key]
                # add marking at outgoing flow of the subprocess
                if len(sub_process_node.get_out_arcs()) > 0:
                    target_node = sub_process_node.get_out_arcs()[0].target
                    execute_token_flow(target_node, marking, bpmn_graph)
                return
            # instead add token to end event, workaround to make sure that
            # you can fire a message event even after the last event inside
            # subprocess
            marking[target] += 1
    # just normal token flow to next node
    else:
        marking[target] += 1
[docs]
def enabled_nodes(bpmn, m):
    """
    Returns the set of enabled nodes of a BPMN graph under a given marking

    Parameters
    ----------
    :param bpmn: BPMN graph
    :param m: marking of the BPMN graph

    Returns
    -------
    :return: set of enabled nodes; the empty set as soon as an end event of
        the top-level process is found enabled
    """
    found = set()
    for candidate in bpmn.get_nodes():
        if not is_enabled(candidate, bpmn, m):
            continue
        # if there is a token in an end event of the top-level process, we
        # are done and there is no enabled node anymore
        reached_global_end = (
            isinstance(candidate, BPMN.EndEvent)
            and candidate.get_process() == bpmn.get_process_id()
        )
        if reached_global_end:
            return set()
        found.add(candidate)
    return found