import tempfile
from copy import copy
import sys
from graphviz import Digraph
from pm4py.util import constants
from typing import Dict, List, Tuple
from collections import defaultdict, deque
from pm4py.util.vis_utils import (
human_readable_stat,
get_arc_penwidth,
get_trans_freq_color,
value_to_color,
)
[docs]
def get_activities_color(activities_count):
"""
Get frequency color for attributes
Parameters
-----------
activities_count
Count of attributes in the log
Returns
-----------
activities_color
Color assigned to attributes in the graph
"""
activities_color = {}
min_value, max_value = get_min_max_value(activities_count)
for ac in activities_count:
v0 = activities_count[ac]
v1 = get_trans_freq_color(v0, min_value, max_value)
activities_color[ac] = v1
return activities_color
[docs]
def get_activities_color_serv_time(serv_time):
"""
Gets the color for the activities based on the service time
Parameters
----------------
serv_time
Service time
Returns
----------------
act_color
Dictionary associating each activity to a color based on the service time
"""
activities_color = {}
min_soj_time, max_soj_time = get_min_max_value(serv_time)
for ac in serv_time:
act_soj_time = serv_time[ac]
trans_base_color = int(
255
- 100
* (act_soj_time - min_soj_time)
/ (max_soj_time - min_soj_time + 0.00001)
)
trans_base_color_hex = str(hex(trans_base_color))[2:].upper()
activities_color[ac] = (
"#" + "FF" + trans_base_color_hex + trans_base_color_hex
)
return activities_color
[docs]
def get_min_max_value(dfg):
"""
Gets min and max value assigned to edges
in DFG graph
Parameters
-----------
dfg
Directly follows graph
Returns
-----------
min_value
Minimum value in directly follows graph
max_value
Maximum value in directly follows graph
"""
min_value = 9999999999
max_value = -1
for edge in dfg:
if dfg[edge] < min_value:
min_value = dfg[edge]
if dfg[edge] > max_value:
max_value = dfg[edge]
return min_value, max_value
[docs]
def assign_penwidth_edges(dfg):
"""
Assign penwidth to edges in directly-follows graph
Parameters
-----------
dfg
Direcly follows graph
Returns
-----------
penwidth
Graph penwidth that edges should have in the direcly follows graph
"""
penwidth = {}
min_value, max_value = get_min_max_value(dfg)
for edge in dfg:
v0 = dfg[edge]
v1 = get_arc_penwidth(v0, min_value, max_value)
penwidth[edge] = str(v1)
return penwidth
[docs]
def sort_dfg_reachability(
dfg: List[Tuple[str, str]],
start_activities_to_include: List[str],
end_activities_to_include: List[str],
) -> Tuple[List[str], List[Tuple[str, str]]]:
"""
Sort the edges of the directly-follows graph based on reachability principles
(start activities are putting at the beginning, end activities at the end)
Parameters
----------------
dfg
List of edges of the directly-follows graph (without frequency/performance annotation)
start_activities_to_include
Start activities
end_activities
End activities
Returns
----------------
sorted_activities
Activities sorted by reachability
sorted_edges
Edges sorted by reachability
"""
# identify all unique activities
activities_dfg = set(x[0] for x in dfg).union(set(x[1] for x in dfg))
# create adjacency lists and in-degree count
adjacency_list = defaultdict(list)
in_degree = defaultdict(int)
for u, v in dfg:
adjacency_list[u].append(v)
in_degree[v] += 1
if u not in in_degree:
in_degree[u] = 0
# initialize the queue with start activities
queue = deque(start_activities_to_include)
distance = {activity: 0 for activity in start_activities_to_include}
# ensure all activities are present in the distance dictionary
for activity in activities_dfg:
if activity not in distance:
distance[activity] = float("inf")
# perform BFS to calculate the distance of each activity from the start
# activities
while queue:
current = queue.popleft()
current_distance = distance[current]
for neighbor in adjacency_list[current]:
if distance[neighbor] > current_distance + 1:
distance[neighbor] = current_distance + 1
queue.append(neighbor)
# sort edges based on the distance of their source activities
def edge_priority(edge):
u, v = edge
if u in start_activities_to_include:
return (0, distance[u], distance[v], u, v)
if v in end_activities_to_include:
return (2, distance[u], distance[v], u, v)
return (1, distance[u], distance[v], u, v)
sorted_edges = sorted(dfg, key=edge_priority)
# Step 6: Sort activities based on their distance
sorted_activities = sorted(activities_dfg, key=lambda x: (distance[x], x))
return sorted_activities, sorted_edges
[docs]
def graphviz_visualization(
activities_count,
dfg,
image_format="png",
measure="frequency",
max_no_of_edges_in_diagram=100000,
start_activities=None,
end_activities=None,
serv_time=None,
font_size="12",
bgcolor=constants.DEFAULT_BGCOLOR,
rankdir=constants.DEFAULT_RANKDIR_GVIZ,
enable_graph_title: bool = constants.DEFAULT_ENABLE_GRAPH_TITLES,
graph_title: str = "Directly-Follows Graph",
):
"""
Do GraphViz visualization of a DFG graph
Parameters
-----------
activities_count
Count of attributes in the log (may include attributes that are not in the DFG graph)
dfg
DFG graph
image_format
GraphViz should be represented in this format
measure
Describes which measure is assigned to edges in direcly follows graph (frequency/performance)
max_no_of_edges_in_diagram
Maximum number of edges in the diagram allowed for visualization
start_activities
Start activities of the log
end_activities
End activities of the log
serv_time
For each activity, the service time in the log
font_size
Size of the text on the activities/edges
bgcolor
Background color of the visualization (i.e., 'transparent', 'white', ...)
rankdir
Direction of the graph ("LR" for left-to-right; "TB" for top-to-bottom)
enable_graph_title
Enables the visualization of a graph's title
graph_title
Graph title to display (if enable_graph_title)
Returns
-----------
viz
Digraph object
"""
if start_activities is None:
start_activities = []
if end_activities is None:
end_activities = []
filename = tempfile.NamedTemporaryFile(suffix=".gv")
filename.close()
viz = Digraph(
"",
filename=filename.name,
engine="dot",
graph_attr={"bgcolor": bgcolor, "rankdir": rankdir},
)
if enable_graph_title:
viz.attr(
label='<<FONT POINT-SIZE="'
+ str(2 * int(font_size))
+ '">'
+ graph_title
+ "</FONT>>",
labelloc="top",
)
# first, remove edges in diagram that exceeds the maximum number of edges
# in the diagram
dfg_key_value_list = []
for edge in dfg:
dfg_key_value_list.append([edge, dfg[edge]])
# more fine grained sorting to avoid that edges that are below the threshold are
# undeterministically removed
dfg_key_value_list = sorted(
dfg_key_value_list,
key=lambda x: (x[1], x[0][0], x[0][1]),
reverse=True,
)
dfg_key_value_list = dfg_key_value_list[
0: min(len(dfg_key_value_list), max_no_of_edges_in_diagram)
]
dfg_allowed_keys = [x[0] for x in dfg_key_value_list]
dfg_keys = list(dfg.keys())
for edge in dfg_keys:
if edge not in dfg_allowed_keys:
del dfg[edge]
activities_count_int = copy(activities_count)
activities_in_dfg = set(activities_count)
# assign attributes color
if measure == "frequency":
activities_color = get_activities_color(activities_count_int)
else:
activities_color = get_activities_color_serv_time(serv_time)
# represent nodes
viz.attr("node", shape="box")
if len(activities_in_dfg) == 0:
activities_to_include = sorted(list(set(activities_count_int)))
else:
# take unique elements as a list not as a set (in this way, nodes are
# added in the same order to the graph)
activities_to_include = sorted(list(set(activities_in_dfg)))
start_activities_to_include = [
act for act in start_activities if act in activities_to_include
]
end_activities_to_include = [
act for act in end_activities if act in activities_to_include
]
# calculate edges penwidth
ext_dfg = copy(dfg)
if start_activities_to_include is not None and start_activities_to_include:
for sact in start_activities_to_include:
ext_dfg[(constants.DEFAULT_ARTIFICIAL_START_ACTIVITY, sact)] = (
start_activities[sact]
)
if end_activities_to_include is not None and end_activities_to_include:
for eact in end_activities_to_include:
ext_dfg[(eact, constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)] = (
end_activities[eact]
)
dfg_values = dfg.values()
min_dfg_value = min(dfg_values)
max_dfg_value = max(dfg_values)
penwidth = assign_penwidth_edges(ext_dfg)
dfg_edges = sorted(list(dfg.keys()))
if start_activities_to_include and end_activities_to_include:
activities_to_include, dfg_edges = sort_dfg_reachability(
dfg_edges, start_activities_to_include, end_activities_to_include
)
activities_map = {}
for act in activities_to_include:
if "frequency" in measure and act in activities_count_int:
viz.node(
str(hash(act)),
act + " (" + str(activities_count_int[act]) + ")",
style="filled",
fillcolor=activities_color[act],
fontsize=font_size,
)
activities_map[act] = str(hash(act))
elif (
"performance" in measure
and act in serv_time
and serv_time[act] >= 0
):
viz.node(
str(hash(act)),
act + " (" + human_readable_stat(serv_time[act]) + ")",
fontsize=font_size,
style="filled",
fillcolor=activities_color[act],
)
activities_map[act] = str(hash(act))
else:
viz.node(str(hash(act)), act, fontsize=font_size)
activities_map[act] = str(hash(act))
# represent edges
for edge in dfg_edges:
if "frequency" in measure or "cost" in measure:
label = str(dfg[edge])
else:
label = human_readable_stat(dfg[edge])
color = None
if "performance" in measure:
color = value_to_color(dfg[edge], min_dfg_value, max_dfg_value)
viz.edge(
str(hash(edge[0])),
str(hash(edge[1])),
label=label,
penwidth=str(penwidth[edge]),
fontsize=font_size,
color=color,
)
if start_activities_to_include:
viz.node("@@startnode", "<●>", shape="circle", fontsize="34")
for act in start_activities_to_include:
label = (
str(start_activities[act])
if isinstance(start_activities, dict)
and measure == "frequency"
else ""
)
viz.edge(
"@@startnode",
activities_map[act],
label=label,
fontsize=font_size,
penwidth=str(
penwidth[
(constants.DEFAULT_ARTIFICIAL_START_ACTIVITY, act)
]
),
)
if end_activities_to_include:
# <■>
viz.node("@@endnode", "<■>", shape="doublecircle", fontsize="32")
for act in end_activities_to_include:
label = (
str(end_activities[act])
if isinstance(end_activities, dict) and measure == "frequency"
else ""
)
viz.edge(
activities_map[act],
"@@endnode",
label=label,
fontsize=font_size,
penwidth=str(
penwidth[(act, constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)]
),
)
viz.attr(overlap="false")
viz.attr(fontsize="11")
viz.format = image_format.replace("html", "plain-ext")
return viz