'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import uuid
import numpy as np
import math
import copy
from typing import Optional, Dict, Any, List, Tuple, Set
from graphviz import Digraph
from enum import Enum
from pm4py.util import exec_utils, constants, vis_utils
from pm4py.util.lp import solver as lp_solver
from pm4py.objects.petri_net.obj import PetriNet # For type hinting if needed
from copy import deepcopy
# --- Constants matching TypeScript OCPNLayout ---
PLACE_TYPE = "place"
TRANSITION_TYPE = "transition"
DUMMY_TYPE = "dummy"
# --- Helper Functions ---
[docs]
def ot_to_color(ot: str) -> str:
"""Generates a deterministic hex color based on the object type string."""
import hashlib
hash_obj = hashlib.md5(ot.encode('utf-8'))
hash_hex = hash_obj.hexdigest()
color = "#" + hash_hex[:6]
return color
[docs]
def generate_dummy_id() -> str:
"""Generates a unique ID for a dummy node."""
return "dummy_" + str(uuid.uuid4())
[docs]
def generate_arc_id(source_id: str, target_id: str) -> str:
"""Generates a unique ID for an arc."""
return f"arc_{source_id}_{target_id}_{uuid.uuid4()}"
[docs]
def get_neighbors(vertex_id: str, layout_data: Dict, direction: str) -> List[str]:
"""Gets upper ('up') or lower ('down') neighbors of a vertex."""
neighbors = set()
arcs = layout_data.get("arcs", {})
if direction == 'up':
for arc_id, arc_data in arcs.items():
if arc_data['target'] == vertex_id:
neighbors.add(arc_data['source'])
elif direction == 'down':
for arc_id, arc_data in arcs.items():
if arc_data['source'] == vertex_id:
neighbors.add(arc_data['target'])
return list(neighbors)
[docs]
def get_arcs_between(u: str, v: str, layout_data: Dict) -> List[Dict]:
"""Gets arcs connecting vertex u and vertex v directly."""
connecting_arcs = []
for arc_id, arc_data in layout_data.get("arcs", {}).items():
if (arc_data['source'] == u and arc_data['target'] == v) or \
(arc_data['source'] == v and arc_data['target'] == u):
connecting_arcs.append(arc_data)
return connecting_arcs
[docs]
def get_arcs_between_layers(layer_idx1: int, layer_idx2: int, layout_data: Dict) -> List[Dict]:
"""Gets all arcs connecting nodes between two specific layers."""
arcs_between = []
vertices = layout_data.get("vertices", {})
layering = layout_data.get("layering", [])
if layer_idx1 < 0 or layer_idx1 >= len(layering) or \
layer_idx2 < 0 or layer_idx2 >= len(layering):
return []
layer1_nodes = set(layering[layer_idx1])
layer2_nodes = set(layering[layer_idx2])
for arc_id, arc_data in layout_data.get("arcs", {}).items():
src = arc_data['source']
tgt = arc_data['target']
if (src in layer1_nodes and tgt in layer2_nodes) or \
(src in layer2_nodes and tgt in layer1_nodes):
if src in layer1_nodes and tgt in layer2_nodes:
arcs_between.append(arc_data)
elif src in layer2_nodes and tgt in layer1_nodes:
arcs_between.append(arc_data)
return arcs_between
[docs]
def is_incident_to_inner_segment(ocpn_layout: Dict, vertex_id: str) -> bool:
"""Checks if a vertex is a dummy node connected to another dummy node above it."""
vertices = ocpn_layout.get("vertices", {})
v = vertices.get(vertex_id)
if v and v.get("type") == DUMMY_TYPE:
upper_neighbor_id = v.get("upper_neighbor")
if upper_neighbor_id:
upper_neighbor = vertices.get(upper_neighbor_id)
if upper_neighbor and upper_neighbor.get("type") == DUMMY_TYPE:
return True
return False
# --- Core Sugiyama Steps ---
def _build_initial_layout(ocpn: Dict[str, Any], parameters: Dict) -> Dict:
object_types = sorted(ocpn.get("object_types", []))
layout_data = {
"vertices": {},
"arcs": {},
"original_arcs": {},
"object_types": object_types,
"config": parameters
}
vertex_map = {}
for ot in object_types:
net_data = ocpn.get("petri_nets", {}).get(ot)
if not net_data:
print(f"Warning: No Petri net found for object type '{ot}', skipping.")
continue
net, im, fm = net_data
# Create places
for place in net.places:
place_id = str(place)
if (PLACE_TYPE, place_id) not in vertex_map:
layout_id = f"place_{ot}_{place.name}_{uuid.uuid4()}"
vertex_map[(PLACE_TYPE, place_id)] = layout_id
is_source = place in im
is_sink = place in fm
layout_data["vertices"][layout_id] = {
"id": layout_id,
"original_id": place_id,
"name": place.name or f"p_{ot}",
"label": place.name or f"p_{ot}",
"objectType": ot,
"type": PLACE_TYPE,
"layer": -1,
"pos": -1,
"x": None,
"y": None,
"is_source": is_source,
"is_sink": is_sink,
"root": layout_id,
"align": layout_id,
"sink": layout_id,
"shift": float('inf'),
"upper_neighbor": None,
"lower_neighbor": None,
}
# Create transitions
for trans in net.transitions:
trans_id = str(trans)
layout_id = None
if trans.label:
if (TRANSITION_TYPE, trans.label) in vertex_map:
layout_id = vertex_map[(TRANSITION_TYPE, trans.label)]
layout_data["vertices"][layout_id].setdefault("adjacentObjectTypes", set()).add(ot)
else:
layout_id = f"trans_{trans.label}_{uuid.uuid4()}"
vertex_map[(TRANSITION_TYPE, trans.label)] = layout_id
layout_data["vertices"][layout_id] = {
"id": layout_id,
"original_id": trans_id,
"name": trans.label,
"label": trans.label,
"objectType": None,
"adjacentObjectTypes": {ot},
"type": TRANSITION_TYPE,
"silent": False,
"layer": -1,
"pos": -1,
"x": None,
"y": None,
"root": layout_id,
"align": layout_id,
"sink": layout_id,
"shift": float('inf'),
"upper_neighbor": None,
"lower_neighbor": None,
}
else:
layout_id = f"silent_{ot}_{trans.name}_{uuid.uuid4()}"
vertex_map[(TRANSITION_TYPE, trans_id)] = layout_id
layout_data["vertices"][layout_id] = {
"id": layout_id,
"original_id": trans_id,
"name": trans.name or "silent",
"label": None,
"objectType": ot,
"adjacentObjectTypes": {ot},
"type": TRANSITION_TYPE,
"silent": True,
"layer": -1,
"pos": -1,
"x": None,
"y": None,
"root": layout_id,
"align": layout_id,
"sink": layout_id,
"shift": float('inf'),
"upper_neighbor": None,
"lower_neighbor": None,
}
# Create arcs
for arc in net.arcs:
source_node = arc.source
target_node = arc.target
source_id_orig = str(source_node)
target_id_orig = str(target_node)
source_type = PLACE_TYPE if isinstance(source_node, PetriNet.Place) else TRANSITION_TYPE
target_type = PLACE_TYPE if isinstance(target_node, PetriNet.Place) else TRANSITION_TYPE
if source_type == TRANSITION_TYPE and source_node.label:
source_layout_id = vertex_map.get((source_type, source_node.label))
else:
source_layout_id = vertex_map.get((source_type, source_id_orig))
if target_type == TRANSITION_TYPE and target_node.label:
target_layout_id = vertex_map.get((target_type, target_node.label))
else:
target_layout_id = vertex_map.get((target_type, target_id_orig))
if source_layout_id and target_layout_id:
arc_exists = False
existing_arc_id = None
for arc_id_check, arc_data_check in layout_data["arcs"].items():
if (arc_data_check['source'] == source_layout_id and arc_data_check['target'] == target_layout_id) or \
(arc_data_check['source'] == target_layout_id and arc_data_check['target'] == source_layout_id):
arc_exists = True
existing_arc_id = arc_id_check
break
if not arc_exists:
arc_id = generate_arc_id(source_layout_id, target_layout_id)
is_variable = False
if ot in ocpn.get("double_arcs_on_activity", {}):
act_label = None
if source_type == TRANSITION_TYPE and source_node.label:
act_label = source_node.label
elif target_type == TRANSITION_TYPE and target_node.label:
act_label = target_node.label
if act_label and ocpn["double_arcs_on_activity"][ot].get(act_label, False):
is_variable = True
arc_data = {
"id": arc_id,
"source": source_layout_id, # keep ORIGINAL semantics
"target": target_layout_id, # keep ORIGINAL semantics
"objectType": ot,
"original": True,
"reversed": False, # will be set True if reversed for DAG
"dummy_nodes": [],
"minLayer": -1,
"maxLayer": -1,
"type1": False,
"weight": arc.weight if hasattr(arc, 'weight') else 1,
"variable": is_variable,
}
layout_data["arcs"][arc_id] = arc_data
layout_data["original_arcs"][arc_id] = copy.deepcopy(arc_data)
else:
if source_type == TRANSITION_TYPE:
layout_data["vertices"][source_layout_id].setdefault("adjacentObjectTypes", set()).add(ot)
if target_type == TRANSITION_TYPE:
layout_data["vertices"][target_layout_id].setdefault("adjacentObjectTypes", set()).add(ot)
pass
else:
print(f"Warning: Could not find layout nodes for arc {source_id_orig} -> {target_id_orig} in OT {ot}")
all_activity_nodes = {v['name'] for v in layout_data['vertices'].values() if
v['type'] == TRANSITION_TYPE and not v['silent']}
for act in ocpn.get("activities", set()):
if act not in all_activity_nodes:
layout_id = f"trans_{act}_unconnected_{uuid.uuid4()}"
layout_data["vertices"][layout_id] = {
"id": layout_id, "original_id": None, "name": act, "label": act,
"objectType": None, "adjacentObjectTypes": set(), "type": TRANSITION_TYPE,
"silent": False, "layer": -1, "pos": -1, "x": None, "y": None,
"root": layout_id, "align": layout_id, "sink": layout_id, "shift": float('inf'),
"upper_neighbor": None, "lower_neighbor": None,
}
print(f"Warning: Activity '{act}' not found in Petri net transitions, adding as unconnected node.")
return layout_data
# --- Graph representation for algorithms ---
[docs]
class OCPNGraph:
"""Helper class to represent the graph for cycle breaking and layer assignment."""
def __init__(self, layout_data: Dict):
self.nodes = list(layout_data["vertices"].keys())
self.arcs_data = layout_data["arcs"]
self.adj = {n: [] for n in self.nodes}
self.rev_adj = {n: [] for n in self.nodes}
self.arc_list = []
for arc_id, arc in self.arcs_data.items():
src, tgt = arc['source'], arc['target']
if src == tgt:
continue
if src in self.nodes and tgt in self.nodes:
self.adj[src].append(tgt)
self.rev_adj[tgt].append(src)
self.arc_list.append({'source': src, 'target': tgt, 'id': arc_id})
[docs]
def get_out_degree(self, node: str) -> int:
return len(self.adj.get(node, []))
[docs]
def get_in_degree(self, node: str) -> int:
return len(self.rev_adj.get(node, []))
[docs]
def remove_node(self, node: str):
if node not in self.nodes:
return
successors = self.adj.pop(node, [])
predecessors = self.rev_adj.pop(node, [])
for succ in successors:
if succ in self.rev_adj:
self.rev_adj[succ].remove(node)
for pred in predecessors:
if pred in self.adj:
self.adj[pred].remove(node)
self.nodes.remove(node)
self.arc_list = [a for a in self.arc_list if a['source'] != node and a['target'] != node]
[docs]
def remove_nodes(self, nodes_to_remove: List[str]):
for node in nodes_to_remove:
self.remove_node(node)
[docs]
def get_sink(self) -> Optional[str]:
for node in self.nodes:
if self.get_out_degree(node) == 0:
return node
return None
[docs]
def get_source(self) -> Optional[str]:
for node in self.nodes:
if self.get_in_degree(node) == 0:
return node
return None
[docs]
def get_max_out_degree_node(self) -> Optional[str]:
max_degree = -1
max_node = None
for node in self.nodes:
degree = self.get_out_degree(node) - self.get_in_degree(node)
if degree > max_degree:
max_degree = degree
max_node = node
return max_node
def _modified_greedy_fas(graph: OCPNGraph, sources: List[str], sinks: List[str]) -> List[str]:
"""Computes Feedback Arc Set using greedy algorithm considering sources/sinks."""
s1 = list(sources)
s2 = list(sinks)
s1 = [n for n in s1 if n in graph.nodes]
s2 = [n for n in s2 if n in graph.nodes]
if s1:
s1.sort(key=lambda n: graph.get_out_degree(n) - graph.get_in_degree(n), reverse=True)
graph.remove_nodes(s1)
if s2:
s2.sort(key=lambda n: graph.get_out_degree(n) - graph.get_in_degree(n), reverse=True)
graph.remove_nodes(s2)
temp_s1 = []
temp_s2 = []
while graph.nodes:
sink = graph.get_sink()
while sink:
temp_s2.insert(0, sink)
graph.remove_node(sink)
sink = graph.get_sink()
source = graph.get_source()
while source:
temp_s1.append(source)
graph.remove_node(source)
source = graph.get_source()
if graph.nodes:
node = graph.get_max_out_degree_node()
if node:
temp_s1.append(node)
graph.remove_node(node)
else:
print("Warning: No node found in FAS loop, graph might be empty or disconnected unexpectedly.")
break
return s1 + temp_s1 + temp_s2 + s2
def _reverse_cycles(layout_data: Dict, config: Dict) -> int:
"""Marks arcs to reverse (for DAG) without swapping endpoints."""
print("Step 1: Cycle Breaking...")
graph = OCPNGraph(layout_data)
potential_sources = [vid for vid, vdata in layout_data["vertices"].items() if vdata.get("is_source")]
potential_sinks = [vid for vid, vdata in layout_data["vertices"].items() if vdata.get("is_sink")]
sources = config.get("sources", potential_sources)
sinks = config.get("sinks", potential_sinks)
sources = [s for s in sources if s in layout_data["vertices"]]
sinks = [s for s in sinks if s in layout_data["vertices"]]
fas_order = _modified_greedy_fas(graph, sources, sinks)
fas_indices = {node_id: index for index, node_id in enumerate(fas_order)}
reversed_count = 0
for arc_id, arc in layout_data["arcs"].items():
src = arc['source']
tgt = arc['target']
src_index = fas_indices.get(src)
tgt_index = fas_indices.get(tgt)
if src_index is not None and tgt_index is not None:
if src_index > tgt_index:
# Do NOT swap. Just mark as reversed for layout purposes.
arc['reversed'] = True
reversed_count += 1
else:
arc['reversed'] = False
else:
arc['reversed'] = False
print(f" Marked {reversed_count} arcs as reversed for DAG.")
return reversed_count
def _assign_layers(layout_data: Dict) -> bool:
"""Assigns layers to vertices using LP to minimize edge length."""
print("Step 2: Layer Assignment (using LP)...")
vertices = layout_data["vertices"]
arcs = layout_data["arcs"]
node_list = list(vertices.keys())
node_to_index = {node_id: i for i, node_id in enumerate(node_list)}
num_vars = len(node_list)
# Objective: Minimize sum(weight * (layer[target] - layer[source]))
c = np.zeros(num_vars)
active_arcs = []
for arc_id, arc in arcs.items():
src_orig, tgt_orig = arc['source'], arc['target']
weight = arc.get('weight', 1)
# Use the 'reversed' flag to determine DAG direction
if arc['reversed']:
src, tgt = tgt_orig, src_orig
else:
src, tgt = src_orig, tgt_orig
if src in node_to_index and tgt in node_to_index:
src_idx = node_to_index[src]
tgt_idx = node_to_index[tgt]
c[tgt_idx] += weight
c[src_idx] -= weight
active_arcs.append({'source': src, 'target': tgt, 'id': arc_id})
num_constraints = len(active_arcs)
Aub = np.zeros((num_constraints, num_vars))
bub = np.full(num_constraints, -1.0)
for i, arc in enumerate(active_arcs):
src_idx = node_to_index[arc['source']]
tgt_idx = node_to_index[arc['target']]
Aub[i, src_idx] = 1.0
Aub[i, tgt_idx] = -1.0
num_positivity_constraints = num_vars
Aub_pos = np.zeros((num_positivity_constraints, num_vars))
bub_pos = np.zeros(num_positivity_constraints)
for i in range(num_vars):
Aub_pos[i, i] = -1.0
Aub_combined = np.vstack((Aub, Aub_pos))
bub_combined = np.concatenate((bub, bub_pos))
Aeq = None
beq = None
try:
print(f" Setting up LP with {num_vars} variables and {Aub_combined.shape[0]} constraints.")
sol = lp_solver.apply(c, Aub_combined, bub_combined, Aeq, beq,
parameters={"verbose": False}, variant=lp_solver.SCIPY)
if sol is None:
print(" LP Solver did not return a solution.")
return False
points = lp_solver.get_points_from_sol(sol, variant=lp_solver.SCIPY)
if points is None or len(points) != num_vars:
print(" LP Solver failed to extract points or returned incorrect number.")
return False
max_layer = 0
layering_dict = {}
for i, node_id in enumerate(node_list):
layer = int(round(points[i]))
vertices[node_id]['layer'] = layer
if layer not in layering_dict:
layering_dict[layer] = []
layering_dict[layer].append(node_id)
max_layer = max(max_layer, layer)
layout_data['layering'] = []
for i in range(max_layer + 1):
layout_data['layering'].append(layering_dict.get(i, []))
print(f" Assigned nodes to {max_layer + 1} layers.")
return True
except Exception as e:
print(f" Error during LP solving or layer assignment: {e}")
import traceback
traceback.print_exc()
return False
def _insert_dummy_vertices(layout_data: Dict):
"""Inserts dummy nodes for arcs spanning multiple layers."""
print("Step 3: Dummy Vertex Insertion...")
vertices = layout_data["vertices"]
arcs = layout_data["arcs"]
layering = layout_data["layering"]
arcs_to_add = {}
arcs_to_remove = []
dummy_count = 0
original_arc_ids = list(arcs.keys())
for arc_id in original_arc_ids:
arc = arcs[arc_id]
src_id_orig, tgt_id_orig = arc['source'], arc['target']
# Use DAG direction when computing span
if arc['reversed']:
src_id, tgt_id = tgt_id_orig, src_id_orig
else:
src_id, tgt_id = src_id_orig, tgt_id_orig
if src_id not in vertices or tgt_id not in vertices:
print(f"Warning: Skipping dummy insertion for arc {arc_id} - source or target not found.")
continue
src_node = vertices[src_id]
tgt_node = vertices[tgt_id]
src_layer = src_node.get('layer', -1)
tgt_layer = tgt_node.get('layer', -1)
if src_layer == -1 or tgt_layer == -1:
print(f"Warning: Skipping dummy insertion for arc {arc_id} ({src_id} -> {tgt_id}) - layers not assigned.")
continue
arc['minLayer'] = min(src_layer, tgt_layer)
arc['maxLayer'] = max(src_layer, tgt_layer)
slack = tgt_layer - src_layer
if slack > 1:
arcs_to_remove.append(arc_id)
dummies_in_path = []
ot = arc.get("objectType")
if vertices[src_id_orig]["type"] == PLACE_TYPE:
ot = vertices[src_id_orig]["objectType"]
elif vertices[tgt_id_orig]["type"] == PLACE_TYPE:
ot = vertices[tgt_id_orig]["objectType"]
src_pos = -1
tgt_pos = -1
if src_layer < len(layering) and src_id in layering[src_layer]:
src_pos = layering[src_layer].index(src_id)
if tgt_layer < len(layering) and tgt_id in layering[tgt_layer]:
tgt_pos = layering[tgt_layer].index(tgt_id)
median_pos = len(layering[src_layer + 1]) // 2
if src_pos != -1 and tgt_pos != -1:
median_pos = (src_pos + tgt_pos) // 2
last_node_id = src_id
for i in range(1, slack):
dummy_layer_idx = src_layer + i
dummy_id = generate_dummy_id()
dummy_count += 1
dummies_in_path.append(dummy_id)
dummy_node = {
"id": dummy_id,
"original_id": None,
"name": f"dummy_{dummy_count}",
"label": None,
"objectType": ot,
"type": DUMMY_TYPE,
"layer": dummy_layer_idx,
"pos": -1,
"x": None,
"y": None,
"dummy_of_arc": arc_id,
"root": dummy_id,
"align": dummy_id,
"sink": dummy_id,
"shift": float('inf'),
"upper_neighbor": last_node_id,
"lower_neighbor": None,
}
vertices[dummy_id] = dummy_node
if i > 1:
vertices[dummies_in_path[-2]]["lower_neighbor"] = dummy_id
if dummy_layer_idx < len(layering):
insert_pos = min(median_pos, len(layering[dummy_layer_idx]))
layering[dummy_layer_idx].insert(insert_pos, dummy_id)
else:
print(f"Warning: Dummy layer index {dummy_layer_idx} out of bounds.")
layering.append([dummy_id])
segment_arc_id = generate_arc_id(last_node_id, dummy_id)
arcs_to_add[segment_arc_id] = {
"id": segment_arc_id,
"source": last_node_id,
"target": dummy_id,
"objectType": ot,
"original": False,
"reversed": False,
"dummy_nodes": [],
"minLayer": dummy_layer_idx - 1,
"maxLayer": dummy_layer_idx,
"type1": False,
"weight": arc.get('weight', 1),
"variable": arc.get('variable', False),
"original_arc_id": arc_id
}
last_node_id = dummy_id
segment_arc_id = generate_arc_id(last_node_id, tgt_id)
arcs_to_add[segment_arc_id] = {
"id": segment_arc_id,
"source": last_node_id,
"target": tgt_id,
"objectType": ot,
"original": False,
"reversed": False,
"dummy_nodes": [],
"minLayer": tgt_layer - 1,
"maxLayer": tgt_layer,
"type1": False,
"weight": arc.get('weight', 1),
"variable": arc.get('variable', False),
"original_arc_id": arc_id
}
vertices[last_node_id]["lower_neighbor"] = tgt_id
if arc_id in layout_data["original_arcs"]:
layout_data["original_arcs"][arc_id]["dummy_nodes"] = dummies_in_path
arcs[arc_id]['dummy_nodes'] = dummies_in_path
for arc_id in arcs_to_remove:
del arcs[arc_id]
arcs.update(arcs_to_add)
print(f" Inserted {dummy_count} dummy vertices.")
# --- Vertex Ordering (Barycenter Heuristic) ---
def _compute_barycenter(ocpn_layout: Dict, vertex_id: str, layer_idx: int, fixed_layer_idx: int, config: Dict) -> float:
"""Computes the barycenter value for a vertex based on neighbors in the fixed layer."""
vertices = ocpn_layout["vertices"]
layering = ocpn_layout["layering"]
if fixed_layer_idx < 0 or fixed_layer_idx >= len(layering):
return 0.0
fixed_layer = layering[fixed_layer_idx]
fixed_layer_positions = {node_id: pos for pos, node_id in enumerate(fixed_layer)}
neighbors_in_fixed = []
arcs = ocpn_layout.get("arcs", {})
for arc_id, arc in arcs.items():
src_orig, tgt_orig = arc['source'], arc['target']
other_node = None
if src_orig == vertex_id and tgt_orig in fixed_layer_positions:
other_node = tgt_orig
elif tgt_orig == vertex_id and src_orig in fixed_layer_positions:
other_node = src_orig
if other_node:
other_node_layer = vertices.get(other_node, {}).get('layer', -1)
if other_node_layer == fixed_layer_idx:
neighbors_in_fixed.append(other_node)
if not neighbors_in_fixed:
return -1.0
barycenter_sum = 0.0
for neighbor in neighbors_in_fixed:
barycenter_sum += fixed_layer_positions[neighbor]
avg_barycenter = barycenter_sum / len(neighbors_in_fixed)
if vertices[vertex_id]["type"] == PLACE_TYPE and config.get("object_attraction", 0) > 0:
object_attraction = config.get("object_attraction", 0.5)
obj_attr_min = config.get("object_attraction_range_min", 1)
obj_attr_max = config.get("object_attraction_range_max", 1)
current_ot = vertices[vertex_id]["objectType"]
object_neighbor_positions = []
direction = 1 if fixed_layer_idx > layer_idx else -1
for i in range(obj_attr_min, obj_attr_max + 1):
target_layer_idx = layer_idx + direction * (2 * i)
if 0 <= target_layer_idx < len(layering):
target_layer = layering[target_layer_idx]
for neighbor_pos, node_id in enumerate(target_layer):
if vertices[node_id]["type"] == PLACE_TYPE and vertices[node_id]["objectType"] == current_ot:
object_neighbor_positions.append(neighbor_pos)
if object_neighbor_positions:
avg_object_pos = sum(object_neighbor_positions) / len(object_neighbor_positions)
return (1.0 - object_attraction) * avg_barycenter + object_attraction * avg_object_pos
else:
return avg_barycenter
elif vertices[vertex_id]["type"] == DUMMY_TYPE:
if neighbors_in_fixed:
return fixed_layer_positions[neighbors_in_fixed[0]]
else:
return -1.0
else:
return avg_barycenter
def _barycenter_ordering_pass(ocpn_layout: Dict, current_layering: List[List[str]], fixed_layer_idx: int,
sweep_down: bool, config: Dict) -> List[List[str]]:
new_layering = copy.deepcopy(current_layering)
layer_to_order_idx = fixed_layer_idx + 1 if sweep_down else fixed_layer_idx - 1
if layer_to_order_idx < 0 or layer_to_order_idx >= len(new_layering):
return new_layering
layer_to_order = new_layering[layer_to_order_idx]
if not layer_to_order:
return new_layering
barycenters = {}
for node_id in layer_to_order:
bc_val = _compute_barycenter(ocpn_layout, node_id, layer_to_order_idx, fixed_layer_idx, config)
barycenters[node_id] = bc_val
adjusted_barycenters = {}
last_valid_bc = 0.0
for i, node_id in enumerate(layer_to_order):
bc = barycenters[node_id]
if bc == -1.0:
adjusted_bc = last_valid_bc + i * 1e-6
adjusted_barycenters[node_id] = adjusted_bc
else:
adjusted_barycenters[node_id] = bc
last_valid_bc = bc
original_indices = {node_id: i for i, node_id in enumerate(layer_to_order)}
def sort_key(node_id):
return (adjusted_barycenters[node_id], original_indices[node_id])
ordered_layer = sorted(layer_to_order, key=sort_key)
new_layering[layer_to_order_idx] = ordered_layer
return new_layering
def _count_crossings(ocpn_layout: Dict, layering: List[List[str]]) -> int:
crossings = 0
vertices = ocpn_layout["vertices"]
arcs_data = ocpn_layout.get("arcs", {})
node_positions = {}
for r, layer in enumerate(layering):
for c, node_id in enumerate(layer):
node_positions[node_id] = (r, c)
for r in range(len(layering) - 1):
layer1 = layering[r]
layer2 = layering[r + 1]
arcs_between = []
for arc_id, arc in arcs_data.items():
src, tgt = arc['source'], arc['target']
pos_src = node_positions.get(src)
pos_tgt = node_positions.get(tgt)
if pos_src and pos_tgt:
if (pos_src[0] == r and pos_tgt[0] == r + 1):
arcs_between.append(
{'source': src, 'target': tgt, 'source_pos': pos_src[1], 'target_pos': pos_tgt[1]})
elif (pos_src[0] == r + 1 and pos_tgt[0] == r):
arcs_between.append(
{'source': tgt, 'target': src, 'source_pos': pos_tgt[1], 'target_pos': pos_src[1]})
for i in range(len(arcs_between)):
for j in range(i + 1, len(arcs_between)):
arc1 = arcs_between[i]
arc2 = arcs_between[j]
if (arc1['source_pos'] < arc2['source_pos'] and arc1['target_pos'] > arc2['target_pos']) or \
(arc1['source_pos'] > arc2['source_pos'] and arc1['target_pos'] < arc2['target_pos']):
crossings += 1
return crossings
def _order_vertices(layout_data: Dict, config: Dict):
"""Orders vertices within layers using the barycenter method."""
print("Step 4: Vertex Ordering (Barycenter Heuristic)...")
max_iterations = config.get("max_barycenter_iterations", 24)
current_layering = layout_data["layering"]
best_layering = copy.deepcopy(current_layering)
best_score = _count_crossings(layout_data, best_layering)
print(f" Initial crossing score: {best_score}")
no_improvement_counter = 0
iter_count = 0
while no_improvement_counter < max_iterations:
iter_count += 1
layering_before_sweep = copy.deepcopy(current_layering)
for i in range(len(current_layering) - 1):
current_layering = _barycenter_ordering_pass(layout_data, current_layering, fixed_layer_idx=i,
sweep_down=True, config=config)
for i in range(len(current_layering) - 1, 0, -1):
current_layering = _barycenter_ordering_pass(layout_data, current_layering, fixed_layer_idx=i,
sweep_down=False, config=config)
current_score = _count_crossings(layout_data, current_layering)
if current_score < best_score:
best_score = current_score
best_layering = copy.deepcopy(current_layering)
no_improvement_counter = 0
print(f" Iteration {iter_count}: Improved score to {best_score}")
else:
no_improvement_counter += 1
print(f" Iteration {iter_count}: Score {current_score} (no improvement)")
if current_layering == layering_before_sweep:
print(f" Converged after {iter_count} iterations.")
break
print(f" Final crossing score: {best_score} after {iter_count} iterations.")
layout_data["layering"] = best_layering
for r, layer in enumerate(best_layering):
for c, node_id in enumerate(layer):
layout_data["vertices"][node_id]['pos'] = c
# --- Vertex Positioning (Brandes & Köpf Heuristic Variant) ---
def _mark_type1_conflicts(layout_data: Dict):
vertices = layout_data["vertices"]
arcs = layout_data["arcs"]
layering = layout_data["layering"]
for arc_id, arc in arcs.items():
arc['type1'] = False
for i in range(len(layering) - 2):
layer_i = layering[i]
layer_i1 = layering[i + 1]
pos_i1 = {node_id: pos for pos, node_id in enumerate(layer_i1)}
pos_i = {node_id: pos for pos, node_id in enumerate(layer_i)}
inner_segments = []
for k, v_i1 in enumerate(layer_i1):
node_i1_data = vertices[v_i1]
if node_i1_data['type'] == DUMMY_TYPE:
upper_neighbor_id = node_i1_data.get("upper_neighbor")
if upper_neighbor_id and upper_neighbor_id in pos_i:
upper_neighbor_data = vertices[upper_neighbor_id]
if upper_neighbor_data['type'] == DUMMY_TYPE:
inner_segments.append((pos_i[upper_neighbor_id], k))
if not inner_segments:
continue
inner_segments.sort()
k0 = 0
current_inner_idx = 0
for l1, v_i1 in enumerate(layer_i1):
k1 = len(layer_i) - 1
is_lower_end_of_inner = False
while current_inner_idx < len(inner_segments) and inner_segments[current_inner_idx][1] <= l1:
k1 = inner_segments[current_inner_idx][0]
is_lower_end_of_inner = (inner_segments[current_inner_idx][1] == l1)
current_inner_idx += 1
if not is_lower_end_of_inner:
upper_neighbors_i = []
for arc_id, arc_data in arcs.items():
if arc_data['target'] == v_i1 and arc_data['source'] in pos_i:
upper_neighbors_i.append(arc_data['source'])
for u_i in upper_neighbors_i:
k = pos_i[u_i]
if k < k0 or k > k1:
connecting_arcs = get_arcs_between(u_i, v_i1, layout_data)
for conn_arc in connecting_arcs:
is_seg_inner = (vertices[u_i]['type'] == DUMMY_TYPE and vertices[v_i1]['type'] == DUMMY_TYPE)
if not is_seg_inner:
conn_arc['type1'] = True
if is_lower_end_of_inner:
k0 = k1
def _vertical_alignment(ocpn_layout: Dict, current_layering: List[List[str]], pos: Dict[str, int], sweep_down: bool) -> \
Tuple[Dict[str, str], Dict[str, str]]:
root = {vid: vid for vid in ocpn_layout["vertices"]}
align = {vid: vid for vid in ocpn_layout["vertices"]}
vertices = ocpn_layout["vertices"]
layer_indices = range(len(current_layering)) if sweep_down else range(len(current_layering) - 1, -1, -1)
for i in layer_indices:
fixed_layer_idx = i - 1 if sweep_down else i + 1
if fixed_layer_idx < 0 or fixed_layer_idx >= len(current_layering):
continue
layer_k = current_layering[i]
r = -1
for vk in layer_k:
neighbors = []
neighbor_dir = 'up' if sweep_down else 'down'
arcs_data = ocpn_layout.get("arcs", {})
fixed_layer_nodes = set(current_layering[fixed_layer_idx])
for arc_id, arc in arcs_data.items():
src, tgt = arc['source'], arc['target']
if sweep_down:
if tgt == vk and src in fixed_layer_nodes:
neighbors.append(src)
else:
if src == vk and tgt in fixed_layer_nodes:
neighbors.append(tgt)
if not neighbors:
continue
neighbors.sort(key=lambda n: pos[n])
m_low = math.floor((len(neighbors) - 1) / 2)
m_high = math.ceil((len(neighbors) - 1) / 2)
median_indices = list(set([m_low, m_high]))
for m_idx in median_indices:
median_neighbor = neighbors[m_idx]
if align[vk] == vk:
connecting_arcs = get_arcs_between(median_neighbor, vk, ocpn_layout)
is_marked = any(a.get('type1', False) for a in connecting_arcs)
if not is_marked and pos[median_neighbor] > r:
align[median_neighbor] = vk
root[vk] = root[median_neighbor]
align[vk] = root[vk]
r = pos[median_neighbor]
return root, align
def _place_block(ocpn_layout: Dict, layering: List[List[str]], v: str, x: Dict[str, Optional[float]],
pos: Dict[str, int], roots: Dict[str, str], sink: Dict[str, str], shift: Dict[str, float],
aligns: Dict[str, str], config: Dict):
if x.get(v) is None:
x[v] = 0.0
w = v
while True:
w_pos = pos[w]
w_layer_idx = -1
for idx, l in enumerate(layering):
if w in l:
w_layer_idx = idx
break
if w_pos > 0 and w_layer_idx != -1:
predecessor = layering[w_layer_idx][w_pos - 1]
u = roots[predecessor]
_place_block(ocpn_layout, layering, u, x, pos, roots, sink, shift, aligns, config)
if sink[v] == v:
sink[v] = sink[u]
place_dim = config.get("place_radius", 10) * 2
trans_h = config.get("transition_height", 20)
trans_w = config.get("transition_width", 50)
silent_w = config.get("silent_transition_width", 10)
max_trans_dim = max(trans_h, trans_w, silent_w)
delta = config.get("vertex_sep", 20) + max(place_dim, max_trans_dim)
if sink[v] != sink[u]:
current_shift = (x.get(v, 0.0) or 0.0) - (x.get(u, 0.0) or 0.0) - delta
shift[sink[u]] = min(shift.get(sink[u], float('inf')), current_shift)
else:
new_x = (x.get(u, 0.0) or 0.0) + delta
x[v] = max(x.get(v, 0.0) or 0.0, new_x)
w = aligns[w]
if w == v:
break
def _horizontal_compaction(ocpn_layout: Dict, layering: List[List[str]], roots: Dict[str, str], aligns: Dict[str, str],
pos: Dict[str, int], config: Dict) -> Tuple[Dict[str, Optional[float]], float]:
x: Dict[str, Optional[float]] = {vid: None for vid in ocpn_layout["vertices"]}
sink: Dict[str, str] = {vid: vid for vid in ocpn_layout["vertices"]}
shift: Dict[str, float] = {vid: float('inf') for vid in ocpn_layout["vertices"]}
for layer in layering:
for v in layer:
if roots[v] == v:
_place_block(ocpn_layout, layering, v, x, pos, roots, sink, shift, aligns, config)
abs_x: Dict[str, Optional[float]] = {}
max_coord = 0.0
min_coord = float('inf')
for layer in layering:
for v in layer:
root_v = roots[v]
if x.get(root_v) is not None:
current_x = x[root_v]
sink_root_v = sink[root_v]
if shift.get(sink_root_v, float('inf')) != float('inf'):
current_x += shift[sink_root_v]
abs_x[v] = current_x
max_coord = max(max_coord, current_x)
min_coord = min(min_coord, current_x)
else:
abs_x[v] = None
border = config.get("border_padding", 20)
shift_val = border - min_coord if min_coord != float('inf') else border
final_x = {}
final_max = 0.0
for v, val in abs_x.items():
if val is not None:
final_x[v] = val + shift_val
final_max = max(final_max, final_x[v])
else:
final_x[v] = None
final_max += border
return final_x, final_max
def _position_vertices(layout_data: Dict, config: Dict):
"""Assigns final X/Y coordinates using 4-pass heuristic."""
print("Step 5: Vertex Positioning...")
_mark_type1_conflicts(layout_data)
layouts = []
original_layering = copy.deepcopy(layout_data["layering"])
for vert_dir in [0, 1]: # 0: Down, 1: Up
for horz_dir in [0, 1]: # 0: Left->Right, 1: Right->Left
print(f" Running positioning pass (vert_dir={vert_dir}, horz_dir={horz_dir})...")
current_layering = copy.deepcopy(original_layering)
if vert_dir == 1:
current_layering.reverse()
if horz_dir == 1:
for layer in current_layering:
layer.reverse()
pos = {}
for r, layer in enumerate(current_layering):
for c, node_id in enumerate(layer):
pos[node_id] = c
roots, aligns = _vertical_alignment(layout_data, current_layering, pos, sweep_down=(vert_dir == 0))
coords, max_coord = _horizontal_compaction(layout_data, current_layering, roots, aligns, pos, config)
if horz_dir == 1:
final_coords = {}
for v, c in coords.items():
if c is not None:
final_coords[v] = max_coord - c
else:
final_coords[v] = None
layouts.append(final_coords)
else:
layouts.append(coords)
final_coords_x = {}
final_coords_y = {}
vertices = layout_data["vertices"]
layering = layout_data["layering"]
border_padding = config.get("border_padding", 20)
layer_sep = config.get("layer_sep", 50)
direction = config.get("rankdir", "TB")
for v_id in vertices.keys():
candidate_coords = []
for layout_pass in layouts:
coord = layout_pass.get(v_id)
if coord is not None:
candidate_coords.append(coord)
if candidate_coords:
candidate_coords.sort()
if len(candidate_coords) > 0:
median_idx1 = math.floor((len(candidate_coords) - 1) / 2)
median_idx2 = math.ceil((len(candidate_coords) - 1) / 2)
median_coord = (candidate_coords[median_idx1] + candidate_coords[median_idx2]) / 2
final_coords_x[v_id] = median_coord
else:
final_coords_x[v_id] = border_padding
layer_sizes = []
max_layer_dim = 0
for i, layer in enumerate(layering):
max_dim = 0
for node_id in layer:
node = vertices[node_id]
node_h, node_w = 0, 0
if node['type'] == PLACE_TYPE:
r = config.get("place_radius", 10)
node_h, node_w = r * 2, r * 2
elif node['type'] == TRANSITION_TYPE:
node_h = config.get("transition_height", 20)
if node['silent']:
node_w = config.get("silent_transition_width", 10)
else:
node_w = config.get("transition_width", 50)
elif node['type'] == DUMMY_TYPE:
node_h, node_w = 1, 1
if direction == "TB":
max_dim = max(max_dim, node_h)
else:
max_dim = max(max_dim, node_w)
layer_sizes.append({'layer': i, 'size': max_dim})
max_layer_dim = max(max_layer_dim, max_dim)
current_pos = border_padding
layer_centers = {}
for i in range(len(layering)):
layer_size = layer_sizes[i]['size'] if i < len(layer_sizes) else max_layer_dim
layer_center = current_pos + layer_size / 2.0
layer_centers[i] = layer_center
current_pos += layer_size + layer_sep
max_x, max_y = 0.0, 0.0
for v_id, node in vertices.items():
x_coord = final_coords_x.get(v_id, border_padding)
y_coord = layer_centers.get(node['layer'], border_padding)
if direction == "TB":
node['x'] = x_coord
node['y'] = y_coord
max_x = max(max_x, x_coord)
max_y = max(max_y, y_coord)
else:
node['x'] = y_coord
node['y'] = x_coord
max_x = max(max_x, y_coord)
max_y = max(max_y, x_coord)
layout_data["max_x"] = max_x + border_padding
layout_data["max_y"] = max_y + border_padding
print(f" Assigned final coordinates (max_x={max_x:.2f}, max_y={max_y:.2f}).")
# --- Graphviz Drawing ---
def _create_graphviz_digraph(ocpn: Dict, layout_data: Dict, parameters: Dict) -> Digraph:
"""Generates the Graphviz Digraph object from the layout with separate handling for direct edges and waypoint edges."""
print("Step 6: Generating Graphviz output...")
image_format = exec_utils.get_param_value(Parameters.FORMAT, parameters, "png")
bgcolor = exec_utils.get_param_value(Parameters.BGCOLOR, parameters, constants.DEFAULT_BGCOLOR)
rankdir = exec_utils.get_param_value(Parameters.RANKDIR, parameters, "TB")
engine = exec_utils.get_param_value(Parameters.ENGINE, parameters, "dot")
enable_graph_title = exec_utils.get_param_value(Parameters.ENABLE_GRAPH_TITLE, parameters, constants.DEFAULT_ENABLE_GRAPH_TITLES)
graph_title = exec_utils.get_param_value(Parameters.GRAPH_TITLE, parameters, "Object-Centric Petri Net (Sugiyama Layout)")
viz = Digraph(
"ocpn_sugiyama",
engine=engine,
graph_attr={
"bgcolor": bgcolor,
"rankdir": rankdir,
"splines": "line",
},
node_attr={'shape': 'box'},
)
if enable_graph_title:
viz.attr(label=f'<<FONT POINT-SIZE="20">{graph_title}</FONT>>', labelloc="t")
vertices = layout_data["vertices"]
def get_contrasting_color(hex_color: str) -> str:
hex_color = hex_color.lstrip('#')
rgb = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
luminance = 0.299 * rgb[0] + 0.587 * rgb[1] + 0.114 * rgb[2]
return "white" if luminance < 128 else "black"
# Add Nodes
for node_id, node in vertices.items():
if node.get('x') is None or node.get('y') is None:
print(f"Warning: Node {node_id} has no coordinates, skipping.")
continue
pos_x = node['x']
pos_y = node['y']
pos_str = f"{pos_x:.2f},{pos_y:.2f}!"
attrs = {
"pos": pos_str,
"label": node['label'] if node['label'] else "",
"id": node_id
}
if node['type'] == PLACE_TYPE:
ot_color = ot_to_color(node['objectType'])
font_color = get_contrasting_color(ot_color)
radius_gv = parameters.get("place_radius", 10) / 72.0 * 1.5
if node.get("is_source"):
ot_label = f"{node['objectType']}"
attrs.update({
"shape": "ellipse",
"fillcolor": ot_color,
"style": "filled",
"fontcolor": font_color,
"width": f"{radius_gv * 2.5:.2f}",
"height": f"{radius_gv * 1.5:.2f}",
"fixedsize": "true",
"label": f"<<FONT POINT-SIZE='8'>{ot_label}</FONT>>",
"labelloc": "c",
})
if node.get("is_source") and node.get("is_sink"):
attrs["peripheries"] = "2"
else:
attrs.update({
"shape": "circle",
"fillcolor": ot_color,
"style": "filled",
"fontcolor": font_color,
"width": f"{radius_gv * 2:.2f}",
"height": f"{radius_gv * 2:.2f}",
"fixedsize": "true",
"label": "",
})
if node.get("is_sink"):
attrs["peripheries"] = "2"
elif node['type'] == TRANSITION_TYPE:
height_gv = parameters.get("transition_height", 20) / 72.0
if node.get('silent', False):
width_gv = parameters.get("silent_transition_width", 10) / 72.0
attrs.update({
"shape": "box",
"fillcolor": parameters.get("transition_fill_color", "black"),
"style": "filled",
"width": f"{width_gv:.2f}",
"height": f"{height_gv:.2f}",
"fixedsize": "true",
"label": "",
})
else:
base_width_gv = parameters.get("transition_width", 40) / 72.0
width_gv = base_width_gv * 1.5
font_size = parameters.get("font_size", 10) - 2
label = node['label']
max_label_width = 10
if label and len(label) > max_label_width:
wrapped_label = "<BR/>".join([label[i:i+max_label_width] for i in range(0, len(label), max_label_width)])
attrs["label"] = f"<{wrapped_label}>"
else:
attrs["label"] = label or ""
attrs.update({
"shape": "box",
"fillcolor": parameters.get("transition_fill_color", "white"),
"style": "filled",
"color": parameters.get("transition_color", "black"),
"width": f"{width_gv:.2f}",
"height": f"{height_gv:.2f}",
"fixedsize": "true",
"fontname": parameters.get("font_name", "Arial"),
"fontsize": str(font_size),
"fontcolor": parameters.get("transition_textcolor", "black"),
})
elif node['type'] == DUMMY_TYPE:
attrs.update({
"shape": "point",
"width": "0.01",
"height": "0.01",
"label": "",
"fixedsize": "true",
"style": "invis",
})
viz.node(node_id, **attrs)
# Add Edges
arcs = layout_data["arcs"]
original_arcs = layout_data["original_arcs"]
direct_edges = []
waypoint_edges = {}
for arc_id, arc in arcs.items():
src = arc['source']
tgt = arc['target']
if src not in vertices or tgt not in vertices or \
vertices[src].get('x') is None or vertices[tgt].get('x') is None:
continue
src_type = vertices[src]['type']
tgt_type = vertices[tgt]['type']
original_arc_id = arc.get('original_arc_id', arc_id)
original_arc = original_arcs.get(original_arc_id, arc)
if src_type != DUMMY_TYPE and tgt_type != DUMMY_TYPE and not original_arc.get('dummy_nodes'):
direct_edges.append(arc)
else:
if original_arc_id not in waypoint_edges:
ultimate_src = original_arc.get('source')
ultimate_tgt = original_arc.get('target')
dummy_nodes = original_arc.get('dummy_nodes', [])
waypoint_edges[original_arc_id] = {
'original_arc': original_arc,
'segments': [],
'ultimate_source': ultimate_src,
'ultimate_target': ultimate_tgt,
'dummy_nodes': dummy_nodes
}
waypoint_edges[original_arc_id]['segments'].append(arc)
# Draw direct edges: ALWAYS in original semantic direction (forward)
for arc in direct_edges:
src = arc['source']
tgt = arc['target']
ot = arc.get("objectType", layout_data["object_types"][0] if layout_data["object_types"] else "unknown")
ot_color = ot_to_color(ot)
edge_attrs = {
"color": ot_color,
"penwidth": str(parameters.get("arc_size", 1.0) * arc.get("weight", 1.0)),
"arrowsize": str(parameters.get("arrowhead_size", 0.7)),
"id": arc['id'],
"splines": "line",
"dir": "forward", # <<--- force forward to avoid inverted arrows
}
if arc.get('variable'):
edge_attrs["penwidth"] = str(float(edge_attrs["penwidth"]) * 1.5)
edge_attrs["style"] = "dashed"
viz.edge(src, tgt, **edge_attrs)
# Draw segmented (dummy) edges with arrow at the original target side
for orig_arc_id, wp_data in waypoint_edges.items():
orig_arc = wp_data['original_arc']
segments = wp_data['segments']
ultimate_src = wp_data['ultimate_source']
ultimate_tgt = wp_data['ultimate_target']
ot = orig_arc.get("objectType", layout_data["object_types"][0] if layout_data["object_types"] else "unknown")
ot_color = ot_to_color(ot)
base_edge_attrs = {
"color": ot_color,
"penwidth": str(parameters.get("arc_size", 1.0) * orig_arc.get("weight", 1.0)),
"arrowsize": str(parameters.get("arrowhead_size", 0.7)),
"splines": "polyline",
}
if orig_arc.get('variable'):
base_edge_attrs["penwidth"] = str(float(base_edge_attrs["penwidth"]) * 1.5)
base_edge_attrs["style"] = "dashed"
for segment in segments:
src = segment['source']
tgt = segment['target']
segment_attrs = base_edge_attrs.copy()
segment_attrs['id'] = segment['id']
# Put the arrow at the original target side:
if tgt == ultimate_tgt:
segment_attrs['arrowhead'] = 'normal'
segment_attrs['dir'] = 'forward'
elif src == ultimate_tgt:
segment_attrs['arrowhead'] = 'normal'
segment_attrs['dir'] = 'back'
else:
segment_attrs['arrowhead'] = 'none'
viz.edge(src, tgt, **segment_attrs)
viz.format = image_format.replace("html", "plain-ext")
return viz
# --- Main Apply Function ---
[docs]
class Parameters(Enum):
""" Parameters for the Sugiyama OCPN visualization """
FORMAT = "format"
BGCOLOR = "bgcolor"
RANKDIR = "rankdir" # TB or LR
ENGINE = "engine" # dot, neato, fdp
ENABLE_GRAPH_TITLE = "enable_graph_title"
GRAPH_TITLE = "graph_title"
# Sugiyama specific parameters (matching TS config where possible)
SOURCES = "sources" # List of node IDs to force as sources
SINKS = "sinks" # List of node IDs to force as sinks
VERTEX_SEP = "vertex_sep" # Minimum horizontal distance between nodes
LAYER_SEP = "layer_sep" # Minimum vertical distance between layers
EDGE_SEP = "edge_sep" # (Not directly used here, vertex_sep dominates)
BORDER_PADDING = "border_padding" # Padding around the graph
OBJECT_CENTRALITY = "object_centrality" # Dict {ot: centrality_value}
OBJECT_ATTRACTION = "object_attraction" # Weight (0-1) for place attraction in barycenter
OBJECT_ATTRACTION_RANGE_MIN = "object_attraction_range_min"
OBJECT_ATTRACTION_RANGE_MAX = "object_attraction_range_max"
MAX_BARYCENTER_ITERATIONS = "max_barycenter_iterations"
# Visual element sizes (in points, roughly 1/72 inch)
PLACE_RADIUS = "place_radius"
TRANSITION_WIDTH = "transition_width"
TRANSITION_HEIGHT = "transition_height"
SILENT_TRANSITION_WIDTH = "silent_transition_width"
ARC_SIZE = "arc_size"
ARROWHEAD_SIZE = "arrowhead_size"
INDICATE_ARC_WEIGHT = "indicate_arc_weight"
INDICATE_VARIABLE_ARCS = "indicate_variable_arcs"
VARIABLE_ARC_INDICATOR_COLOR = "variable_arc_indicator_color"
VARIABLE_ARC_INDICATOR_SIZE = "variable_arc_indicator_size"
# Colors
TRANSITION_COLOR = "transition_color"
TRANSITION_FILL_COLOR = "transition_fill_color"
TRANSITION_TEXT_COLOR = "transition_text_color"
DEFAULT_PLACE_COLOR = "default_place_color"
ARC_DEFAULT_COLOR = "arc_default_color"
TYPE_COLOR_MAPPING = "type_color_mapping"
# Font
FONT_NAME = "font_name"
FONT_SIZE = "font_size"
[docs]
def apply(ocpn: Dict[str, Any], parameters: Optional[Dict[Any, Any]] = None) -> Digraph:
"""
Obtains a visualization of the provided object-centric Petri net using
a Sugiyama-based layout algorithm.
Args:
ocpn (Dict[str, Any]): Object-centric Petri net structure.
parameters (Optional[Dict[Any, Any]], optional): Algorithm parameters.
Returns:
Digraph: A Graphviz digraph object representing the layout.
"""
if parameters is None:
parameters = {}
ocpn = deepcopy(ocpn)
for ot, (net, im, fm) in ocpn.get("petri_nets", {}).items():
for place in net.places:
place.name = ot + "@@" + place.name
# --- Set default configuration parameters ---
config = {
# Basic viz
Parameters.FORMAT.value: constants.DEFAULT_FORMAT_GVIZ_VIEW,
Parameters.BGCOLOR.value: constants.DEFAULT_BGCOLOR,
Parameters.RANKDIR.value: constants.DEFAULT_RANKDIR_GVIZ,
Parameters.ENGINE.value: "dot",
Parameters.ENABLE_GRAPH_TITLE.value: constants.DEFAULT_ENABLE_GRAPH_TITLES,
Parameters.GRAPH_TITLE.value: "Object-Centric Petri Net (Sugiyama Layout)",
# Sugiyama layout params
Parameters.SOURCES.value: [],
Parameters.SINKS.value: [],
Parameters.VERTEX_SEP.value: 30,
Parameters.LAYER_SEP.value: 50,
Parameters.BORDER_PADDING.value: 20,
Parameters.OBJECT_CENTRALITY.value: {},
Parameters.OBJECT_ATTRACTION.value: 0.2,
Parameters.OBJECT_ATTRACTION_RANGE_MIN.value: 1,
Parameters.OBJECT_ATTRACTION_RANGE_MAX.value: 2,
Parameters.MAX_BARYCENTER_ITERATIONS.value: 24,
# Visual sizes (points)
Parameters.PLACE_RADIUS.value: 10.0,
Parameters.TRANSITION_WIDTH.value: 40.0,
Parameters.TRANSITION_HEIGHT.value: 20.0,
Parameters.SILENT_TRANSITION_WIDTH.value: 8.0,
Parameters.ARC_SIZE.value: 1.0,
Parameters.ARROWHEAD_SIZE.value: 0.7,
Parameters.INDICATE_ARC_WEIGHT.value: False,
Parameters.INDICATE_VARIABLE_ARCS.value: True,
Parameters.VARIABLE_ARC_INDICATOR_COLOR.value: "#FF0000",
Parameters.VARIABLE_ARC_INDICATOR_SIZE.value: 1.5,
# Colors
Parameters.TRANSITION_COLOR.value: "black",
Parameters.TRANSITION_FILL_COLOR.value: "white",
Parameters.TRANSITION_TEXT_COLOR.value: "black",
Parameters.DEFAULT_PLACE_COLOR.value: "#D3D3D3",
Parameters.ARC_DEFAULT_COLOR.value: "black",
Parameters.TYPE_COLOR_MAPPING.value: {},
# Font
Parameters.FONT_NAME.value: "Arial",
Parameters.FONT_SIZE.value: 10,
}
config.update({param.value: val for param, val in parameters.items() if isinstance(param, Parameters)})
config.update({k: v for k, v in parameters.items() if isinstance(k, str) and k in [p.value for p in Parameters]})
layout_data = _build_initial_layout(ocpn, config)
if not layout_data["vertices"]:
print("Error: No vertices found in OCPN structure.")
return Digraph()
_reverse_cycles(layout_data, config)
if not _assign_layers(layout_data):
print("Error: Layer assignment failed. Cannot proceed.")
return Digraph()
_insert_dummy_vertices(layout_data)
_order_vertices(layout_data, config)
_position_vertices(layout_data, config)
gviz_graph = _create_graphviz_digraph(ocpn, layout_data, config)
return gviz_graph