Source code for pm4py.objects.conversion.powl.variants.to_process_tree

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import networkx as nx

from pm4py.objects.process_tree.obj import ProcessTree, Operator as PTOperator
from pm4py.objects.powl.obj import POWL, StrictPartialOrder, OperatorPOWL, Transition, SilentTransition
from typing import Optional, Dict, Any
from pm4py.util import nx_utils
from collections import deque
import warnings


def assign_levels_to_toposort(graph):
    """
    Internal method

    Assigns a level to every node of a directed acyclic graph. Nodes without
    incoming edges get level 0; every other node gets one more than the
    highest level among its predecessors (i.e. the length of the longest path
    reaching it from a start node). This guarantees that every edge goes from
    a lower level to a strictly higher one, so two nodes that are ordered in
    the graph never share a level.

    Parameters
    ---------------
    graph
        Directed graph exposing ``in_degree()`` (iterable of (node, degree)
        pairs) and ``successors(node)``. It is expected to be acyclic: nodes
        lying on a cycle (or only reachable through one) are never released
        for processing, so their level may be missing or incomplete.

    Returns
    ---------------
    levels
        Dictionary associating each processed node to its level
    """
    # Number of predecessors of each node that still have to be processed
    remaining = dict(graph.in_degree())

    # The nodes without predecessors are the start nodes (level 0)
    start_nodes = [node for node, degree in remaining.items() if degree == 0]
    levels = {node: 0 for node in start_nodes}
    queue = deque(start_nodes)

    while queue:
        current_node = queue.popleft()
        next_level = levels[current_node] + 1

        for successor in graph.successors(current_node):
            # Keep the highest level proposed by any predecessor, instead of
            # the first one seen, so that the successor always ends up
            # strictly after all of its predecessors
            if levels.get(successor, 0) < next_level:
                levels[successor] = next_level

            # A node is expanded only once all of its predecessors have been
            # processed, i.e. once its level is final
            remaining[successor] -= 1
            if remaining[successor] == 0:
                queue.append(successor)

    return levels
def _convert_component(component, DG, G, rec_depth):
    """
    Internal method

    Converts one connected component of the (transitively reduced) partial
    order to a process tree.

    Parameters
    ---------------
    component
        Nodes (POWL models) belonging to the connected component
    DG
        Transitively reduced directed graph of the partial order
    G
        Undirected version of DG
    rec_depth
        Current recursion depth

    Returns
    ---------------
    subtree
        Process tree corresponding to the component
    """
    members = list(component)

    if len(members) == 1:
        # a single node needs no ordering: continue the translation on it
        return apply_recursive(members[0], rec_depth + 1)

    # assign a level to every node, starting from the nodes without any ancestor
    node_levels = assign_levels_to_toposort(DG.subgraph(members))
    levels = [[] for _ in range(max(node_levels.values()) + 1)]
    for node, level in node_levels.items():
        levels[level].append(node)

    # the conversion is precise only if every node of a level is connected to
    # every node of the following level; otherwise warn (once per component)
    precise = all(
        G.has_edge(n, m)
        for lower, upper in zip(levels, levels[1:])
        for n in lower
        for m in upper
    )
    if not precise:
        warnings.warn("This POWL model cannot be converted precisely.")

    # one child per level: the node itself, or a parallel operator over the
    # nodes sharing that level
    sequence_children = []
    for level in levels:
        level_trees = [apply_recursive(c, rec_depth + 1) for c in level]
        if len(level_trees) == 1:
            sequence_children.append(level_trees[0])
        else:
            parallel = ProcessTree(operator=PTOperator.PARALLEL, children=level_trees)
            for c2 in parallel.children:
                c2.parent = parallel
            sequence_children.append(parallel)

    # the levels are executed one after the other
    subtree = ProcessTree(operator=PTOperator.SEQUENCE, children=sequence_children)
    for c1 in subtree.children:
        c1.parent = subtree

    return subtree


def apply_recursive(powl: POWL, rec_depth=0) -> ProcessTree:
    """
    Internal method

    Recursively converts a POWL model to a process tree.

    Parameters
    ---------------
    powl
        POWL model (transition, operator node, or partial order)
    rec_depth
        Current recursion depth

    Returns
    ---------------
    tree
        Process tree

    Raises
    ---------------
    Exception
        If the order relation of a partial order contains a cycle
    """
    tree = ProcessTree()

    if isinstance(powl, Transition):
        tree.label = powl.label
    elif isinstance(powl, OperatorPOWL):
        tree.operator = powl.operator

        for c in powl.children:
            tree.children.append(apply_recursive(c, rec_depth + 1))
    else:
        # build the directed graph of the order relation among the children
        DG = nx_utils.DiGraph()
        for c in powl.children:
            DG.add_node(c)

        for n1 in powl.children:
            for n2 in powl.children:
                if n1 != n2 and powl.order.is_edge(n1, n2):
                    DG.add_edge(n1, n2)

        # Safety check: the order must be a DAG (no cycles). It has to be done
        # before the transitive reduction, which is only defined for DAGs and
        # would otherwise fail with its own error first.
        if not nx.is_directed_acyclic_graph(DG):
            raise Exception("The provided POWL model is invalid!")

        DG = nx.transitive_reduction(DG)
        G = nx.Graph(DG)

        # the connected components of the partial order correspond to parts
        # of the process executed in parallel
        children = [
            _convert_component(component, DG, G, rec_depth)
            for component in nx_utils.connected_components(G)
        ]

        if len(children) == 1:
            # if there is only one connected component, return the process tree corresponding to that
            tree = children[0]
        else:
            # create an AND operator
            tree.operator = PTOperator.PARALLEL
            for child in children:
                tree.children.append(child)

    for child in tree.children:
        child.parent = tree

    return tree
def apply(powl: POWL, parameters: Optional[Dict[Any, Any]] = None) -> ProcessTree:
    """
    Entry point of the variant: translates a POWL model into a process tree
    (when such a translation is possible) by delegating to the recursive
    conversion.

    Parameters
    ---------------
    powl
        POWL model to convert
    parameters
        Optional dictionary of parameters (currently no key is read)

    Returns
    ---------------
    process_tree
        Process tree obtained from the POWL model
    """
    parameters = {} if parameters is None else parameters

    process_tree = apply_recursive(powl)
    return process_tree