Source code for pm4py.visualization.dfg.util.dfg_gviz

import tempfile
from copy import copy
import sys

from graphviz import Digraph
from pm4py.util import constants
from typing import Dict, List, Tuple
from collections import defaultdict, deque
from pm4py.util.vis_utils import (
    human_readable_stat,
    get_arc_penwidth,
    get_trans_freq_color,
    value_to_color,
)


def get_activities_color(activities_count):
    """
    Associate each attribute with a color that reflects its frequency

    Parameters
    -----------
    activities_count
        Count of attributes in the log

    Returns
    -----------
    activities_color
        Color assigned to attributes in the graph
    """
    # the color scale is relative to the lowest/highest count observed
    lowest, highest = get_min_max_value(activities_count)
    return {
        name: get_trans_freq_color(activities_count[name], lowest, highest)
        for name in activities_count
    }
def get_activities_color_serv_time(serv_time):
    """
    Associate each activity with a color that reflects its service time

    Parameters
    ----------------
    serv_time
        Service time

    Returns
    ----------------
    act_color
        Dictionary associating each activity to a color based on the
        service time
    """
    shortest, longest = get_min_max_value(serv_time)
    # the small constant avoids a division by zero when all the service
    # times are equal
    spread = longest - shortest + 0.00001

    palette = {}
    for activity in serv_time:
        duration = serv_time[activity]
        # green/blue channels shrink by up to 100 as the service time grows,
        # while the red channel stays at FF
        channel = int(255 - 100 * (duration - shortest) / spread)
        channel_hex = hex(channel)[2:].upper()
        palette[activity] = f"#FF{channel_hex}{channel_hex}"
    return palette
def get_min_max_value(dfg):
    """
    Gets min and max value assigned to edges in DFG graph

    Parameters
    -----------
    dfg
        Directly follows graph (mapping from an edge to its value)

    Returns
    -----------
    min_value
        Minimum value in directly follows graph
        (9999999999 if no value could be determined, e.g., empty graph)
    max_value
        Maximum value in directly follows graph
        (-1 if no value could be determined, e.g., empty graph)
    """
    # unbounded sentinels: any real value replaces them, so values greater
    # than 9999999999 or lower than -1 are no longer clamped
    unset_min = float("inf")
    unset_max = float("-inf")

    min_value = unset_min
    max_value = unset_max
    for edge in dfg:
        value = dfg[edge]
        if value < min_value:
            min_value = value
        if value > max_value:
            max_value = value

    # keep the historical defaults when nothing has been observed
    if min_value == unset_min:
        min_value = 9999999999
    if max_value == unset_max:
        max_value = -1

    return min_value, max_value
def assign_penwidth_edges(dfg):
    """
    Compute the penwidth of every edge of the directly-follows graph

    Parameters
    -----------
    dfg
        Directly follows graph

    Returns
    -----------
    penwidth
        Graph penwidth that edges should have in the directly follows graph
        (dictionary associating each edge to the penwidth, as string)
    """
    # the penwidth is scaled between the lowest and the highest edge value
    lowest, highest = get_min_max_value(dfg)
    return {
        edge: str(get_arc_penwidth(dfg[edge], lowest, highest))
        for edge in dfg
    }
def sort_dfg_reachability(
    dfg: List[Tuple[str, str]],
    start_activities_to_include: List[str],
    end_activities_to_include: List[str],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """
    Sort the activities and the edges of the directly-follows graph based on
    reachability principles (start activities are put at the beginning,
    end activities at the end)

    Parameters
    ----------------
    dfg
        List of edges of the directly-follows graph (without
        frequency/performance annotation)
    start_activities_to_include
        Start activities
    end_activities_to_include
        End activities

    Returns
    ----------------
    sorted_activities
        Activities of the DFG sorted by their distance from the start
        activities (ties are broken on the activity itself)
    sorted_edges
        Edges sorted by reachability: edges leaving a start activity first,
        edges entering an end activity last, each group ordered by the
        distance of source and target
    """
    # collect the activities and the adjacency lists in a single pass
    activities_dfg = set()
    adjacency_list = defaultdict(list)
    for source, target in dfg:
        adjacency_list[source].append(target)
        activities_dfg.add(source)
        activities_dfg.add(target)

    # start activities have distance 0, everything else is unreachable
    # until the BFS proves otherwise
    distance = {activity: float("inf") for activity in activities_dfg}
    for activity in start_activities_to_include:
        distance[activity] = 0

    # perform BFS to calculate the distance of each activity from the start
    # activities
    queue = deque(start_activities_to_include)
    while queue:
        current = queue.popleft()
        next_distance = distance[current] + 1
        for neighbor in adjacency_list[current]:
            if distance[neighbor] > next_distance:
                distance[neighbor] = next_distance
                queue.append(neighbor)

    # sets give constant-time membership inside the sort key
    start_set = set(start_activities_to_include)
    end_set = set(end_activities_to_include)

    def edge_priority(edge):
        u, v = edge
        if u in start_set:
            group = 0
        elif v in end_set:
            group = 2
        else:
            group = 1
        return (group, distance[u], distance[v], u, v)

    sorted_edges = sorted(dfg, key=edge_priority)
    sorted_activities = sorted(activities_dfg, key=lambda x: (distance[x], x))

    return sorted_activities, sorted_edges
def graphviz_visualization(
    activities_count,
    dfg,
    image_format="png",
    measure="frequency",
    max_no_of_edges_in_diagram=100000,
    start_activities=None,
    end_activities=None,
    serv_time=None,
    font_size="12",
    bgcolor=constants.DEFAULT_BGCOLOR,
    rankdir=constants.DEFAULT_RANKDIR_GVIZ,
    enable_graph_title: bool = constants.DEFAULT_ENABLE_GRAPH_TITLES,
    graph_title: str = "Directly-Follows Graph",
):
    """
    Do GraphViz visualization of a DFG graph

    The DFG provided by the caller is not modified: when the number of edges
    exceeds max_no_of_edges_in_diagram, a filtered copy is used internally.

    Parameters
    -----------
    activities_count
        Count of attributes in the log (may include attributes that are not
        in the DFG graph)
    dfg
        DFG graph
    image_format
        GraphViz should be represented in this format
    measure
        Describes which measure is assigned to edges in directly follows
        graph (frequency/performance)
    max_no_of_edges_in_diagram
        Maximum number of edges in the diagram allowed for visualization
    start_activities
        Start activities of the log
    end_activities
        End activities of the log
    serv_time
        For each activity, the service time in the log
    font_size
        Size of the text on the activities/edges
    bgcolor
        Background color of the visualization (i.e., 'transparent', 'white',
        ...)
    rankdir
        Direction of the graph ("LR" for left-to-right; "TB" for
        top-to-bottom)
    enable_graph_title
        Enables the visualization of a graph's title
    graph_title
        Graph title to display (if enable_graph_title)

    Returns
    -----------
    viz
        Digraph object
    """
    if start_activities is None:
        start_activities = []
    if end_activities is None:
        end_activities = []
    if serv_time is None:
        # the service time is optional: without it the activities are simply
        # drawn without a performance annotation
        serv_time = {}

    # the temporary file is only created to obtain a unique file name
    filename = tempfile.NamedTemporaryFile(suffix=".gv")
    filename.close()

    viz = Digraph(
        "",
        filename=filename.name,
        engine="dot",
        graph_attr={"bgcolor": bgcolor, "rankdir": rankdir},
    )

    if enable_graph_title:
        viz.attr(
            label='<<FONT POINT-SIZE="'
            + str(2 * int(font_size))
            + '">'
            + graph_title
            + "</FONT>>",
            labelloc="top",
        )

    # first, keep only the edges with the highest value, up to the maximum
    # number of edges in the diagram. The activity names are part of the
    # sorting key, so edges with the same value are removed
    # deterministically
    ranked_edges = sorted(
        dfg,
        key=lambda edge: (dfg[edge], edge[0], edge[1]),
        reverse=True,
    )
    allowed_edges = set(ranked_edges[:max_no_of_edges_in_diagram])
    # local filtered copy (original order preserved); the DFG of the caller
    # is left untouched
    dfg = {edge: dfg[edge] for edge in dfg if edge in allowed_edges}

    activities_count_int = copy(activities_count)

    # assign attributes color
    if measure == "frequency":
        activities_color = get_activities_color(activities_count_int)
    else:
        activities_color = get_activities_color_serv_time(serv_time)

    # represent nodes
    viz.attr("node", shape="box")

    # sorted, so the nodes are always added in the same order to the graph
    activities_to_include = sorted(set(activities_count_int))
    known_activities = set(activities_to_include)

    start_activities_to_include = [
        act for act in start_activities if act in known_activities
    ]
    end_activities_to_include = [
        act for act in end_activities if act in known_activities
    ]

    # calculate edges penwidth (the arcs from/to the artificial start/end
    # nodes share the same scale as the edges of the DFG)
    # NOTE(review): start_activities/end_activities are indexed by activity
    # here, so when non-empty they must be mappings activity -> value, even
    # though the label code below also checks for non-dict containers;
    # confirm the intended input type
    ext_dfg = copy(dfg)
    for sact in start_activities_to_include:
        ext_dfg[(constants.DEFAULT_ARTIFICIAL_START_ACTIVITY, sact)] = (
            start_activities[sact]
        )
    for eact in end_activities_to_include:
        ext_dfg[(eact, constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)] = (
            end_activities[eact]
        )

    # the defaults are only reached for a DFG without edges, in which case
    # the values are never used (no edge is colored)
    dfg_values = dfg.values()
    min_dfg_value = min(dfg_values, default=0)
    max_dfg_value = max(dfg_values, default=0)
    penwidth = assign_penwidth_edges(ext_dfg)

    dfg_edges = sorted(dfg)
    if start_activities_to_include and end_activities_to_include:
        reachable_order, dfg_edges = sort_dfg_reachability(
            dfg_edges, start_activities_to_include, end_activities_to_include
        )
        # the reachability order only covers activities having at least an
        # edge: the remaining ones are appended, so they are still drawn and
        # can be connected to the start/end nodes
        placed = set(reachable_order)
        activities_to_include = reachable_order + [
            act for act in activities_to_include if act not in placed
        ]

    activities_map = {}

    for act in activities_to_include:
        node_id = str(hash(act))
        if "frequency" in measure and act in activities_count_int:
            viz.node(
                node_id,
                act + " (" + str(activities_count_int[act]) + ")",
                style="filled",
                fillcolor=activities_color[act],
                fontsize=font_size,
            )
        elif (
            "performance" in measure
            and act in serv_time
            and serv_time[act] >= 0
        ):
            viz.node(
                node_id,
                act + " (" + human_readable_stat(serv_time[act]) + ")",
                fontsize=font_size,
                style="filled",
                fillcolor=activities_color[act],
            )
        else:
            viz.node(node_id, act, fontsize=font_size)
        activities_map[act] = node_id

    # represent edges
    for edge in dfg_edges:
        if "frequency" in measure or "cost" in measure:
            label = str(dfg[edge])
        else:
            label = human_readable_stat(dfg[edge])

        color = None
        if "performance" in measure:
            color = value_to_color(dfg[edge], min_dfg_value, max_dfg_value)

        viz.edge(
            str(hash(edge[0])),
            str(hash(edge[1])),
            label=label,
            penwidth=str(penwidth[edge]),
            fontsize=font_size,
            color=color,
        )

    if start_activities_to_include:
        viz.node("@@startnode", "<&#9679;>", shape="circle", fontsize="34")
        for act in start_activities_to_include:
            label = (
                str(start_activities[act])
                if isinstance(start_activities, dict)
                and measure == "frequency"
                else ""
            )
            viz.edge(
                "@@startnode",
                activities_map[act],
                label=label,
                fontsize=font_size,
                penwidth=str(
                    penwidth[
                        (constants.DEFAULT_ARTIFICIAL_START_ACTIVITY, act)
                    ]
                ),
            )

    if end_activities_to_include:
        # <&#9632;>
        viz.node(
            "@@endnode", "<&#9632;>", shape="doublecircle", fontsize="32"
        )
        for act in end_activities_to_include:
            label = (
                str(end_activities[act])
                if isinstance(end_activities, dict)
                and measure == "frequency"
                else ""
            )
            viz.edge(
                activities_map[act],
                "@@endnode",
                label=label,
                fontsize=font_size,
                penwidth=str(
                    penwidth[(act, constants.DEFAULT_ARTIFICIAL_END_ACTIVITY)]
                ),
            )

    viz.attr(overlap="false")
    viz.attr(fontsize="11")

    viz.format = image_format.replace("html", "plain-ext")

    return viz