'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import uuid
from collections import Counter
from copy import deepcopy
from enum import Enum
from pm4py.objects.bpmn.obj import BPMN
from pm4py.objects.bpmn.util.sorting import get_sorted_nodes_edges
from pm4py.util import exec_utils
import tempfile
[docs]
class EndpointDirection(Enum):
    """Side of a node's bounding box on which an edge endpoint is attached."""

    # Horizontal sides.
    RIGHT = "right"
    LEFT = "left"

    # Vertical sides.
    TOP = "top"
    BOTTOM = "bottom"
[docs]
class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of the layouter."""

    # Base size used for the node boxes.
    TASK_WH = "task_wh"

    # Target screen size along each axis.
    SCREEN_SIZE_X = "screen_size_x"
    SCREEN_SIZE_Y = "screen_size_y"

    # Multiplier applied on top of the screen size when stretching the layout.
    SCALING_FACTOR = "scaling_factor"
[docs]
def get_right_edge_coord(layout, node, p, partial_counter, total_counter):
    """
    Computes the attachment point of an edge on the right side of a node

    Parameters
    -------------
    layout
        Layout object (``layout.get(node)`` exposes ``get_width``/``get_height``)
    node
        Node on which the edge is attached
    p
        Reference point of the node; ``p[0]`` is x and ``p[1]`` is y
    partial_counter
        Per-direction count of the endpoints already placed on the node
        (including the current one)
    total_counter
        Per-direction count of all the endpoints of the node

    Returns
    -------------
    point
        Tuple (x, y) lying on the right side of the node
    """
    side = EndpointDirection.RIGHT
    node_box = layout.get(node)
    # dividing by (total + 1) keeps every endpoint strictly inside the side
    fraction = partial_counter[side] / (1.0 + total_counter[side])
    edge_x = p[0] + node_box.get_width()
    edge_y = p[1] + round(node_box.get_height() * fraction)
    return (edge_x, edge_y)
[docs]
def get_left_edge_coord(layout, node, p, partial_counter, total_counter):
    """
    Computes the attachment point of an edge on the left side of a node

    Parameters
    -------------
    layout
        Layout object (``layout.get(node)`` exposes ``get_height``)
    node
        Node on which the edge is attached
    p
        Reference point of the node; ``p[0]`` is x and ``p[1]`` is y
    partial_counter
        Per-direction count of the endpoints already placed on the node
        (including the current one)
    total_counter
        Per-direction count of all the endpoints of the node

    Returns
    -------------
    point
        Tuple (x, y) lying on the left side of the node
    """
    side = EndpointDirection.LEFT
    # dividing by (total + 1) keeps every endpoint strictly inside the side
    fraction = partial_counter[side] / (1.0 + total_counter[side])
    offset_y = round(layout.get(node).get_height() * fraction)
    # the left side shares the x coordinate of the reference point
    return (p[0], p[1] + offset_y)
[docs]
def get_top_edge_coord(layout, node, p, partial_counter, total_counter):
    """
    Computes the attachment point of an edge on the top side of a node

    Parameters
    -------------
    layout
        Layout object (``layout.get(node)`` exposes ``get_width``)
    node
        Node on which the edge is attached
    p
        Reference point of the node; ``p[0]`` is x and ``p[1]`` is y
    partial_counter
        Per-direction count of the endpoints already placed on the node
        (including the current one)
    total_counter
        Per-direction count of all the endpoints of the node

    Returns
    -------------
    point
        Tuple (x, y) lying on the top side of the node
    """
    side = EndpointDirection.TOP
    # dividing by (total + 1) keeps every endpoint strictly inside the side
    fraction = partial_counter[side] / (1.0 + total_counter[side])
    offset_x = round(layout.get(node).get_width() * fraction)
    # the top side shares the y coordinate of the reference point
    return (p[0] + offset_x, p[1])
[docs]
def get_bottom_edge_coord(layout, node, p, partial_counter, total_counter):
    """
    Computes the attachment point of an edge on the bottom side of a node

    Parameters
    -------------
    layout
        Layout object (``layout.get(node)`` exposes ``get_width``/``get_height``)
    node
        Node on which the edge is attached
    p
        Reference point of the node; ``p[0]`` is x and ``p[1]`` is y
    partial_counter
        Per-direction count of the endpoints already placed on the node
        (including the current one)
    total_counter
        Per-direction count of all the endpoints of the node

    Returns
    -------------
    point
        Tuple (x, y) lying on the bottom side of the node
    """
    side = EndpointDirection.BOTTOM
    node_box = layout.get(node)
    # dividing by (total + 1) keeps every endpoint strictly inside the side
    fraction = partial_counter[side] / (1.0 + total_counter[side])
    edge_x = p[0] + round(node_box.get_width() * fraction)
    edge_y = p[1] + node_box.get_height()
    return (edge_x, edge_y)
[docs]
def _new_direction_weights():
    """Return a fresh per-edge weight table holding 0.0 for each side."""
    # The insertion order is also the tie-break order used when the best
    # side of an edge is chosen, so it must stay RIGHT, LEFT, TOP, BOTTOM.
    return {
        EndpointDirection.RIGHT: 0.0,
        EndpointDirection.LEFT: 0.0,
        EndpointDirection.TOP: 0.0,
        EndpointDirection.BOTTOM: 0.0,
    }


def _side_totals(primary, secondary, pos):
    """Sum the raw per-side weights of every edge touching the node at ``pos``."""
    totals = _new_direction_weights()
    # Edges of ``primary`` are accumulated before the ones of ``secondary``.
    for weights in primary[pos].values():
        for side in totals:
            totals[side] += weights[side]
    for weights in secondary.get(pos, {}).values():
        for side in totals:
            totals[side] += weights[side]
    return totals


def _normalize_weights(primary, secondary):
    """Return a copy of ``primary`` where each weight w becomes w**2 / side total."""
    normalized = {}
    for pos, edges in primary.items():
        totals = _side_totals(primary, secondary, pos)
        normalized[pos] = {
            other: {
                # a side with a zero total keeps its (zero) raw weight
                side: (weight ** 2 / totals[side] if totals[side] > 0 else weight)
                for side, weight in weights.items()
            }
            for other, weights in edges.items()
        }
    return normalized


def _pick_best_sides(weighted):
    """Replace every per-edge weight table with its highest-weighted side."""
    # max() returns the first maximal key, so ties are resolved following the
    # insertion order of the weight table.
    return {
        pos: {
            other: max(weights, key=weights.get)
            for other, weights in edges.items()
        }
        for pos, edges in weighted.items()
    }


def _count_sides(outgoing_sides, ingoing_sides):
    """Count, for every node position, how many edge endpoints use each side."""
    totals = {}
    for sides_by_pos in (outgoing_sides, ingoing_sides):
        for pos, edges in sides_by_pos.items():
            totals.setdefault(pos, Counter()).update(edges.values())
    return totals


def _place_endpoints(sides_by_pos, node_by_pos, layout, placed, totals):
    """Turn the chosen side of every edge into a concrete (x, y) endpoint."""
    coord_funcs = {
        EndpointDirection.RIGHT: get_right_edge_coord,
        EndpointDirection.LEFT: get_left_edge_coord,
        EndpointDirection.TOP: get_top_edge_coord,
        EndpointDirection.BOTTOM: get_bottom_edge_coord,
    }
    coords = {}
    for pos, edges in sides_by_pos.items():
        node = node_by_pos[pos]
        # ``placed`` is shared between the outgoing and the ingoing pass, so
        # the endpoints of both kinds are spread along the same side.
        counter = placed.setdefault(pos, Counter())
        coords[pos] = {}
        for other in sorted(edges):
            side = edges[other]
            counter[side] += 1
            coords[pos][other] = coord_funcs[side](
                layout, node, pos, counter, totals[pos]
            )
    return coords


def _reorder_left_endpoints(ingoing_coords):
    """Re-assign the endpoints lying on a node's left x so they follow the sorted sources."""
    for pos, endpoints in ingoing_coords.items():
        on_left = [
            (other, point)
            for other, point in endpoints.items()
            if point[0] == pos[0]
        ]
        if len(on_left) > 1:
            sources = sorted(other for other, _ in on_left)
            points = sorted(point for _, point in on_left)
            for other, point in zip(sources, points):
                endpoints[other] = point


def apply(bpmn_graph, parameters=None):
    """
    Layouts a BPMN graph (inserting the positions of the nodes and the layouting of the edges)

    The node positions are obtained by rendering the graph with Graphviz
    ("dot" engine, left-to-right) and are then stretched to the target
    screen size. The waypoints of every flow are replaced by an orthogonal
    path between a point on a side of the source and a point on a side of
    the target.

    Parameters
    -------------
    bpmn_graph
        BPMN graph
    parameters
        Parameters of the algorithm:
        - Parameters.SCREEN_SIZE_X => target size of the screen in pixels (default 1920.0)
        - Parameters.SCREEN_SIZE_Y => target size of the screen in pixels (default 1080.0)
        - Parameters.SCALING_FACTOR => scaling factor of the layouting (default 2.5)
        - Parameters.TASK_WH => base width/height of the nodes (default 60; 30 for the
          start/end events when the parameter is not provided)

    Returns
    -------------
    bpmn_graph
        BPMN graph with layout information (the same object received as input).
        A graph without nodes is returned unchanged.
    """
    import contextlib
    import os

    from graphviz import Digraph
    from pm4py.visualization.common import save as gsave
    from pm4py.visualization.common import svg_pos_parser

    if parameters is None:
        parameters = {}

    screen_size_x = exec_utils.get_param_value(
        Parameters.SCREEN_SIZE_X, parameters, 1920.0
    )
    screen_size_y = exec_utils.get_param_value(
        Parameters.SCREEN_SIZE_Y, parameters, 1080.0
    )
    scaling_factor = exec_utils.get_param_value(
        Parameters.SCALING_FACTOR, parameters, 2.5
    )
    # NOTE(review): both sizes are read from TASK_WH, so an explicit TASK_WH
    # also overrides the size of start/end events; only the defaults differ.
    # Looks like a dedicated parameter was intended - confirm before changing.
    endpoints_wh = exec_utils.get_param_value(
        Parameters.TASK_WH, parameters, 30
    )
    task_wh = exec_utils.get_param_value(Parameters.TASK_WH, parameters, 60)

    nodes = bpmn_graph.get_nodes()
    flows = bpmn_graph.get_flows()
    layout = bpmn_graph.get_layout()

    # Nothing to lay out; the max()/min() reductions below would raise on an
    # empty collection.
    if not nodes:
        return bpmn_graph

    # Only the unique names are needed: the files are deleted on close and
    # re-created by Graphviz / the save utility.
    with tempfile.NamedTemporaryFile(suffix=".gv") as gv_file:
        gv_path = gv_file.name
    with tempfile.NamedTemporaryFile(suffix=".svg") as svg_file:
        svg_path = svg_file.name

    viz = Digraph(bpmn_graph.get_name(), filename=gv_path, engine="dot")
    viz.format = "svg"
    viz.graph_attr["rankdir"] = "LR"

    graph_nodes, graph_edges = get_sorted_nodes_edges(bpmn_graph)

    # Graphviz identifiers are random so that they cannot clash with each
    # other; the inverse map brings the parsed positions back to the nodes.
    node_ids = {}
    nodes_by_id = {}
    for n in graph_nodes:
        node_id = uuid.uuid4().hex
        node_ids[n] = node_id
        nodes_by_id[node_id] = n
        viz.node(node_id, label=" ", shape="box")
    for edge in graph_edges:
        viz.edge(node_ids[edge[0]], node_ids[edge[1]])

    try:
        gsave.save(viz, svg_path)
        nodes_p, _ = svg_pos_parser.apply(svg_path)
    finally:
        # Best-effort cleanup of the temporary artifacts. "<gv_path>.svg" is
        # the name Graphviz's render() gives to its output; presumably the
        # save utility renders there before copying - harmless if absent.
        for leftover in (gv_path, gv_path + ".svg", svg_path):
            with contextlib.suppress(OSError):
                os.remove(leftover)

    nodes_pos = {
        nodes_by_id[node_id]: info["polygon"]
        for node_id, info in nodes_p.items()
    }

    # add node positions to BPMN nodes
    for n in graph_nodes:
        corner = nodes_pos[n][0]
        node_layout = layout.get(n)
        node_layout.set_x(float(corner[0]))
        node_layout.set_y(float(corner[1]))
        if isinstance(n, BPMN.Task):
            # the width grows with the label length, capped at twice the height
            label = n.get_name() or ""
            this_width = min(
                round(2 * task_wh),
                round(2 * (len(label) + 7) * task_wh / 22.0),
            )
            node_layout.set_width(this_width)
            node_layout.set_height(task_wh)
        elif isinstance(n, (BPMN.StartEvent, BPMN.EndEvent)):
            node_layout.set_width(endpoints_wh)
            node_layout.set_height(endpoints_wh)
        else:
            node_layout.set_width(task_wh)
            node_layout.set_height(task_wh)

    # stretch the positions to the target screen size (max(1, ...) avoids a
    # division by zero when every coordinate is 0)
    max_x = max(1, max(abs(layout.get(node).get_x()) for node in nodes))
    max_y = max(1, max(abs(layout.get(node).get_y()) for node in nodes))
    stretch_fact_x = scaling_factor * 1.25 * screen_size_x / max_x
    stretch_fact_y = scaling_factor * screen_size_y / max_y
    for node in nodes:
        node_layout = layout.get(node)
        node_layout.set_x(round(node_layout.get_x() * stretch_fact_x))
        node_layout.set_y(round(node_layout.get_y() * stretch_fact_y))

    # translate the layout so that the minimum coordinates become (0, 0)
    min_x = min(layout.get(node).get_x() for node in nodes)
    min_y = min(layout.get(node).get_y() for node in nodes)
    for node in nodes:
        node_layout = layout.get(node)
        node_layout.set_x(node_layout.get_x() - min_x)
        node_layout.set_y(node_layout.get_y() - min_y)

    # raw per-side weights of every flow, indexed by node position
    outgoing_raw = {}
    ingoing_raw = {}
    sources_dict = {}
    targets_dict = {}
    for flow in flows:
        source = flow.get_source()
        target = flow.get_target()
        x_src = layout.get(source).get_x()
        x_trg = layout.get(target).get_x()
        y_src = layout.get(source).get_y()
        y_trg = layout.get(target).get_y()
        src_pos = (x_src, y_src)
        trg_pos = (x_trg, y_trg)
        sources_dict[src_pos] = source
        targets_dict[trg_pos] = target

        diff_x = abs(x_trg - x_src)
        diff_y = abs(y_src - y_trg)
        span = diff_x + diff_y
        # A flow whose endpoints share the same position (e.g. a self-loop)
        # has no preferred direction: keep every weight at zero instead of
        # dividing by zero.
        share_x = diff_x / span if span > 0 else 0.0
        share_y = diff_y / span if span > 0 else 0.0

        out_weights = _new_direction_weights()
        in_weights = _new_direction_weights()
        if x_trg > x_src:
            out_weights[EndpointDirection.RIGHT] = share_x
            in_weights[EndpointDirection.LEFT] = share_x
        else:
            out_weights[EndpointDirection.LEFT] = share_x
            in_weights[EndpointDirection.RIGHT] = share_x
        if y_src > y_trg:
            out_weights[EndpointDirection.TOP] = share_y
            in_weights[EndpointDirection.BOTTOM] = share_y
        else:
            out_weights[EndpointDirection.BOTTOM] = share_y
            in_weights[EndpointDirection.TOP] = share_y

        outgoing_raw.setdefault(src_pos, {})[trg_pos] = out_weights
        ingoing_raw.setdefault(trg_pos, {})[src_pos] = in_weights

    # normalization + keep best direction
    outgoing_sides = _pick_best_sides(
        _normalize_weights(outgoing_raw, ingoing_raw)
    )
    ingoing_sides = _pick_best_sides(
        _normalize_weights(ingoing_raw, outgoing_raw)
    )

    # decide exiting/entering point for edges
    side_totals = _count_sides(outgoing_sides, ingoing_sides)
    placed = {}
    outgoing_coords = _place_endpoints(
        outgoing_sides, sources_dict, layout, placed, side_totals
    )
    ingoing_coords = _place_endpoints(
        ingoing_sides, targets_dict, layout, placed, side_totals
    )

    # order the left-entering ingoing edges better
    _reorder_left_endpoints(ingoing_coords)

    # set waypoints for edges
    horizontal = (EndpointDirection.LEFT, EndpointDirection.RIGHT)
    vertical = (EndpointDirection.TOP, EndpointDirection.BOTTOM)
    for flow in flows:
        source = flow.get_source()
        target = flow.get_target()
        flow.del_waypoints()
        p1 = (layout.get(source).get_x(), layout.get(source).get_y())
        p2 = (layout.get(target).get_x(), layout.get(target).get_y())
        source_x, source_y = outgoing_coords[p1][p2][0], outgoing_coords[p1][p2][1]
        target_x, target_y = ingoing_coords[p2][p1][0], ingoing_coords[p2][p1][1]
        dir_source = outgoing_sides[p1][p2]
        dir_target = ingoing_sides[p2][p1]
        middle_x = (source_x + target_x) / 2.0
        middle_y = (source_y + target_y) / 2.0

        flow.add_waypoint((source_x, source_y))
        if dir_source in horizontal:
            if dir_target in horizontal:
                # two bends, meeting halfway along x
                flow.add_waypoint((middle_x, source_y))
                flow.add_waypoint((middle_x, target_y))
            elif dir_target in vertical:
                # single bend above/below the target endpoint
                flow.add_waypoint((target_x, source_y))
        elif dir_source in vertical:
            if dir_target in vertical:
                # two bends, meeting halfway along y
                flow.add_waypoint((source_x, middle_y))
                flow.add_waypoint((target_x, middle_y))
            elif dir_target in horizontal:
                # single bend beside the target endpoint
                flow.add_waypoint((source_x, target_y))
        flow.add_waypoint((target_x, target_y))

    return bpmn_graph