Source code for pm4py.objects.conversion.process_tree.variants.to_bpmn

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import copy

from pm4py.objects.process_tree.obj import Operator


[docs] class Counts(object): """ Shared variables among executions """ def __init__(self): """ Constructor """ self.num_xor_gateways = 0 self.num_para_gateways = 0 self.num_tau_trans = 0 self.tau_trans = []
[docs] def inc_xor_gateways(self): """ Increase the number of xor gateways (split + join) """ self.num_xor_gateways += 1
[docs] def inc_tau_trans(self): """ Increase the number of tau transitions """ self.num_tau_trans += 1
[docs] def inc_para_gateways(self): """ Increase the number of xor gateways (split + join) """ self.num_para_gateways += 1
[docs] def append_tau(self, tau_id): self.tau_trans.append(tau_id)
[docs] def add_task(bpmn, counts, label): """ Create a task with the specified label in the BPMN """ from pm4py.objects.bpmn.obj import BPMN task = BPMN.Task(name=label) bpmn.add_node(task) return bpmn, task, counts
[docs] def add_tau_task(bpmn, counts): """ Create a task with the specified label in the BPMN """ from pm4py.objects.bpmn.obj import BPMN counts.inc_tau_trans() tau_name = "tau_" + str(counts.num_tau_trans) tau_task = BPMN.Task(name=tau_name) bpmn.add_node(tau_task) counts.append_tau(tau_task) return bpmn, tau_task, counts
[docs] def add_xor_gateway(bpmn, counts): from pm4py.objects.bpmn.obj import BPMN counts.inc_xor_gateways() split_name = "xor_" + str(counts.num_xor_gateways) + "_split" join_name = "xor_" + str(counts.num_xor_gateways) + "_join" split = BPMN.ExclusiveGateway(name="", gateway_direction=BPMN.Gateway.Direction.DIVERGING) join = BPMN.ExclusiveGateway(name="", gateway_direction=BPMN.Gateway.Direction.CONVERGING) bpmn.add_node(split) bpmn.add_node(join) return bpmn, split, join, counts
[docs] def add_parallel_gateway(bpmn, counts): from pm4py.objects.bpmn.obj import BPMN counts.inc_para_gateways() split_name = "parallel_" + str(counts.num_para_gateways) + "_split" join_name = "parallel_" + str(counts.num_para_gateways) + "_join" split = BPMN.ParallelGateway(name="", gateway_direction=BPMN.Gateway.Direction.DIVERGING) join = BPMN.ParallelGateway(name="", gateway_direction=BPMN.Gateway.Direction.CONVERGING) bpmn.add_node(split) bpmn.add_node(join) return bpmn, split, join, counts
[docs] def add_inclusive_gateway(bpmn, counts): from pm4py.objects.bpmn.obj import BPMN counts.inc_para_gateways() split_name = "parallel_" + str(counts.num_para_gateways) + "_split" join_name = "parallel_" + str(counts.num_para_gateways) + "_join" split = BPMN.InclusiveGateway(name="", gateway_direction=BPMN.Gateway.Direction.DIVERGING) join = BPMN.InclusiveGateway(name="", gateway_direction=BPMN.Gateway.Direction.CONVERGING) bpmn.add_node(split) bpmn.add_node(join) return bpmn, split, join, counts
[docs] def recursively_add_tree(parent_tree, tree, bpmn, initial_event, final_event, counts, rec_depth): from pm4py.objects.bpmn.obj import BPMN tree_childs = [child for child in tree.children] initial_connector = None final_connector = None if tree.operator is None: trans = tree if trans.label is None: bpmn, task, counts = add_tau_task(bpmn, counts) bpmn.add_flow(BPMN.SequenceFlow(initial_event, task)) bpmn.add_flow(BPMN.SequenceFlow(task, final_event)) initial_connector = task final_connector = task else: bpmn, task, counts = add_task(bpmn, counts, trans.label) bpmn.add_flow(BPMN.SequenceFlow(initial_event, task)) bpmn.add_flow(BPMN.SequenceFlow(task, final_event)) initial_connector = task final_connector = task elif tree.operator == Operator.XOR: bpmn, split_gateway, join_gateway, counts = add_xor_gateway(bpmn, counts) for subtree in tree_childs: bpmn, counts, x, y = recursively_add_tree(tree, subtree, bpmn, split_gateway, join_gateway, counts, rec_depth + 1) bpmn.add_flow(BPMN.SequenceFlow(initial_event, split_gateway)) bpmn.add_flow(BPMN.SequenceFlow(join_gateway, final_event)) initial_connector = split_gateway final_connector = join_gateway elif tree.operator == Operator.PARALLEL: bpmn, split_gateway, join_gateway, counts = add_parallel_gateway(bpmn, counts) for subtree in tree_childs: bpmn, counts, x, y = recursively_add_tree(tree, subtree, bpmn, split_gateway, join_gateway, counts, rec_depth + 1) bpmn.add_flow(BPMN.SequenceFlow(initial_event, split_gateway)) bpmn.add_flow(BPMN.SequenceFlow(join_gateway, final_event)) initial_connector = split_gateway final_connector = join_gateway elif tree.operator == Operator.OR: bpmn, split_gateway, join_gateway, counts = add_inclusive_gateway(bpmn, counts) for subtree in tree_childs: bpmn, counts, x, y = recursively_add_tree(tree, subtree, bpmn, split_gateway, join_gateway, counts, rec_depth + 1) bpmn.add_flow(BPMN.SequenceFlow(initial_event, split_gateway)) bpmn.add_flow(BPMN.SequenceFlow(join_gateway, final_event)) initial_connector = split_gateway final_connector = join_gateway elif tree.operator == Operator.SEQUENCE: initial_intermediate_task = initial_event bpmn, final_intermediate_task, counts = add_tau_task(bpmn, counts) for i in range(len(tree_childs)): bpmn, counts, initial_connect, final_connect = recursively_add_tree(tree, tree_childs[i], bpmn, initial_intermediate_task, final_intermediate_task, counts, rec_depth + 1) initial_intermediate_task = final_connect if i == 0: initial_connector = initial_connect if i == len(tree_childs) - 2: final_intermediate_task = final_event else: bpmn, final_intermediate_task, counts = add_tau_task(bpmn, counts) final_connector = final_connect elif tree.operator == Operator.LOOP: do = tree_childs[0] bpmn, split, join, counts = add_xor_gateway(bpmn, counts) bpmn, counts, i, y = recursively_add_tree(tree, do, bpmn, join, split, counts, rec_depth + 1) for redo in tree_childs[1:]: bpmn, counts, x, y = recursively_add_tree(tree, redo, bpmn, split, join, counts, rec_depth + 1) bpmn.add_flow(BPMN.SequenceFlow(initial_event, join)) bpmn.add_flow(BPMN.SequenceFlow(split, final_event)) initial_connector = join final_connector = split return bpmn, counts, initial_connector, final_connector
[docs] def delete_tau_transitions(bpmn, counts): from pm4py.objects.bpmn.obj import BPMN for tau_tran in counts.tau_trans: in_arcs = tau_tran.get_in_arcs() out_arcs = tau_tran.get_out_arcs() if len(in_arcs) > 1 or len(out_arcs) > 1: raise Exception("Tau transition has more than one incoming or outgoing edge!") if in_arcs and out_arcs: out_flow = out_arcs[0] in_flow = in_arcs[0] source = in_flow.get_source() target = out_flow.get_target() bpmn.remove_flow(out_flow) bpmn.remove_flow(in_flow) bpmn.add_flow(BPMN.SequenceFlow(source, target)) else: for in_flow in copy.copy(in_arcs): bpmn.remove_flow(in_flow) for out_flow in copy.copy(out_arcs): bpmn.remove_flow(out_flow) bpmn.remove_node(tau_tran) return bpmn
[docs] def apply(tree, parameters=None): """ Converts the process tree into a BPMN diagram Parameters -------------- tree Process tree parameters Parameters of the algorithm Returns -------------- bpmn_graph BPMN diagram """ from pm4py.objects.bpmn.obj import BPMN counts = Counts() bpmn = BPMN() start_event = BPMN.StartEvent(name="start", isInterrupting=True) end_event = BPMN.NormalEndEvent(name="end") bpmn.add_node(start_event) bpmn.add_node(end_event) bpmn, counts, _, _ = recursively_add_tree(tree, tree, bpmn, start_event, end_event, counts, 0) bpmn = delete_tau_transitions(bpmn, counts) for node in bpmn.get_nodes(): node.set_process(bpmn.get_process_id()) for edge in bpmn.get_flows(): edge.set_process(bpmn.get_process_id()) return bpmn