# Source code for pm4py.visualization.ocel.ocpn.variants.brachmann

import uuid
import numpy as np
import math
import copy
from typing import Optional, Dict, Any, List, Tuple, Set
from graphviz import Digraph
from enum import Enum
from pm4py.util import exec_utils, constants, vis_utils
from pm4py.util.lp import solver as lp_solver
from pm4py.objects.petri_net.obj import PetriNet  # For type hinting if needed
from copy import deepcopy


# --- Constants matching TypeScript OCPNLayout ---
PLACE_TYPE = "place"
TRANSITION_TYPE = "transition"
DUMMY_TYPE = "dummy"


# --- Helper Functions ---

def ot_to_color(ot: str) -> str:
    """Return a deterministic ``#rrggbb`` color for an object type name.

    The color is built from the first six hexadecimal digits of the MD5
    digest of the UTF-8 encoded name, so a given name always yields the
    same color.
    """
    import hashlib

    digest = hashlib.md5(ot.encode('utf-8')).hexdigest()
    return "#" + digest[:6]
def generate_dummy_id() -> str:
    """Return a fresh identifier for a dummy node: ``dummy_`` plus a random UUID4."""
    return f"dummy_{uuid.uuid4()}"
def generate_arc_id(source_id: str, target_id: str) -> str:
    """Return a fresh arc identifier of the form ``arc_<source>_<target>_<uuid4>``."""
    return "arc_{}_{}_{}".format(source_id, target_id, uuid.uuid4())
def get_neighbors(vertex_id: str, layout_data: Dict, direction: str) -> List[str]:
    """Return the distinct neighbors of ``vertex_id`` in the given direction.

    Parameters
    ----------
    vertex_id
        Identifier of the vertex whose neighbors are requested.
    layout_data
        Layout dictionary; its optional ``"arcs"`` entry maps arc ids to
        dictionaries with ``'source'`` and ``'target'`` keys.
    direction
        ``'up'`` returns the sources of arcs that end in ``vertex_id``;
        ``'down'`` returns the targets of arcs that start in it. Any other
        value yields an empty list.

    Returns
    -------
    List[str]
        Each neighbor once, in the order in which its first connecting arc
        appears in ``layout_data["arcs"]``. The order is therefore
        reproducible between runs (a plain ``set`` of strings is not).
    """
    if direction == 'up':
        own_end, other_end = 'target', 'source'
    elif direction == 'down':
        own_end, other_end = 'source', 'target'
    else:
        return []

    arcs = layout_data.get("arcs", {})
    # dict.fromkeys de-duplicates while keeping first-seen order.
    ordered = dict.fromkeys(
        arc[other_end] for arc in arcs.values() if arc[own_end] == vertex_id
    )
    return list(ordered)
def get_arcs_between(u: str, v: str, layout_data: Dict) -> List[Dict]:
    """Return every arc record that directly links ``u`` and ``v``.

    Both orientations (``u -> v`` and ``v -> u``) are included; the records
    are returned in the iteration order of ``layout_data["arcs"]``.
    """
    all_arcs = layout_data.get("arcs", {})
    return [
        record
        for record in all_arcs.values()
        if (record['source'] == u and record['target'] == v)
        or (record['source'] == v and record['target'] == u)
    ]
def get_arcs_between_layers(layer_idx1: int, layer_idx2: int, layout_data: Dict) -> List[Dict]:
    """Return all arcs whose endpoints lie in the two given layers.

    Parameters
    ----------
    layer_idx1, layer_idx2
        Indices into ``layout_data["layering"]``.
    layout_data
        Layout dictionary with optional ``"layering"`` (list of lists of
        vertex ids) and ``"arcs"`` (arc id -> record with ``'source'`` and
        ``'target'``) entries.

    Returns
    -------
    List[Dict]
        Arc records with one endpoint in each layer, regardless of the arc's
        direction, in the iteration order of ``layout_data["arcs"]``.
        An empty list when either index is out of range.
    """
    layering = layout_data.get("layering", [])
    layer_count = len(layering)
    if not (0 <= layer_idx1 < layer_count and 0 <= layer_idx2 < layer_count):
        return []

    first_layer = set(layering[layer_idx1])
    second_layer = set(layering[layer_idx2])
    # Direction is deliberately ignored: callers get both downward and
    # upward arcs between the two layers.
    return [
        arc
        for arc in layout_data.get("arcs", {}).values()
        if (arc['source'] in first_layer and arc['target'] in second_layer)
        or (arc['source'] in second_layer and arc['target'] in first_layer)
    ]
def is_incident_to_inner_segment(ocpn_layout: Dict, vertex_id: str) -> bool:
    """Tell whether ``vertex_id`` is a dummy node whose upper neighbor is also a dummy.

    The upper neighbor is read from the vertex's ``"upper_neighbor"`` field,
    which is expected to hold a vertex id. Unknown vertices, non-dummy
    vertices and dummies without a dummy upper neighbor all yield ``False``.
    """
    vertices = ocpn_layout.get("vertices", {})

    node = vertices.get(vertex_id)
    if not node or node.get("type") != DUMMY_TYPE:
        return False

    above_id = node.get("upper_neighbor")
    if not above_id:
        return False

    above = vertices.get(above_id)
    if not above:
        return False
    return above.get("type") == DUMMY_TYPE
# --- Core Sugiyama Steps --- def _build_initial_layout(ocpn: Dict[str, Any], parameters: Dict) -> Dict: # Ensure object_types is a sorted list for deterministic ordering object_types = sorted(ocpn.get("object_types", [])) # Sort alphabetically layout_data = { "vertices": {}, "arcs": {}, "original_arcs": {}, "object_types": object_types, "config": parameters } vertex_map = {} # Map (type, original_node) -> layout_id # 1. Create vertices for all places and transitions in the individual nets # Use the sorted object_types instead of iterating over petri_nets directly for ot in object_types: net_data = ocpn.get("petri_nets", {}).get(ot) if not net_data: print(f"Warning: No Petri net found for object type '{ot}', skipping.") continue net, im, fm = net_data # Create places for place in net.places: place_id = str(place) # Use inherent ID if available, else generate if (PLACE_TYPE, place_id) not in vertex_map: layout_id = f"place_{ot}_{place.name}_{uuid.uuid4()}" # Ensure uniqueness vertex_map[(PLACE_TYPE, place_id)] = layout_id is_source = place in im is_sink = place in fm layout_data["vertices"][layout_id] = { "id": layout_id, "original_id": place_id, "name": place.name or f"p_{ot}", "label": place.name or f"p_{ot}", # Or maybe object type? 
"objectType": ot, "type": PLACE_TYPE, "layer": -1, "pos": -1, "x": None, "y": None, "is_source": is_source, "is_sink": is_sink, # For positioning "root": layout_id, "align": layout_id, "sink": layout_id, "shift": float('inf'), "upper_neighbor": None, # For dummy tracking "lower_neighbor": None, # For dummy tracking } # Create transitions (link to activities if possible) for trans in net.transitions: trans_id = str(trans) # Use inherent ID layout_id = None if trans.label: # Visible transition corresponds to an activity # Check if an activity node already exists if (TRANSITION_TYPE, trans.label) in vertex_map: layout_id = vertex_map[(TRANSITION_TYPE, trans.label)] # Add this object type to the transition's knowledge layout_data["vertices"][layout_id].setdefault("adjacentObjectTypes", set()).add(ot) else: layout_id = f"trans_{trans.label}_{uuid.uuid4()}" vertex_map[(TRANSITION_TYPE, trans.label)] = layout_id layout_data["vertices"][layout_id] = { "id": layout_id, "original_id": trans_id, # Keep track of original PNet transition ID too? Maybe needed? "name": trans.label, "label": trans.label, "objectType": None, # Transitions don't belong to one OT "adjacentObjectTypes": {ot}, # OTs involved "type": TRANSITION_TYPE, "silent": False, "layer": -1, "pos": -1, "x": None, "y": None, # For positioning "root": layout_id, "align": layout_id, "sink": layout_id, "shift": float('inf'), "upper_neighbor": None, "lower_neighbor": None, } else: # Silent transition layout_id = f"silent_{ot}_{trans.name}_{uuid.uuid4()}" vertex_map[(TRANSITION_TYPE, trans_id)] = layout_id # Map original silent trans ID layout_data["vertices"][layout_id] = { "id": layout_id, "original_id": trans_id, "name": trans.name or "silent", "label": None, # Or 'τ' ? 
"objectType": ot, # Silent transitions often tied to one net context "adjacentObjectTypes": {ot}, "type": TRANSITION_TYPE, "silent": True, "layer": -1, "pos": -1, "x": None, "y": None, # For positioning "root": layout_id, "align": layout_id, "sink": layout_id, "shift": float('inf'), "upper_neighbor": None, "lower_neighbor": None, } # 2. Create arcs based on the Petri net structure for arc in net.arcs: source_node = arc.source target_node = arc.target source_id_orig = str(source_node) target_id_orig = str(target_node) source_type = PLACE_TYPE if isinstance(source_node, PetriNet.Place) else TRANSITION_TYPE target_type = PLACE_TYPE if isinstance(target_node, PetriNet.Place) else TRANSITION_TYPE # Need to map the Petri Net element to the correct layout ID # Handle visible transitions potentially shared across OTs if source_type == TRANSITION_TYPE and source_node.label: source_layout_id = vertex_map.get((source_type, source_node.label)) else: source_layout_id = vertex_map.get((source_type, source_id_orig)) if target_type == TRANSITION_TYPE and target_node.label: target_layout_id = vertex_map.get((target_type, target_node.label)) else: target_layout_id = vertex_map.get((target_type, target_id_orig)) if source_layout_id and target_layout_id: # Check if an arc (in either direction) already exists between these layout nodes # This handles cases where the same high-level connection appears in multiple OTs arc_exists = False existing_arc_id = None for arc_id_check, arc_data_check in layout_data["arcs"].items(): if (arc_data_check['source'] == source_layout_id and arc_data_check[ 'target'] == target_layout_id) or \ (arc_data_check['source'] == target_layout_id and arc_data_check[ 'target'] == source_layout_id): arc_exists = True existing_arc_id = arc_id_check break if not arc_exists: arc_id = generate_arc_id(source_layout_id, target_layout_id) # Determine if it's a variable arc (double arc) - simplified check is_variable = False if ot in ocpn.get("double_arcs_on_activity", 
{}): act_label = None if source_type == TRANSITION_TYPE and source_node.label: act_label = source_node.label elif target_type == TRANSITION_TYPE and target_node.label: act_label = target_node.label if act_label and ocpn["double_arcs_on_activity"][ot].get(act_label, False): is_variable = True # Simplified: if *activity* has double arc for this OT arc_data = { "id": arc_id, "source": source_layout_id, "target": target_layout_id, "objectType": ot, # Associate arc with this OT's net initially "original": True, # Mark as an original arc from the PN "reversed": False, # For cycle breaking "dummy_nodes": [], # Path of dummies "minLayer": -1, # For dummy insertion "maxLayer": -1, # For dummy insertion "type1": False, # For positioning conflicts "weight": arc.weight if hasattr(arc, 'weight') else 1, # Use arc weight if available "variable": is_variable, # Mark variable arcs (e.g., double arcs in OCPN paper) } layout_data["arcs"][arc_id] = arc_data layout_data["original_arcs"][arc_id] = copy.deepcopy(arc_data) # Keep pristine copy else: # Arc already exists, maybe update weight or mark as variable if needed? # For now, just ensure the involved object types are tracked on the vertices if source_type == TRANSITION_TYPE: layout_data["vertices"][source_layout_id].setdefault("adjacentObjectTypes", set()).add(ot) if target_type == TRANSITION_TYPE: layout_data["vertices"][target_layout_id].setdefault("adjacentObjectTypes", set()).add(ot) pass # Don't add duplicate arc structure else: print(f"Warning: Could not find layout nodes for arc {source_id_orig} -> {target_id_orig} in OT {ot}") # Add global activities not represented in any Petri net transition? # This layout focuses on the discovered nets. Add activities might be needed if some are unconnected. 
all_activity_nodes = {v['name'] for v in layout_data['vertices'].values() if v['type'] == TRANSITION_TYPE and not v['silent']} for act in ocpn.get("activities", set()): if act not in all_activity_nodes: # This activity wasn't part of any discovered Petri net transition layout_id = f"trans_{act}_unconnected_{uuid.uuid4()}" layout_data["vertices"][layout_id] = { "id": layout_id, "original_id": None, "name": act, "label": act, "objectType": None, "adjacentObjectTypes": set(), "type": TRANSITION_TYPE, "silent": False, "layer": -1, "pos": -1, "x": None, "y": None, "root": layout_id, "align": layout_id, "sink": layout_id, "shift": float('inf'), "upper_neighbor": None, "lower_neighbor": None, } print(f"Warning: Activity '{act}' not found in Petri net transitions, adding as unconnected node.") return layout_data # --- Graph representation for algorithms ---
class OCPNGraph:
    """Mutable adjacency view of the layout graph used for cycle breaking.

    Attributes
    ----------
    nodes
        List of vertex ids still present, in the order of
        ``layout_data["vertices"]``.
    arcs_data
        Reference (not a copy) to ``layout_data["arcs"]``.
    adj, rev_adj
        Successor / predecessor lists per node; a node appears once per
        connecting arc.
    arc_list
        ``{'source', 'target', 'id'}`` records for the arcs currently in the
        graph.

    Self-loops and arcs with an endpoint that is not a vertex are ignored.
    """

    def __init__(self, layout_data: Dict):
        self.nodes = list(layout_data["vertices"].keys())
        self.arcs_data = layout_data["arcs"]
        self.adj = {n: [] for n in self.nodes}
        self.rev_adj = {n: [] for n in self.nodes}
        self.arc_list = []
        for arc_id, arc in self.arcs_data.items():
            src, tgt = arc['source'], arc['target']
            if src == tgt:
                continue  # self-loops are irrelevant for layering
            # ``adj`` has exactly the node ids as keys, so this is an O(1)
            # membership test instead of scanning the ``nodes`` list.
            if src in self.adj and tgt in self.adj:
                self.adj[src].append(tgt)
                self.rev_adj[tgt].append(src)
                self.arc_list.append({'source': src, 'target': tgt, 'id': arc_id})

    def get_out_degree(self, node: str) -> int:
        """Return the number of outgoing arcs of ``node`` (0 if unknown)."""
        return len(self.adj.get(node, []))

    def get_in_degree(self, node: str) -> int:
        """Return the number of incoming arcs of ``node`` (0 if unknown)."""
        return len(self.rev_adj.get(node, []))

    def remove_node(self, node: str):
        """Remove ``node`` and its incident arcs; unknown nodes are ignored."""
        if node not in self.adj:
            return
        successors = self.adj.pop(node, [])
        predecessors = self.rev_adj.pop(node, [])
        # One list entry exists per arc, so one removal per entry keeps
        # adjacency lists consistent even with parallel arcs.
        for succ in successors:
            if succ in self.rev_adj:
                self.rev_adj[succ].remove(node)
        for pred in predecessors:
            if pred in self.adj:
                self.adj[pred].remove(node)
        self.nodes.remove(node)
        self.arc_list = [
            a for a in self.arc_list
            if a['source'] != node and a['target'] != node
        ]

    def remove_nodes(self, nodes_to_remove: List[str]):
        """Remove every node in ``nodes_to_remove``, one after another."""
        for node in nodes_to_remove:
            self.remove_node(node)

    def get_sink(self) -> Optional[str]:
        """Return the first node without outgoing arcs, or ``None``."""
        for node in self.nodes:
            if self.get_out_degree(node) == 0:
                return node
        return None

    def get_source(self) -> Optional[str]:
        """Return the first node without incoming arcs, or ``None``."""
        for node in self.nodes:
            if self.get_in_degree(node) == 0:
                return node
        return None

    def get_max_out_degree_node(self) -> Optional[str]:
        """Return the node maximising ``out-degree - in-degree``.

        Despite the name, the ranking uses the degree difference. Ties go to
        the node that comes first in ``nodes``. ``None`` is returned only
        when the graph has no nodes.
        """
        return max(
            self.nodes,
            key=lambda n: self.get_out_degree(n) - self.get_in_degree(n),
            default=None,
        )
def _modified_greedy_fas(graph: OCPNGraph, sources: List[str], sinks: List[str]) -> List[str]: """Computes Feedback Arc Set using greedy algorithm considering sources/sinks.""" s1 = list(sources) # Nodes forced to the beginning s2 = list(sinks) # Nodes forced to the end # Filter sources/sinks that might not be in the graph (e.g., unconnected activities) s1 = [n for n in s1 if n in graph.nodes] s2 = [n for n in s2 if n in graph.nodes] # Pre-sort sources/sinks based on degree difference (high out-degree first for sources) if s1: s1.sort(key=lambda n: graph.get_out_degree(n) - graph.get_in_degree(n), reverse=True) graph.remove_nodes(s1) # Temporarily remove from graph if s2: # High in-degree first for sinks (or low (out-in) diff) s2.sort(key=lambda n: graph.get_out_degree(n) - graph.get_in_degree(n), reverse=True) # Same sorting as TS graph.remove_nodes(s2) # Temporarily remove # Main greedy loop temp_s1 = [] temp_s2 = [] while graph.nodes: # Remove all current sinks sink = graph.get_sink() while sink: temp_s2.insert(0, sink) # Add to front of temp_s2 graph.remove_node(sink) sink = graph.get_sink() # Remove all current sources source = graph.get_source() while source: temp_s1.append(source) # Add to end of temp_s1 graph.remove_node(source) source = graph.get_source() # If nodes remain, pick highest (out-degree - in-degree) if graph.nodes: node = graph.get_max_out_degree_node() if node: # Should always find one if graph.nodes is not empty temp_s1.append(node) graph.remove_node(node) else: # Should not happen if graph.nodes is non-empty and graph was connected print("Warning: No node found in FAS loop, graph might be empty or disconnected unexpectedly.") break # Combine results: sources + temp_s1 + temp_s2 + sinks return s1 + temp_s1 + temp_s2 + s2 def _reverse_cycles(layout_data: Dict, config: Dict) -> int: """Reverses arcs based on FAS to break cycles.""" print("Step 1: Cycle Breaking...") graph = OCPNGraph(layout_data) # Identify potential sources/sinks from layout 
data (places marked is_source/is_sink) # Note: The TS config allows arbitrary node IDs. Here we derive from Petri net markings. potential_sources = [vid for vid, vdata in layout_data["vertices"].items() if vdata.get("is_source")] potential_sinks = [vid for vid, vdata in layout_data["vertices"].items() if vdata.get("is_sink")] # Allow user override via parameters if needed sources = config.get("sources", potential_sources) sinks = config.get("sinks", potential_sinks) # Ensure sources/sinks provided in config actually exist sources = [s for s in sources if s in layout_data["vertices"]] sinks = [s for s in sinks if s in layout_data["vertices"]] fas_order = _modified_greedy_fas(graph, sources, sinks) fas_indices = {node_id: index for index, node_id in enumerate(fas_order)} reversed_count = 0 for arc_id, arc in layout_data["arcs"].items(): src = arc['source'] tgt = arc['target'] # Check if src and tgt are in the FAS order (they might have been removed if sources/sinks) src_index = fas_indices.get(src) tgt_index = fas_indices.get(tgt) if src_index is not None and tgt_index is not None: if src_index > tgt_index: # physically swap them arc['source'], arc['target'] = arc['target'], arc['source'] # store the original direction so we can revert later arc['original_source'] = tgt arc['original_target'] = src # no need for arc['reversed'] at all reversed_count += 1 else: arc['reversed'] = False else: # Arc involves a pre-defined source/sink, assume original direction is fine # Or handle edge cases if needed arc['reversed'] = False print(f" Reversed {reversed_count} arcs.") return reversed_count def _assign_layers(layout_data: Dict) -> bool: """Assigns layers to vertices using LP to minimize edge length.""" print("Step 2: Layer Assignment (using LP)...") vertices = layout_data["vertices"] arcs = layout_data["arcs"] node_list = list(vertices.keys()) node_to_index = {node_id: i for i, node_id in enumerate(node_list)} num_vars = len(node_list) # Objective: Minimize sum(weight * 
(layer[target] - layer[source])) # c vector (cost for each variable/node's layer) c = np.zeros(num_vars) active_arcs = [] # Arcs between nodes currently in the graph for arc_id, arc in arcs.items(): src_orig, tgt_orig = arc['source'], arc['target'] weight = arc.get('weight', 1) # Consider arc weights # Use the 'reversed' flag to determine the direction for layering if arc['reversed']: src, tgt = tgt_orig, src_orig else: src, tgt = src_orig, tgt_orig if src in node_to_index and tgt in node_to_index: src_idx = node_to_index[src] tgt_idx = node_to_index[tgt] c[tgt_idx] += weight c[src_idx] -= weight active_arcs.append({'source': src, 'target': tgt, 'id': arc_id}) # Constraints: layer[target] - layer[source] >= 1 (for non-reversed arcs) # Represented as: -layer[source] + layer[target] >= 1 num_constraints = len(active_arcs) # Aub matrix for inequalities Aub * x <= bub # We have A * x >= b, so -A * x <= -b Aub = np.zeros((num_constraints, num_vars)) bub = np.full(num_constraints, -1.0) # Lower bound is 1, so upper bound for <= form is -1 for i, arc in enumerate(active_arcs): src_idx = node_to_index[arc['source']] tgt_idx = node_to_index[arc['target']] Aub[i, src_idx] = 1.0 # Coefficient for -layer[source] becomes +1 in -A Aub[i, tgt_idx] = -1.0 # Coefficient for +layer[target] becomes -1 in -A # Constraints: layer[node] >= 0 (Implicit in some solvers, but can add) # -layer[node] <= 0 num_positivity_constraints = num_vars Aub_pos = np.zeros((num_positivity_constraints, num_vars)) bub_pos = np.zeros(num_positivity_constraints) for i in range(num_vars): Aub_pos[i, i] = -1.0 # -layer[i] <= 0 # Combine constraints Aub_combined = np.vstack((Aub, Aub_pos)) bub_combined = np.concatenate((bub, bub_pos)) # Equality constraints (Aeq * x = beq) - None in this basic formulation Aeq = None # np.zeros((0, num_vars)) # Or None beq = None # np.zeros(0) # Or None # --- Solve LP using pm4py utility --- try: # Ensure integer solution - Requires a solver that supports MILP (e.g., Pulp, CBC, 
GLPK) # Check pm4py's default or specify one that supports integers if needed. # If the default ('scipy') doesn't support integers, this might yield float layers. # We can round them, but a true MILP solver is better. # Let's try first and see. SciPy's HiGHS might handle MILP. # TODO: Add parameter to specify integer variables if solver supports it print(f" Setting up LP with {num_vars} variables and {Aub_combined.shape[0]} constraints.") sol = lp_solver.apply(c, Aub_combined, bub_combined, Aeq, beq, parameters={"verbose": False}, variant=lp_solver.SCIPY) # Add verbose=True for debugging if sol is None: print(" LP Solver did not return a solution.") # Fallback: Simple topological sort based layering? (Less optimal) # Or error out return False points = lp_solver.get_points_from_sol(sol, variant=lp_solver.SCIPY) if points is None or len(points) != num_vars: print(" LP Solver failed to extract points or returned incorrect number.") # print("Solver status:", sol...) # Need specific solver's status info return False # Assign layers (rounding if necessary) max_layer = 0 layering_dict = {} for i, node_id in enumerate(node_list): # Round to nearest integer. Add small epsilon for robustness? 
layer = int(round(points[i])) vertices[node_id]['layer'] = layer if layer not in layering_dict: layering_dict[layer] = [] layering_dict[layer].append(node_id) max_layer = max(max_layer, layer) # Convert dictionary to sorted list of lists layout_data['layering'] = [] for i in range(max_layer + 1): layout_data['layering'].append(layering_dict.get(i, [])) print(f" Assigned nodes to {max_layer + 1} layers.") return True except Exception as e: print(f" Error during LP solving or layer assignment: {e}") import traceback traceback.print_exc() return False def _insert_dummy_vertices(layout_data: Dict): """Inserts dummy nodes for arcs spanning multiple layers.""" print("Step 3: Dummy Vertex Insertion...") vertices = layout_data["vertices"] arcs = layout_data["arcs"] layering = layout_data["layering"] arcs_to_add = {} arcs_to_remove = [] dummy_count = 0 # Iterate over a copy of arc IDs as we modify the arcs dictionary original_arc_ids = list(arcs.keys()) for arc_id in original_arc_ids: arc = arcs[arc_id] src_id_orig, tgt_id_orig = arc['source'], arc['target'] # Use the 'reversed' flag to determine logical source/target for layering if arc['reversed']: src_id, tgt_id = tgt_id_orig, src_id_orig else: src_id, tgt_id = src_id_orig, tgt_id_orig if src_id not in vertices or tgt_id not in vertices: print(f"Warning: Skipping dummy insertion for arc {arc_id} - source or target not found.") continue src_node = vertices[src_id] tgt_node = vertices[tgt_id] src_layer = src_node.get('layer', -1) tgt_layer = tgt_node.get('layer', -1) if src_layer == -1 or tgt_layer == -1: print(f"Warning: Skipping dummy insertion for arc {arc_id} ({src_id} -> {tgt_id}) - layers not assigned.") continue arc['minLayer'] = min(src_layer, tgt_layer) # Store original layer span arc['maxLayer'] = max(src_layer, tgt_layer) slack = tgt_layer - src_layer # Based on non-reversed direction if slack > 1: # Remove original arc, will be replaced by segments arcs_to_remove.append(arc_id) dummies_in_path = [] # Determine 
object type for dummies (use place's OT if exists) ot = arc.get("objectType") # Initial guess if vertices[src_id_orig]["type"] == PLACE_TYPE: ot = vertices[src_id_orig]["objectType"] elif vertices[tgt_id_orig]["type"] == PLACE_TYPE: ot = vertices[tgt_id_orig]["objectType"] # Find rough horizontal position for inserting dummies src_pos = -1 tgt_pos = -1 if src_layer < len(layering) and src_id in layering[src_layer]: src_pos = layering[src_layer].index(src_id) if tgt_layer < len(layering) and tgt_id in layering[tgt_layer]: tgt_pos = layering[tgt_layer].index(tgt_id) median_pos = len(layering[src_layer + 1]) // 2 # Default if positions unknown if src_pos != -1 and tgt_pos != -1: # Simple median estimate based on original nodes' positions # More sophisticated placement could be done in vertex ordering/positioning median_pos = (src_pos + tgt_pos) // 2 last_node_id = src_id # Start from the source for i in range(1, slack): dummy_layer_idx = src_layer + i dummy_id = generate_dummy_id() dummy_count += 1 dummies_in_path.append(dummy_id) dummy_node = { "id": dummy_id, "original_id": None, "name": f"dummy_{dummy_count}", "label": None, "objectType": ot, # Inherit OT for coloring/grouping "type": DUMMY_TYPE, "layer": dummy_layer_idx, "pos": -1, # Will be set by ordering "x": None, "y": None, "dummy_of_arc": arc_id, # Link back to the original arc ID "root": dummy_id, # Initialize positioning fields "align": dummy_id, "sink": dummy_id, "shift": float('inf'), "upper_neighbor": last_node_id, # Track connectivity "lower_neighbor": None, # Will be set next iteration or by target } vertices[dummy_id] = dummy_node # Update previous dummy's lower neighbor if i > 1: vertices[dummies_in_path[-2]]["lower_neighbor"] = dummy_id # Insert dummy into the layering structure (approximate position) if dummy_layer_idx < len(layering): insert_pos = min(median_pos, len(layering[dummy_layer_idx])) layering[dummy_layer_idx].insert(insert_pos, dummy_id) else: # Should not happen if layers are 
contiguous print(f"Warning: Dummy layer index {dummy_layer_idx} out of bounds.") layering.append([dummy_id]) # Append as new layer? # Create arc segment: last_node -> dummy segment_arc_id = generate_arc_id(last_node_id, dummy_id) # New arcs inherit properties from the original arc they replace arcs_to_add[segment_arc_id] = { "id": segment_arc_id, "source": last_node_id, # Actual source/target for segment "target": dummy_id, "objectType": ot, "original": False, # Not an original PN arc "reversed": False, # Segments follow layer order "dummy_nodes": [], "minLayer": dummy_layer_idx - 1, "maxLayer": dummy_layer_idx, "type1": False, "weight": arc.get('weight', 1), # Inherit weight "variable": arc.get('variable', False), # Inherit variability "original_arc_id": arc_id # Link back } last_node_id = dummy_id # Move to the new dummy # Final segment: last_dummy -> target segment_arc_id = generate_arc_id(last_node_id, tgt_id) arcs_to_add[segment_arc_id] = { "id": segment_arc_id, "source": last_node_id, "target": tgt_id, "objectType": ot, "original": False, "reversed": False, "dummy_nodes": [], "minLayer": tgt_layer - 1, "maxLayer": tgt_layer, "type1": False, "weight": arc.get('weight', 1), "variable": arc.get('variable', False), "original_arc_id": arc_id } # Set lower neighbor for the last dummy vertices[last_node_id]["lower_neighbor"] = tgt_id # Store the path of dummies on the *original* arc data (useful later?) # Find the original arc in the pristine copy if arc_id in layout_data["original_arcs"]: layout_data["original_arcs"][arc_id]["dummy_nodes"] = dummies_in_path # Also add dummy path info somewhere accessible for drawing? Maybe not needed if segments are drawn. 
# Let's add it to the removed arc's data just in case arcs[arc_id]['dummy_nodes'] = dummies_in_path # Apply changes for arc_id in arcs_to_remove: del arcs[arc_id] arcs.update(arcs_to_add) print(f" Inserted {dummy_count} dummy vertices.") # --- Vertex Ordering (Barycenter Heuristic) --- def _compute_barycenter(ocpn_layout: Dict, vertex_id: str, layer_idx: int, fixed_layer_idx: int, config: Dict) -> float: """Computes the barycenter value for a vertex based on neighbors in the fixed layer.""" vertices = ocpn_layout["vertices"] layering = ocpn_layout["layering"] vertex = vertices[vertex_id] if fixed_layer_idx < 0 or fixed_layer_idx >= len(layering): return 0.0 # No fixed layer neighbors fixed_layer = layering[fixed_layer_idx] fixed_layer_positions = {node_id: pos for pos, node_id in enumerate(fixed_layer)} # Find neighbors in the fixed layer neighbors_in_fixed = [] arcs = ocpn_layout.get("arcs", {}) for arc_id, arc in arcs.items(): # Consider original direction before reversal for neighborhood src_orig, tgt_orig = arc['source'], arc['target'] other_node = None if src_orig == vertex_id and tgt_orig in fixed_layer_positions: other_node = tgt_orig elif tgt_orig == vertex_id and src_orig in fixed_layer_positions: other_node = src_orig if other_node: # Check if the arc actually connects the two layers of interest other_node_layer = vertices.get(other_node, {}).get('layer', -1) if other_node_layer == fixed_layer_idx: neighbors_in_fixed.append(other_node) if not neighbors_in_fixed: return -1.0 # Indicate no neighbors barycenter_sum = 0.0 for neighbor in neighbors_in_fixed: barycenter_sum += fixed_layer_positions[neighbor] # Use 0-based index avg_barycenter = barycenter_sum / len(neighbors_in_fixed) # --- Add Object Attraction component (for places) --- if vertex["type"] == PLACE_TYPE and config.get("object_attraction", 0) > 0: object_attraction = config.get("object_attraction", 0.5) # Default weight obj_attr_min = config.get("object_attraction_range_min", 1) obj_attr_max = 
config.get("object_attraction_range_max", 1) current_ot = vertex["objectType"] object_neighbor_positions = [] direction = 1 if fixed_layer_idx > layer_idx else -1 # +1 if fixed layer is below, -1 if above for i in range(obj_attr_min, obj_attr_max + 1): target_layer_idx = layer_idx + direction * (2 * i) # Look 2*i layers away if 0 <= target_layer_idx < len(layering): target_layer = layering[target_layer_idx] for neighbor_pos, node_id in enumerate(target_layer): if vertices[node_id]["type"] == PLACE_TYPE and vertices[node_id]["objectType"] == current_ot: object_neighbor_positions.append(neighbor_pos) if object_neighbor_positions: avg_object_pos = sum(object_neighbor_positions) / len(object_neighbor_positions) # Weighted average: (1 - attraction) * neighbor_barycenter + attraction * object_barycenter return (1.0 - object_attraction) * avg_barycenter + object_attraction * avg_object_pos else: return avg_barycenter # No object neighbors found, use only structural barycenter elif vertex["type"] == DUMMY_TYPE: # Dummy nodes strongly follow their single neighbor in the fixed layer # This simplifies things and keeps dummy chains straight # Note: TS code seems to average if multiple dummies connect, but a single connection is standard if neighbors_in_fixed: # Should only have one neighbor in the fixed layer by construction return fixed_layer_positions[neighbors_in_fixed[0]] else: return -1.0 # Should not happen for a connected dummy else: # Standard transition return avg_barycenter def _barycenter_ordering_pass(ocpn_layout: Dict, current_layering: List[List[str]], fixed_layer_idx: int, sweep_down: bool, config: Dict) -> List[List[str]]: """Performs one pass of barycenter ordering for layers adjacent to fixed_layer_idx.""" new_layering = copy.deepcopy(current_layering) # Work on a copy layer_to_order_idx = fixed_layer_idx + 1 if sweep_down else fixed_layer_idx - 1 if layer_to_order_idx < 0 or layer_to_order_idx >= len(new_layering): return new_layering # Nothing to order 
layer_to_order = new_layering[layer_to_order_idx] if not layer_to_order: return new_layering # Empty layer # Compute barycenters for all nodes in the layer_to_order based on fixed_layer_idx barycenters = {} for node_id in layer_to_order: bc_val = _compute_barycenter(ocpn_layout, node_id, layer_to_order_idx, fixed_layer_idx, config) barycenters[node_id] = bc_val # Adjust barycenters for nodes with no neighbors (-1) adjusted_barycenters = {} last_valid_bc = 0.0 for i, node_id in enumerate(layer_to_order): # Iterate in original order first bc = barycenters[node_id] if bc == -1.0: # Assign based on the left neighbor's adjusted value, or 0 if first # Add small offset to maintain original relative order among no-neighbor nodes adjusted_bc = last_valid_bc + i * 1e-6 # Add tiny offset based on original pos adjusted_barycenters[node_id] = adjusted_bc # Don't update last_valid_bc here, use the *actual* last valid one else: adjusted_barycenters[node_id] = bc last_valid_bc = bc # Update the last *valid* barycenter found # Sort the layer based on adjusted barycenters # Stable sort: maintain original order for equal barycenters original_indices = {node_id: i for i, node_id in enumerate(layer_to_order)} def sort_key(node_id): # Primary key: adjusted barycenter # Secondary key: original index for stability return (adjusted_barycenters[node_id], original_indices[node_id]) ordered_layer = sorted(layer_to_order, key=sort_key) new_layering[layer_to_order_idx] = ordered_layer return new_layering def _count_crossings(ocpn_layout: Dict, layering: List[List[str]]) -> int: """Counts the number of edge crossings in the current layering.""" crossings = 0 vertices = ocpn_layout["vertices"] arcs_data = ocpn_layout.get("arcs", {}) # Build a map of node_id to its position within its layer node_positions = {} for r, layer in enumerate(layering): for c, node_id in enumerate(layer): node_positions[node_id] = (r, c) # Iterate through pairs of layers for r in range(len(layering) - 1): layer1 = 
layering[r] layer2 = layering[r + 1] # Get all arcs between layer r and layer r+1 # Need arcs considering dummy nodes represent segments arcs_between = [] for arc_id, arc in arcs_data.items(): src, tgt = arc['source'], arc['target'] pos_src = node_positions.get(src) pos_tgt = node_positions.get(tgt) if pos_src and pos_tgt: # Check if layers match r and r+1 (in either direction) if (pos_src[0] == r and pos_tgt[0] == r + 1): arcs_between.append( {'source': src, 'target': tgt, 'source_pos': pos_src[1], 'target_pos': pos_tgt[1]}) elif (pos_src[0] == r + 1 and pos_tgt[0] == r): arcs_between.append( {'source': tgt, 'target': src, 'source_pos': pos_tgt[1], 'target_pos': pos_src[1]}) # Bilayer crossing counting (simplified - optimal is complex) # This uses the standard pairwise comparison method for i in range(len(arcs_between)): for j in range(i + 1, len(arcs_between)): arc1 = arcs_between[i] arc2 = arcs_between[j] # Check if endpoints cross if (arc1['source_pos'] < arc2['source_pos'] and arc1['target_pos'] > arc2['target_pos']) or \ (arc1['source_pos'] > arc2['source_pos'] and arc1['target_pos'] < arc2['target_pos']): crossings += 1 return crossings def _order_vertices(layout_data: Dict, config: Dict): """Orders vertices within layers using the barycenter method.""" print("Step 4: Vertex Ordering (Barycenter Heuristic)...") max_iterations = config.get("max_barycenter_iterations", 24) # Max iterations without improvement current_layering = layout_data["layering"] best_layering = copy.deepcopy(current_layering) best_score = _count_crossings(layout_data, best_layering) print(f" Initial crossing score: {best_score}") no_improvement_counter = 0 iter_count = 0 # Initial adjustment based on object centrality (if provided) - simplified # TODO: Implement object centrality pre-sorting if needed, TS version does this. 
while no_improvement_counter < max_iterations: iter_count += 1 layering_before_sweep = copy.deepcopy(current_layering) # Downward sweep (fix layer i, order layer i+1) for i in range(len(current_layering) - 1): current_layering = _barycenter_ordering_pass(layout_data, current_layering, fixed_layer_idx=i, sweep_down=True, config=config) # Upward sweep (fix layer i, order layer i-1) for i in range(len(current_layering) - 1, 0, -1): current_layering = _barycenter_ordering_pass(layout_data, current_layering, fixed_layer_idx=i, sweep_down=False, config=config) current_score = _count_crossings(layout_data, current_layering) if current_score < best_score: best_score = current_score best_layering = copy.deepcopy(current_layering) no_improvement_counter = 0 print(f" Iteration {iter_count}: Improved score to {best_score}") else: no_improvement_counter += 1 print(f" Iteration {iter_count}: Score {current_score} (no improvement)") # Check for convergence (layering didn't change) if current_layering == layering_before_sweep: print(f" Converged after {iter_count} iterations.") break # Check for oscillations (revisiting a previous state)? More complex to track. 
print(f" Final crossing score: {best_score} after {iter_count} iterations.") layout_data["layering"] = best_layering # Update the 'pos' attribute in the vertex data for r, layer in enumerate(best_layering): for c, node_id in enumerate(layer): layout_data["vertices"][node_id]['pos'] = c # --- Vertex Positioning (Brandes & Köpf Heuristic Variant) --- def _mark_type1_conflicts(layout_data: Dict): """Marks arcs that cross 'inner segments' (dummy-dummy connections).""" vertices = layout_data["vertices"] arcs = layout_data["arcs"] layering = layout_data["layering"] for arc_id, arc in arcs.items(): arc['type1'] = False # Reset # Iterate through layers where conflicts can occur for i in range(len(layering) - 2): # Need layers i, i+1, i+2 layer_i = layering[i] layer_i1 = layering[i + 1] layer_i2 = layering[i + 2] pos_i1 = {node_id: pos for pos, node_id in enumerate(layer_i1)} pos_i = {node_id: pos for pos, node_id in enumerate(layer_i)} inner_segments = [] # Tuples (upper_dummy_pos, lower_dummy_pos) in layers i, i+1 # Find inner segments between layer i and i+1 for k, v_i1 in enumerate(layer_i1): node_i1_data = vertices[v_i1] if node_i1_data['type'] == DUMMY_TYPE: upper_neighbor_id = node_i1_data.get("upper_neighbor") if upper_neighbor_id and upper_neighbor_id in pos_i: upper_neighbor_data = vertices[upper_neighbor_id] if upper_neighbor_data['type'] == DUMMY_TYPE: # Found inner segment (dummy -> dummy) inner_segments.append((pos_i[upper_neighbor_id], k)) if not inner_segments: continue # No conflicts possible if no inner segments inner_segments.sort() # Sort by position in layer i k0 = 0 current_inner_idx = 0 # Check non-inner segments between i and i+1 for crossings for l1, v_i1 in enumerate(layer_i1): # Iterate through nodes in layer i+1 k1 = len(layer_i) - 1 # Default right boundary is_lower_end_of_inner = False # Find the right boundary k1 defined by the next inner segment starting at or after v_i1 while current_inner_idx < len(inner_segments) and 
inner_segments[current_inner_idx][1] <= l1: k1 = inner_segments[current_inner_idx][0] is_lower_end_of_inner = (inner_segments[current_inner_idx][1] == l1) current_inner_idx += 1 if not is_lower_end_of_inner: # Only check non-inner segments # Get upper neighbors in layer i for v_i1 upper_neighbors_i = [] for arc_id, arc_data in arcs.items(): if arc_data['target'] == v_i1 and arc_data['source'] in pos_i: upper_neighbors_i.append(arc_data['source']) elif arc_data['source'] == v_i1 and arc_data[ 'target'] in pos_i: # Should not happen if directed correctly pass for u_i in upper_neighbors_i: k = pos_i[u_i] # Check if upper neighbor u_i is outside the bounds [k0, k1] defined by inner segments if k < k0 or k > k1: # This non-inner segment crosses an inner segment - mark as type 1 connecting_arcs = get_arcs_between(u_i, v_i1, layout_data) for conn_arc in connecting_arcs: # Ensure we only mark non-inner segments is_seg_inner = ( vertices[u_i]['type'] == DUMMY_TYPE and vertices[v_i1]['type'] == DUMMY_TYPE) if not is_seg_inner: conn_arc['type1'] = True # Update left boundary for next iteration if is_lower_end_of_inner: k0 = k1 def _vertical_alignment(ocpn_layout: Dict, current_layering: List[List[str]], pos: Dict[str, int], sweep_down: bool) -> \ Tuple[Dict[str, str], Dict[str, str]]: """Aligns vertices vertically with median neighbors.""" root = {vid: vid for vid in ocpn_layout["vertices"]} align = {vid: vid for vid in ocpn_layout["vertices"]} vertices = ocpn_layout["vertices"] # Iterate through layers in sweep direction layer_indices = range(len(current_layering)) if sweep_down else range(len(current_layering) - 1, -1, -1) for i in layer_indices: fixed_layer_idx = i - 1 if sweep_down else i + 1 if fixed_layer_idx < 0 or fixed_layer_idx >= len(current_layering): continue # Boundary layer layer_k = current_layering[i] r = -1 # Tracks the position of the rightmost aligned node in fixed layer for vk in layer_k: # Iterate through nodes vk in current layer i # Get neighbors in 
the fixed layer (upper if sweep_down, lower otherwise) neighbors = [] neighbor_dir = 'up' if sweep_down else 'down' # Find neighbors based on arc directionality *relative to the layers* arcs_data = ocpn_layout.get("arcs", {}) fixed_layer_nodes = set(current_layering[fixed_layer_idx]) for arc_id, arc in arcs_data.items(): src, tgt = arc['source'], arc['target'] if sweep_down: # Looking for upper neighbors (in fixed_layer_idx = i-1) if tgt == vk and src in fixed_layer_nodes: neighbors.append(src) else: # Looking for lower neighbors (in fixed_layer_idx = i+1) if src == vk and tgt in fixed_layer_nodes: neighbors.append(tgt) if not neighbors: continue # Sort neighbors by position in their layer neighbors.sort(key=lambda n: pos[n]) # Get median neighbor(s) indices m_low = math.floor((len(neighbors) - 1) / 2) m_high = math.ceil((len(neighbors) - 1) / 2) median_indices = list(set([m_low, m_high])) # Use set to handle single median case for m_idx in median_indices: median_neighbor = neighbors[m_idx] # Check if vk can be aligned with this median neighbor if align[vk] == vk: # If vk is not already aligned # Check if the edge is marked as type 1 conflict connecting_arcs = get_arcs_between(median_neighbor, vk, ocpn_layout) is_marked = any(a.get('type1', False) for a in connecting_arcs) if not is_marked and pos[median_neighbor] > r: # Align vk to median_neighbor align[median_neighbor] = vk root[vk] = root[median_neighbor] align[vk] = root[vk] # Point vk to the root of the block r = pos[median_neighbor] # Update rightmost position return root, align def _place_block(ocpn_layout: Dict, layering: List[List[str]], v: str, x: Dict[str, Optional[float]], pos: Dict[str, int], roots: Dict[str, str], sink: Dict[str, str], shift: Dict[str, float], aligns: Dict[str, str], config: Dict): """Recursive part of horizontal compaction.""" if x.get(v) is None: # If x[v] is not yet defined x[v] = 0.0 w = v while True: w_pos = pos[w] # Find layer index for w w_layer_idx = -1 for idx, l in 
enumerate(layering): if w in l: w_layer_idx = idx break if w_pos > 0 and w_layer_idx != -1: # If w has a predecessor in its layer predecessor = layering[w_layer_idx][w_pos - 1] u = roots[predecessor] # Root of the predecessor block # Recursively place the predecessor block _place_block(ocpn_layout, layering, u, x, pos, roots, sink, shift, aligns, config) # Update sink relationship if sink[v] == v: sink[v] = sink[u] # Calculate separation delta # Use max dimension based on rankdir place_dim = config.get("place_radius", 10) * 2 trans_h = config.get("transition_height", 20) trans_w = config.get("transition_width", 50) silent_w = config.get("silent_transition_width", 10) max_trans_dim = max(trans_h, trans_w, silent_w) # Simplified max size delta = config.get("vertex_sep", 20) + max(place_dim, max_trans_dim) # Use a generic max size estimate if sink[v] != sink[u]: # Blocks have different sinks, calculate required shift current_shift = (x.get(v, 0.0) or 0.0) - (x.get(u, 0.0) or 0.0) - delta shift[sink[u]] = min(shift.get(sink[u], float('inf')), current_shift) else: # Blocks have the same sink, update position directly new_x = (x.get(u, 0.0) or 0.0) + delta x[v] = max(x.get(v, 0.0) or 0.0, new_x) # Move to the next node in the alignment block w = aligns[w] if w == v: # Completed the block cycle break def _horizontal_compaction(ocpn_layout: Dict, layering: List[List[str]], roots: Dict[str, str], aligns: Dict[str, str], pos: Dict[str, int], config: Dict) -> Tuple[Dict[str, Optional[float]], float]: """Assigns preliminary x-coordinates based on vertical alignment.""" x: Dict[str, Optional[float]] = {vid: None for vid in ocpn_layout["vertices"]} sink: Dict[str, str] = {vid: vid for vid in ocpn_layout["vertices"]} shift: Dict[str, float] = {vid: float('inf') for vid in ocpn_layout["vertices"]} # Place blocks rooted at v for layer in layering: for v in layer: if roots[v] == v: # If v is a root _place_block(ocpn_layout, layering, v, x, pos, roots, sink, shift, aligns, config) # 
Calculate absolute coordinates abs_x: Dict[str, Optional[float]] = {} max_coord = 0.0 min_coord = float('inf') for layer in layering: for v in layer: root_v = roots[v] if x.get(root_v) is not None: current_x = x[root_v] sink_root_v = sink[root_v] # Apply shift if the sink has a finite shift value if shift.get(sink_root_v, float('inf')) != float('inf'): current_x += shift[sink_root_v] abs_x[v] = current_x max_coord = max(max_coord, current_x) min_coord = min(min_coord, current_x) else: abs_x[v] = None # Should not happen if graph is connected # Normalize coordinates to start from 0 (or config.borderPadding) border = config.get("border_padding", 20) shift_val = border - min_coord if min_coord != float('inf') else border final_x = {} final_max = 0.0 for v, val in abs_x.items(): if val is not None: final_x[v] = val + shift_val final_max = max(final_max, final_x[v]) else: final_x[v] = None # Add padding to the max coordinate as well final_max += border return final_x, final_max def _position_vertices(layout_data: Dict, config: Dict): """Assigns final X/Y coordinates using 4-pass heuristic.""" print("Step 5: Vertex Positioning...") # Mark type 1 conflicts first _mark_type1_conflicts(layout_data) layouts = [] # Store results of 4 passes {vertex_id: coord} original_layering = copy.deepcopy(layout_data["layering"]) # --- Perform 4 passes --- for vert_dir in [0, 1]: # 0: Down, 1: Up for horz_dir in [0, 1]: # 0: Left->Right, 1: Right->Left print(f" Running positioning pass (vert_dir={vert_dir}, horz_dir={horz_dir})...") current_layering = copy.deepcopy(original_layering) # 1. Transform layering based on direction if vert_dir == 1: # Up current_layering.reverse() if horz_dir == 1: # Right->Left for layer in current_layering: layer.reverse() # Create position map for this transformed layering pos = {} for r, layer in enumerate(current_layering): for c, node_id in enumerate(layer): pos[node_id] = c # 2. 
Vertical Alignment roots, aligns = _vertical_alignment(layout_data, current_layering, pos, sweep_down=(vert_dir == 0)) # 3. Horizontal Compaction coords, max_coord = _horizontal_compaction(layout_data, current_layering, roots, aligns, pos, config) # 4. Undo horizontal transformation if needed if horz_dir == 1: final_coords = {} for v, c in coords.items(): if c is not None: final_coords[v] = max_coord - c else: final_coords[v] = None layouts.append(final_coords) else: layouts.append(coords) # --- Combine results --- final_coords_x = {} final_coords_y = {} vertices = layout_data["vertices"] layering = layout_data["layering"] # Use original layering for Y-coord calc border_padding = config.get("border_padding", 20) layer_sep = config.get("layer_sep", 50) direction = config.get("rankdir", "TB") # TB or LR # 1. Calculate final X coordinate (median of 4 passes) for v_id in vertices.keys(): candidate_coords = [] for layout_pass in layouts: coord = layout_pass.get(v_id) if coord is not None: candidate_coords.append(coord) if candidate_coords: candidate_coords.sort() # Use median (average of middle two for even number) if len(candidate_coords) > 0: median_idx1 = math.floor((len(candidate_coords) - 1) / 2) median_idx2 = math.ceil((len(candidate_coords) - 1) / 2) median_coord = (candidate_coords[median_idx1] + candidate_coords[median_idx2]) / 2 final_coords_x[v_id] = median_coord else: final_coords_x[v_id] = border_padding # Default if no coord found? # 2. 
Calculate Y coordinate based on layer and layer sizes layer_sizes = [] # Store { 'layer': idx, 'size': height/width } max_layer_dim = 0 for i, layer in enumerate(layering): max_dim = 0 for node_id in layer: node = vertices[node_id] node_h, node_w = 0, 0 if node['type'] == PLACE_TYPE: r = config.get("place_radius", 10) node_h, node_w = r * 2, r * 2 elif node['type'] == TRANSITION_TYPE: node_h = config.get("transition_height", 20) if node['silent']: node_w = config.get("silent_transition_width", 10) else: node_w = config.get("transition_width", 50) elif node['type'] == DUMMY_TYPE: # Give dummies minimal size for separation calculation node_h, node_w = 1, 1 if direction == "TB": max_dim = max(max_dim, node_h) else: # LR max_dim = max(max_dim, node_w) layer_sizes.append({'layer': i, 'size': max_dim}) max_layer_dim = max(max_layer_dim, max_dim) current_pos = border_padding layer_centers = {} for i in range(len(layering)): layer_size = layer_sizes[i]['size'] if i < len(layer_sizes) else max_layer_dim # Use max if missing layer_center = current_pos + layer_size / 2.0 layer_centers[i] = layer_center current_pos += layer_size + layer_sep # Assign coordinates based on rankdir max_x, max_y = 0.0, 0.0 for v_id, node in vertices.items(): x_coord = final_coords_x.get(v_id, border_padding) # Horizontal position from passes y_coord = layer_centers.get(node['layer'], border_padding) # Vertical position from layer if direction == "TB": node['x'] = x_coord node['y'] = y_coord # Graphviz Y is often inverted, handle during drawing max_x = max(max_x, x_coord) max_y = max(max_y, y_coord) else: # LR node['x'] = y_coord # Layer position becomes X node['y'] = x_coord # Calculated position becomes Y max_x = max(max_x, y_coord) max_y = max(max_y, x_coord) layout_data["max_x"] = max_x + border_padding layout_data["max_y"] = max_y + border_padding # Store bounds for SVG sizing? 
print(f" Assigned final coordinates (max_x={max_x:.2f}, max_y={max_y:.2f}).") # --- Graphviz Drawing --- def _create_graphviz_digraph(ocpn: Dict, layout_data: Dict, parameters: Dict) -> Digraph: """Generates the Graphviz Digraph object from the layout with separate handling for direct edges and waypoint edges.""" print("Step 6: Generating Graphviz output...") image_format = exec_utils.get_param_value(Parameters.FORMAT, parameters, "png") bgcolor = exec_utils.get_param_value(Parameters.BGCOLOR, parameters, constants.DEFAULT_BGCOLOR) rankdir = exec_utils.get_param_value(Parameters.RANKDIR, parameters, "TB") engine = exec_utils.get_param_value(Parameters.ENGINE, parameters, "dot") enable_graph_title = exec_utils.get_param_value(Parameters.ENABLE_GRAPH_TITLE, parameters, constants.DEFAULT_ENABLE_GRAPH_TITLES) graph_title = exec_utils.get_param_value(Parameters.GRAPH_TITLE, parameters, "Object-Centric Petri Net (Sugiyama Layout)") viz = Digraph( "ocpn_sugiyama", engine=engine, graph_attr={ "bgcolor": bgcolor, "rankdir": rankdir, "splines": "line", }, node_attr={'shape': 'box'}, ) if enable_graph_title: viz.attr(label=f'<<FONT POINT-SIZE="20">{graph_title}</FONT>>', labelloc="t") vertices = layout_data["vertices"] max_y = layout_data.get("max_y", 600) # Helper function to compute luminance and pick contrasting font color def get_contrasting_color(hex_color: str) -> str: """Returns black or white based on luminance of the hex color for readability.""" # Remove '#' and convert to RGB hex_color = hex_color.lstrip('#') rgb = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) # Calculate relative luminance (simplified sRGB luminance) luminance = 0.299 * rgb[0] + 0.587 * rgb[1] + 0.114 * rgb[2] return "white" if luminance < 128 else "black" # Add Nodes for node_id, node in vertices.items(): if node.get('x') is None or node.get('y') is None: print(f"Warning: Node {node_id} has no coordinates, skipping.") continue pos_x = node['x'] pos_y = node['y'] pos_str = 
f"{pos_x:.2f},{pos_y:.2f}!" attrs = { "pos": pos_str, "label": node['label'] if node['label'] else "", "id": node_id } if node['type'] == PLACE_TYPE: ot_color = ot_to_color(node['objectType']) font_color = get_contrasting_color(ot_color) radius_gv = parameters.get("place_radius", 10) / 72.0 * 1.5 # Increase size by 50% if node.get("is_source"): ot_label = f"{node['objectType']}" attrs.update({ "shape": "ellipse", "fillcolor": ot_color, "style": "filled", "fontcolor": font_color, "width": f"{radius_gv * 2.5:.2f}", # Larger ellipse "height": f"{radius_gv * 1.5:.2f}", "fixedsize": "true", "label": f"<<FONT POINT-SIZE='8'>{ot_label}</FONT>>", # Smaller font (4pt) "labelloc": "c", }) if node.get("is_source") and node.get("is_sink"): attrs["peripheries"] = "2" else: attrs.update({ "shape": "circle", "fillcolor": ot_color, "style": "filled", "fontcolor": font_color, "width": f"{radius_gv * 2:.2f}", # Larger circle "height": f"{radius_gv * 2:.2f}", "fixedsize": "true", "label": "", }) if node.get("is_sink"): attrs["peripheries"] = "2" elif node['type'] == TRANSITION_TYPE: height_gv = parameters.get("transition_height", 20) / 72.0 if node['silent']: width_gv = parameters.get("silent_transition_width", 10) / 72.0 attrs.update({ "shape": "box", "fillcolor": parameters.get("transition_fill_color", "black"), "style": "filled", "width": f"{width_gv:.2f}", "height": f"{height_gv:.2f}", "fixedsize": "true", "label": "", }) else: base_width_gv = parameters.get("transition_width", 40) / 72.0 width_gv = base_width_gv * 1.5 font_size = parameters.get("font_size", 10) - 2 label = node['label'] max_label_width = 10 if len(label) > max_label_width: wrapped_label = "<BR/>".join([label[i:i+max_label_width] for i in range(0, len(label), max_label_width)]) attrs["label"] = f"<{wrapped_label}>" else: attrs["label"] = label attrs.update({ "shape": "box", "fillcolor": parameters.get("transition_fill_color", "white"), "style": "filled", "color": parameters.get("transition_color", "black"), 
"width": f"{width_gv:.2f}", "height": f"{height_gv:.2f}", "fixedsize": "true", "fontname": parameters.get("font_name", "Arial"), "fontsize": str(font_size), "fontcolor": parameters.get("transition_textcolor", "black"), }) elif node['type'] == DUMMY_TYPE: attrs.update({ "shape": "point", "width": "0.01", "height": "0.01", "label": "", "fixedsize": "true", "style": "invis", }) viz.node(node_id, **attrs) # Add Edges (unchanged from original) arcs = layout_data["arcs"] original_arcs = layout_data["original_arcs"] direct_edges = [] waypoint_edges = {} for arc_id, arc in arcs.items(): src = arc['source'] tgt = arc['target'] if src not in vertices or tgt not in vertices or \ vertices[src].get('x') is None or vertices[tgt].get('x') is None: continue src_type = vertices[src]['type'] tgt_type = vertices[tgt]['type'] original_arc_id = arc.get('original_arc_id', arc_id) original_arc = original_arcs.get(original_arc_id, arc) if src_type != DUMMY_TYPE and tgt_type != DUMMY_TYPE and not original_arc.get('dummy_nodes'): direct_edges.append(arc) else: if original_arc_id not in waypoint_edges: ultimate_src = original_arc.get('source') ultimate_tgt = original_arc.get('target') dummy_nodes = original_arc.get('dummy_nodes', []) waypoint_edges[original_arc_id] = { 'original_arc': original_arc, 'segments': [], 'ultimate_source': ultimate_src, 'ultimate_target': ultimate_tgt, 'dummy_nodes': dummy_nodes } waypoint_edges[original_arc_id]['segments'].append(arc) for arc in direct_edges: src = arc['source'] tgt = arc['target'] ot = arc.get("objectType", layout_data["object_types"][0] if layout_data["object_types"] else "unknown") ot_color = ot_to_color(ot) edge_attrs = { "color": ot_color, "penwidth": str(parameters.get("arc_size", 1.0) * arc.get("weight", 1.0)), "arrowsize": str(parameters.get("arrowhead_size", 0.7)), "id": arc['id'], "splines": "line", } if arc.get('variable'): edge_attrs["penwidth"] = str(float(edge_attrs["penwidth"]) * 1.5) edge_attrs["style"] = "dashed" if 
arc.get('reversed', False): edge_attrs['dir'] = 'back' else: edge_attrs['dir'] = 'forward' viz.edge(src, tgt, **edge_attrs) for orig_arc_id, wp_data in waypoint_edges.items(): orig_arc = wp_data['original_arc'] segments = wp_data['segments'] ultimate_src = wp_data['ultimate_source'] ultimate_tgt = wp_data['ultimate_target'] ot = orig_arc.get("objectType", layout_data["object_types"][0] if layout_data["object_types"] else "unknown") ot_color = ot_to_color(ot) base_edge_attrs = { "color": ot_color, "penwidth": str(parameters.get("arc_size", 1.0) * orig_arc.get("weight", 1.0)), "arrowsize": str(parameters.get("arrowhead_size", 0.7)), "splines": "polyline", } if orig_arc.get('variable'): base_edge_attrs["penwidth"] = str(float(base_edge_attrs["penwidth"]) * 1.5) base_edge_attrs["style"] = "dashed" for segment in segments: src = segment['source'] tgt = segment['target'] segment_attrs = base_edge_attrs.copy() segment_attrs['id'] = segment['id'] if tgt == ultimate_tgt: segment_attrs['arrowhead'] = 'normal' segment_attrs['dir'] = 'forward' elif src == ultimate_tgt: segment_attrs['arrowhead'] = 'normal' segment_attrs['dir'] = 'back' else: segment_attrs['arrowhead'] = 'none' viz.edge(src, tgt, **segment_attrs) viz.format = image_format.replace("html", "plain-ext") return viz # --- Main Apply Function ---
class Parameters(Enum):
    """
    Parameters for the Sugiyama OCPN visualization.

    Each member's value is the string key under which the setting is stored
    in the configuration dictionary built by ``apply``. Callers may pass
    either the enum member or its string value as a parameter key.
    """
    # --- Basic Graphviz output settings ---
    FORMAT = "format"
    BGCOLOR = "bgcolor"
    RANKDIR = "rankdir"  # TB or LR
    ENGINE = "engine"  # dot, neato, fdp
    ENABLE_GRAPH_TITLE = "enable_graph_title"
    GRAPH_TITLE = "graph_title"

    # --- Sugiyama specific parameters (matching TS config where possible) ---
    SOURCES = "sources"  # List of node IDs to force as sources
    SINKS = "sinks"  # List of node IDs to force as sinks
    VERTEX_SEP = "vertex_sep"  # Minimum horizontal distance between nodes
    LAYER_SEP = "layer_sep"  # Minimum vertical distance between layers
    EDGE_SEP = "edge_sep"  # (Not directly used here, vertex_sep dominates)
    BORDER_PADDING = "border_padding"  # Padding around the graph
    OBJECT_CENTRALITY = "object_centrality"  # Dict {ot: centrality_value} (Lower value = more central/left)
    OBJECT_ATTRACTION = "object_attraction"  # Weight (0-1) for place attraction in barycenter
    OBJECT_ATTRACTION_RANGE_MIN = "object_attraction_range_min"  # Min layers distance for attraction
    OBJECT_ATTRACTION_RANGE_MAX = "object_attraction_range_max"  # Max layers distance for attraction
    MAX_BARYCENTER_ITERATIONS = "max_barycenter_iterations"  # Max ordering sweeps without improvement

    # --- Visual element sizes (in points, roughly 1/72 inch) ---
    PLACE_RADIUS = "place_radius"
    TRANSITION_WIDTH = "transition_width"
    TRANSITION_HEIGHT = "transition_height"
    SILENT_TRANSITION_WIDTH = "silent_transition_width"
    ARC_SIZE = "arc_size"  # Base thickness
    ARROWHEAD_SIZE = "arrowhead_size"
    INDICATE_ARC_WEIGHT = "indicate_arc_weight"  # Boolean
    INDICATE_VARIABLE_ARCS = "indicate_variable_arcs"  # Boolean
    VARIABLE_ARC_INDICATOR_COLOR = "variable_arc_indicator_color"
    VARIABLE_ARC_INDICATOR_SIZE = "variable_arc_indicator_size"  # Multiplier for thickness

    # --- Colors ---
    TRANSITION_COLOR = "transition_color"  # Border
    TRANSITION_FILL_COLOR = "transition_fill_color"
    TRANSITION_TEXT_COLOR = "transition_text_color"
    DEFAULT_PLACE_COLOR = "default_place_color"  # Fallback if OT color fails
    ARC_DEFAULT_COLOR = "arc_default_color"
    TYPE_COLOR_MAPPING = "type_color_mapping"  # Optional dict {ot: color} override

    # --- Font ---
    FONT_NAME = "font_name"
    FONT_SIZE = "font_size"
def apply(ocpn: Dict[str, Any], parameters: Optional[Dict[Any, Any]] = None) -> Digraph:
    """
    Obtains a visualization of the provided object-centric Petri net using a
    Sugiyama-based layout algorithm.

    The net is deep-copied first, so the caller's object is never modified.
    In the copy every place name is prefixed with its object type and "@@".

    Args:
        ocpn (Dict[str, Any]): Object-centric Petri net structure.
        parameters (Optional[Dict[Any, Any]], optional): Algorithm parameters,
            keyed either by ``Parameters`` members or by their string values.
            Keys of any other kind are ignored. When the same setting is given
            in both forms, the string-keyed value wins.

    Returns:
        Digraph: A Graphviz digraph object representing the layout. An empty
        ``Digraph`` is returned when the net yields no vertices or when layer
        assignment fails.
    """
    if parameters is None:
        parameters = {}

    ocpn = deepcopy(ocpn)
    for ot, (net, im, fm) in ocpn.get("petri_nets", {}).items():
        for place in net.places:
            place.name = ot + "@@" + place.name

    # --- Set default configuration parameters ---
    config = {
        # Basic viz
        Parameters.FORMAT.value: constants.DEFAULT_FORMAT_GVIZ_VIEW,
        Parameters.BGCOLOR.value: constants.DEFAULT_BGCOLOR,
        Parameters.RANKDIR.value: constants.DEFAULT_RANKDIR_GVIZ,
        Parameters.ENGINE.value: "dot",
        Parameters.ENABLE_GRAPH_TITLE.value: constants.DEFAULT_ENABLE_GRAPH_TITLES,
        Parameters.GRAPH_TITLE.value: "Object-Centric Petri Net (Sugiyama Layout)",
        # Sugiyama layout params
        Parameters.SOURCES.value: [],
        Parameters.SINKS.value: [],
        Parameters.VERTEX_SEP.value: 30,  # Adjusted default based on typical node sizes
        Parameters.LAYER_SEP.value: 50,
        Parameters.BORDER_PADDING.value: 20,
        Parameters.OBJECT_CENTRALITY.value: {},
        Parameters.OBJECT_ATTRACTION.value: 0.2,  # Moderate attraction
        Parameters.OBJECT_ATTRACTION_RANGE_MIN.value: 1,
        Parameters.OBJECT_ATTRACTION_RANGE_MAX.value: 2,  # Look 1 or 2 place-layers away
        Parameters.MAX_BARYCENTER_ITERATIONS.value: 24,
        # Visual sizes (points)
        Parameters.PLACE_RADIUS.value: 10.0,
        Parameters.TRANSITION_WIDTH.value: 40.0,
        Parameters.TRANSITION_HEIGHT.value: 20.0,
        Parameters.SILENT_TRANSITION_WIDTH.value: 8.0,
        Parameters.ARC_SIZE.value: 1.0,
        Parameters.ARROWHEAD_SIZE.value: 0.7,
        Parameters.INDICATE_ARC_WEIGHT.value: False,
        Parameters.INDICATE_VARIABLE_ARCS.value: True,
        Parameters.VARIABLE_ARC_INDICATOR_COLOR.value: "#FF0000",  # Red indicator for testing
        Parameters.VARIABLE_ARC_INDICATOR_SIZE.value: 1.5,  # Multiplier
        # Colors
        Parameters.TRANSITION_COLOR.value: "black",
        Parameters.TRANSITION_FILL_COLOR.value: "white",
        Parameters.TRANSITION_TEXT_COLOR.value: "black",
        Parameters.DEFAULT_PLACE_COLOR.value: "#D3D3D3",  # Light gray
        Parameters.ARC_DEFAULT_COLOR.value: "black",
        Parameters.TYPE_COLOR_MAPPING.value: {},
        # Font
        Parameters.FONT_NAME.value: "Arial",
        Parameters.FONT_SIZE.value: 10,
    }

    # Overlay user settings on the defaults: enum-keyed entries first, then
    # string-keyed ones. The set of valid names is built once instead of once
    # per user-supplied key.
    known_keys = {p.value for p in Parameters}
    config.update({param.value: val for param, val in parameters.items() if isinstance(param, Parameters)})
    config.update({k: v for k, v in parameters.items() if isinstance(k, str) and k in known_keys})

    # --- Build initial layout structure ---
    layout_data = _build_initial_layout(ocpn, config)
    if not layout_data["vertices"]:
        print("Error: No vertices found in OCPN structure.")
        return Digraph()  # Return empty graph

    # --- Run Sugiyama Steps ---
    _reverse_cycles(layout_data, config)
    if not _assign_layers(layout_data):
        print("Error: Layer assignment failed. Cannot proceed.")
        return Digraph()  # No fallback layout is produced; return empty graph
    _insert_dummy_vertices(layout_data)
    _order_vertices(layout_data, config)
    _position_vertices(layout_data, config)

    # --- Generate Graphviz Output ---
    return _create_graphviz_digraph(ocpn, layout_data, config)