# Source code for pm4py.objects.bpmn.util.bpmn_utils

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.objects.bpmn.obj import BPMN, Marking


def get_node_by_id(id, bpmn_graph):
    """
    Looks up a node of the BPMN graph by its identifier

    Parameters
    ------------
    id
        Identifier to search for (compared with ``node.get_id()``)
    bpmn_graph
        BPMN graph whose nodes are scanned

    Returns
    ------------
    node
        First node whose id equals ``id``, or None when no node matches
    """
    matching_nodes = (
        candidate
        for candidate in bpmn_graph.get_nodes()
        if candidate.get_id() == id
    )
    # next() with a default yields the first hit or None without raising
    return next(matching_nodes, None)
def get_global_start_events(bpmn_graph):
    """
    Collects the start events that belong to the graph's own process

    Parameters
    ------------
    bpmn_graph
        BPMN graph

    Returns
    ------------
    start_events
        List of BPMN.StartEvent nodes whose ``get_process()`` equals
        ``bpmn_graph.get_process_id()``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if isinstance(candidate, BPMN.StartEvent)
        and candidate.get_process() == bpmn_graph.get_process_id()
    ]
def get_initial_marking(bpmn_graph):
    """
    Builds the marking in which every global start event holds the value 1

    Parameters
    ------------
    bpmn_graph
        BPMN graph

    Returns
    ------------
    marking
        Marking object mapping each event returned by
        ``get_global_start_events`` to 1
    """
    initial_marking = Marking()
    global_start_events = get_global_start_events(bpmn_graph)
    for start_event in global_start_events:
        initial_marking[start_event] = 1
    return initial_marking
def get_boundary_events_of_activity(activity_id, bpmn_graph):
    """
    Collects the boundary events that reference a given activity

    Parameters
    ------------
    activity_id
        Identifier compared with ``get_activity()`` of each boundary event
    bpmn_graph
        BPMN graph

    Returns
    ------------
    boundary_events
        List of BPMN.BoundaryEvent nodes whose activity equals ``activity_id``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if isinstance(candidate, BPMN.BoundaryEvent)
        and candidate.get_activity() == activity_id
    ]
def get_external_boundary_events_of_activity(activity_id, bpmn_graph):
    """
    Collects the message boundary events that reference a given activity

    Parameters
    ------------
    activity_id
        Identifier compared with ``get_activity()`` of each event
    bpmn_graph
        BPMN graph

    Returns
    ------------
    boundary_events
        List of BPMN.MessageBoundaryEvent nodes whose activity equals
        ``activity_id``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if isinstance(candidate, BPMN.MessageBoundaryEvent)
        and candidate.get_activity() == activity_id
    ]
# TODO: Include nodes that are at deeper nesting levels, too
def get_all_nodes_inside_process(process_id, bpmn_graph, deep=True):
    """
    Collects the nodes of the graph that are contained in a process

    Parameters
    ------------
    process_id
        Identifier of the process
    bpmn_graph
        BPMN graph
    deep
        If True, a node is kept when ``process_id`` appears anywhere in the
        list returned by ``get_processes_deep`` for that node; if False, only
        nodes whose ``get_process()`` equals ``process_id`` are kept

    Returns
    ------------
    nodes
        List of the selected nodes, in the iteration order of
        ``bpmn_graph.get_nodes()``
    """
    all_nodes = bpmn_graph.get_nodes()
    if deep:
        return [
            candidate
            for candidate in all_nodes
            if process_id in get_processes_deep(candidate, bpmn_graph)
        ]
    return [
        candidate
        for candidate in all_nodes
        if candidate.get_process() == process_id
    ]
# return the direct children of a subprocess
def get_all_direct_child_subprocesses(
    process_id, bpmn_graph, include_normal=False
):
    """
    Collects the subprocesses located directly inside a process

    Parameters
    ------------
    process_id
        Identifier of the enclosing process
    bpmn_graph
        BPMN graph
    include_normal
        If True, every direct BPMN.SubProcess child is returned. If False,
        a child is returned only when it has at least one boundary event or
        at least one event reported by
        ``get_termination_events_of_subprocess_for_pnet``

    Returns
    ------------
    nodes
        Set of the selected BPMN.SubProcess nodes
    """
    children = set()
    for candidate in bpmn_graph.get_nodes():
        if not isinstance(candidate, BPMN.SubProcess):
            continue
        if candidate.get_process() != process_id:
            continue
        if include_normal:
            children.add(candidate)
            continue
        subprocess_id = candidate.get_id()
        attached_events = get_boundary_events_of_activity(
            subprocess_id, bpmn_graph
        )
        # termination events inside subprocess
        inner_events = get_termination_events_of_subprocess_for_pnet(
            subprocess_id, bpmn_graph
        )
        if attached_events or inner_events:
            children.add(candidate)
    return children
# return the direct and indirect children of a subprocess
def get_all_child_subprocesses(process_id, bpmn_graph, include_normal=False):
    """
    Collects the direct and indirect child subprocesses of a process

    Parameters
    ------------
    process_id
        Identifier of the enclosing process
    bpmn_graph
        BPMN graph
    include_normal
        Forwarded unchanged to ``get_all_direct_child_subprocesses`` at
        every level of the recursion

    Returns
    ------------
    sub_processes
        Set containing each direct child together with the result of the
        same search started from that child's id
    """
    descendants = set()
    direct_children = get_all_direct_child_subprocesses(
        process_id, bpmn_graph, include_normal
    )
    for child in direct_children:
        descendants.add(child)
        # recurse using the child's id as the new enclosing process
        descendants.update(
            get_all_child_subprocesses(
                child.get_id(), bpmn_graph, include_normal
            )
        )
    return descendants
def get_processes_deep(node, bpmn_graph):
    """
    Returns the chain of process ids that enclose a node, innermost first

    Parameters
    ------------
    node
        Node of the BPMN graph
    bpmn_graph
        BPMN graph

    Returns
    ------------
    processes
        List starting with ``node.get_process()`` and continuing with the
        process of each enclosing node. The chain ends with the graph's
        process id, or with the first process id that has no corresponding
        node in the graph.
    """
    enclosing_id = node.get_process()
    root_id = bpmn_graph.get_process_id()
    if enclosing_id == root_id:
        return [root_id]

    parent = get_node_by_id(enclosing_id, bpmn_graph)
    if parent is None:
        # The enclosing process is neither the graph's process nor a node of
        # the graph, so there is nothing further to climb. Ending the chain
        # here avoids an AttributeError on None.
        return [enclosing_id]
    return [enclosing_id] + get_processes_deep(parent, bpmn_graph)
def get_subprocesses_sorted_by_depth(bpmn_graph):
    """
    Returns the subprocesses of the graph ordered from deepest to shallowest

    Parameters
    ------------
    bpmn_graph
        BPMN graph

    Returns
    ------------
    subprocesses
        List of BPMN.SubProcess nodes sorted by ``get_depth()`` in
        descending order
    """
    subprocesses = [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if isinstance(candidate, BPMN.SubProcess)
    ]
    subprocesses.sort(key=lambda sub: sub.get_depth(), reverse=True)
    return subprocesses
def get_termination_events_of_subprocess(activity_id, bpmn_graph):
    """
    Collects the terminate end events located directly inside a subprocess

    Parameters
    ------------
    activity_id
        Identifier of the subprocess (compared with ``node.get_process()``)
    bpmn_graph
        BPMN graph

    Returns
    ------------
    events
        List of BPMN.TerminateEndEvent nodes whose process equals
        ``activity_id``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if candidate.get_process() == activity_id
        and isinstance(candidate, BPMN.TerminateEndEvent)
    ]
def get_termination_events_of_subprocess_for_pnet(activity_id, bpmn_graph):
    """
    Collects the intermediate catch/throw events directly inside a
    subprocess whose name is not shared with any boundary event of that
    subprocess

    Parameters
    ------------
    activity_id
        Identifier of the subprocess (compared with ``node.get_process()``
        and used to look up its boundary events)
    bpmn_graph
        BPMN graph

    Returns
    ------------
    events
        List of BPMN.IntermediateCatchEvent / BPMN.IntermediateThrowEvent
        nodes inside the subprocess for which no boundary event of the
        subprocess has the same ``get_name()``
    """
    events = []
    # The boundary-event names do not depend on the node being inspected, so
    # they are gathered once, and only if a candidate node is found, rather
    # than rescanning the whole graph for every candidate.
    boundary_names = None
    for node in bpmn_graph.get_nodes():
        if not (
            node.get_process() == activity_id
            and isinstance(
                node,
                (BPMN.IntermediateCatchEvent, BPMN.IntermediateThrowEvent),
            )
        ):
            continue
        if boundary_names is None:
            boundary_names = {
                boundary_event.get_name()
                for boundary_event in get_boundary_events_of_activity(
                    activity_id, bpmn_graph
                )
            }
        if node.get_name() not in boundary_names:
            events.append(node)
    return events
def get_start_events_of_subprocess(activity_id, bpmn_graph):
    """
    Collects the start events located directly inside a subprocess

    Parameters
    ------------
    activity_id
        Identifier of the subprocess (compared with ``node.get_process()``)
    bpmn_graph
        BPMN graph

    Returns
    ------------
    events
        List of BPMN.StartEvent nodes whose process equals ``activity_id``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if candidate.get_process() == activity_id
        and isinstance(candidate, BPMN.StartEvent)
    ]
def get_end_events_of_subprocess(activity_id, bpmn_graph):
    """
    Collects the end events located directly inside a subprocess

    Parameters
    ------------
    activity_id
        Identifier of the subprocess (compared with ``node.get_process()``)
    bpmn_graph
        BPMN graph

    Returns
    ------------
    events
        List of BPMN.EndEvent nodes whose process equals ``activity_id``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        if candidate.get_process() == activity_id
        and isinstance(candidate, BPMN.EndEvent)
    ]
def bpmn_graph_end_events_as_throw_events(bpmn_graph):
    """
    Collects the end events inside subprocesses that are not normal end
    events

    Parameters
    ------------
    bpmn_graph
        BPMN graph

    Returns
    ------------
    events
        List of nodes that are BPMN.EndEvent but not BPMN.NormalEndEvent and
        whose ``get_process()`` differs from ``bpmn_graph.get_process_id()``
    """
    return [
        candidate
        for candidate in bpmn_graph.get_nodes()
        # subprocess end event
        if candidate.get_process() != bpmn_graph.get_process_id()
        and isinstance(candidate, BPMN.EndEvent)
        and not isinstance(candidate, BPMN.NormalEndEvent)
    ]