'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import os
import tempfile
import uuid
from enum import Enum
from math import log10
from typing import Any, Dict, Optional, Union
import pandas as pd
from pm4py.util import exec_utils, constants
[docs]
class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of :func:`apply`.

    Each member's value is the plain-string form of the key.
    """

    # Output image format handed to Graphviz (e.g. "png").
    FORMAT = "format"

    # Geometry of the activity boxes and thickness of the connecting edges.
    NODE_HEIGHT = "node_height"
    NODE_WIDTH = "node_width"
    EDGE_PENWIDTH = "edge_penwidth"

    # Number of most frequent variants to draw.
    MAX_VARIANTS = "max_variants"

    # "start", "end", or the name of an activity used to align the variants.
    ALIGNMENT_CRITERIA = "alignment_criteria"

    # Bounds of the horizontal gap between two consecutive activities, and the
    # factor that scales layout units to Graphviz coordinates.
    MIN_HORIZONTAL_DISTANCE = "min_horizontal_distance"
    MAX_HORIZONTAL_DISTANCE = "max_horizontal_distance"
    LAYOUT_EXT_MULTIPLIER = "layout_ext_multiplier"

    # NOTE(review): declared but not read by the visible apply() - confirm
    # whether a legend is still planned.
    SHOW_LEGEND = "show_legend"

    # Whether a title is drawn, and its text.
    ENABLE_GRAPH_TITLE = "enable_graph_title"
    GRAPH_TITLE = "graph_title"
[docs]
def _variant_color(variant) -> str:
    """Return a ``#rrggbb`` colour that is stable across interpreter runs."""
    # The built-in hash() of a str is salted per process, so it would yield a
    # different colour for the same variant on every run; CRC32 does not.
    import zlib

    digest = zlib.crc32(str(variant).encode("utf-8")) & 0xFFFFFF
    return f"#{digest:06x}"


def _escape_dot_label(text) -> str:
    """Escape backslashes and double quotes for use in a quoted DOT string."""
    return str(text).replace("\\", "\\\\").replace('"', '\\"')


def _format_duration(seconds) -> str:
    """Render a duration given in seconds with the coarsest fitting unit."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    if seconds < 86400:
        return f"{seconds / 3600:.1f}h"
    return f"{seconds / 86400:.1f}d"


def _flow_time_to_distance(flow_time, max_flow_time, min_distance, max_distance):
    """Map a flow time onto [min_distance, max_distance] on a log scale."""
    if flow_time == 0:
        return min_distance
    log_max = log10(1 + max_flow_time)
    if log_max == 0:
        # Nothing to normalise against; avoid dividing by zero.
        return min_distance
    norm_time = min(1.0, log10(1 + flow_time) / log_max)
    return min_distance + norm_time * (max_distance - min_distance)


def apply(
    variants_df: pd.DataFrame,
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> str:
    """Render the most frequent variants as horizontal paths with durations.

    Every selected variant is drawn on its own row. Consecutive activities are
    joined by an edge labelled with the flow time, and the horizontal gap
    between them grows logarithmically with that flow time.

    Parameters
    ----------
    variants_df
        Dataframe with one row per step of a variant. The columns read are
        ``@@variant_column``, ``@@variant_count``, ``@@index_in_trace``,
        ``@@flow_time`` (formatted as seconds), ``concept:name`` (source
        activity) and ``concept:name_2`` (target activity).
    parameters
        Optional settings keyed by :class:`Parameters` members:
        ``FORMAT`` (default ``"png"``), ``NODE_HEIGHT`` / ``NODE_WIDTH``
        (default ``0.85``), ``EDGE_PENWIDTH`` (default ``1.0``),
        ``MAX_VARIANTS`` (default ``5``), ``ALIGNMENT_CRITERIA`` (``"start"``,
        ``"end"`` or an activity name; default ``"start"``),
        ``MIN_HORIZONTAL_DISTANCE`` (default ``1.5``),
        ``MAX_HORIZONTAL_DISTANCE`` (default ``4.5``),
        ``LAYOUT_EXT_MULTIPLIER`` (default ``75``), ``ENABLE_GRAPH_TITLE`` and
        ``GRAPH_TITLE``.

    Returns
    -------
    str
        Path of the temporary image file produced by Graphviz ``neato``.

    Raises
    ------
    ValueError
        If ``ALIGNMENT_CRITERIA`` names an activity that is missing from at
        least one of the selected variants.
    OSError, subprocess.CalledProcessError
        If the ``neato`` executable cannot be started or exits with an error.
    """
    import subprocess

    if parameters is None:
        parameters = {}

    # Extract parameters
    image_format = exec_utils.get_param_value(Parameters.FORMAT, parameters, "png")
    node_height = exec_utils.get_param_value(Parameters.NODE_HEIGHT, parameters, 0.85)
    node_width = exec_utils.get_param_value(Parameters.NODE_WIDTH, parameters, 0.85)
    edge_penwidth = exec_utils.get_param_value(Parameters.EDGE_PENWIDTH, parameters, 1.0)
    max_variants = exec_utils.get_param_value(Parameters.MAX_VARIANTS, parameters, 5)
    alignment_criteria = exec_utils.get_param_value(Parameters.ALIGNMENT_CRITERIA, parameters, "start")
    min_horizontal_distance = exec_utils.get_param_value(Parameters.MIN_HORIZONTAL_DISTANCE, parameters, 1.5)
    max_horizontal_distance = exec_utils.get_param_value(Parameters.MAX_HORIZONTAL_DISTANCE, parameters, 4.5)
    layout_ext_multiplier = exec_utils.get_param_value(Parameters.LAYOUT_EXT_MULTIPLIER, parameters, 75)
    enable_graph_title = exec_utils.get_param_value(Parameters.ENABLE_GRAPH_TITLE, parameters,
                                                    constants.DEFAULT_ENABLE_GRAPH_TITLES)
    graph_title = exec_utils.get_param_value(Parameters.GRAPH_TITLE, parameters, "Process Variants Paths and Durations")

    # Required column names
    variant_column = "@@variant_column"
    variant_count = "@@variant_count"
    index_column = "@@index_in_trace"
    flow_time_column = "@@flow_time"
    activity_key = "concept:name"
    activity_key_2 = "concept:name_2"

    # Sort variants from most frequent to least frequent and pick top N
    unique_variants = variants_df[[variant_column, variant_count]].drop_duplicates()
    unique_variants = unique_variants.sort_values(variant_count, ascending=False)
    top_variants = unique_variants.head(max_variants)[variant_column].tolist()
    filtered_df = variants_df[variants_df[variant_column].isin(top_variants)]

    # First (i.e. highest) count seen for each variant
    variant_counts = {}
    for variant, count in zip(unique_variants[variant_column], unique_variants[variant_count]):
        variant_counts.setdefault(variant, count)

    # Rows of each variant in trace order, computed once and reused below
    variant_frames = {
        variant: filtered_df[filtered_df[variant_column] == variant].sort_values(index_column)
        for variant in top_variants
    }

    # For distance normalization
    max_flow_time = filtered_df[flow_time_column].max()

    # Assign each variant a (run-independent) color
    variant_colors = {variant: _variant_color(variant) for variant in top_variants}

    # Build the GraphViz lines
    lines = ["graph G {"]
    if enable_graph_title:
        lines.append(
            f' label=<<FONT POINT-SIZE="20">{graph_title}</FONT>>;'
            ' labelloc="top";'
        )
    lines.append(' layout=neato;')
    lines.append(' splines=true;')

    # Assign y-coordinates to variants: the most frequent one gets the top row
    total_variants = len(top_variants)
    variant_y_pos = {}
    variant_node_positions = {}
    for i, variant in enumerate(top_variants):
        variant_y_pos[variant] = (total_variants - i) * layout_ext_multiplier
        variant_node_positions[variant] = {}

    # Calculate x positions starting from 0 for all variants
    for variant in top_variants:
        vdf = variant_frames[variant]
        positions = variant_node_positions[variant]
        steps = zip(vdf[activity_key], vdf[activity_key_2], vdf[flow_time_column], vdf[index_column])
        for src_activity, tgt_activity, flow_time, src_idx in steps:
            if src_activity not in positions:
                if src_idx != 0:
                    # Source was never placed (should not happen with sorted data)
                    continue
                positions[src_activity] = 0
            distance = _flow_time_to_distance(
                flow_time, max_flow_time, min_horizontal_distance, max_horizontal_distance
            )
            positions[tgt_activity] = positions[src_activity] + distance * layout_ext_multiplier

    # Apply shifts based on alignment criteria ("start" needs no shift)
    if alignment_criteria == "end":
        for variant in top_variants:
            positions = variant_node_positions[variant]
            if not positions:
                continue  # nothing was placed for this variant
            # Move the right-most activity to x = 0
            last_pos = max(positions.values())
            for activity in positions:
                positions[activity] -= last_pos
    elif alignment_criteria != "start":
        # alignment_criteria is the name of some activity
        variants_without_activity = [
            variant for variant in top_variants
            if alignment_criteria not in variant_node_positions[variant]
        ]
        if variants_without_activity:
            missing_variants = ", ".join(str(v) for v in variants_without_activity)
            raise ValueError(
                f"Alignment activity '{alignment_criteria}' not found in variants: {missing_variants}. "
                "All variants must contain the alignment activity."
            )
        for variant in top_variants:
            positions = variant_node_positions[variant]
            align_pos = positions[alignment_criteria]
            for activity in positions:
                positions[activity] -= align_pos

    # Create node labels for each variant
    for i, variant in enumerate(top_variants):
        count = variant_counts[variant]
        label_text = f"Variant\n{i + 1}\n({count} cases)"
        label_node_id = f"label_{uuid.uuid4().hex[:12]}"
        lines.append(
            f' {label_node_id} [label="{label_text}", shape=none, '
            f'fontsize="10pt", pos="-60,{variant_y_pos[variant]}!", fixedsize=true];'
        )

    # Create actual activity nodes and edges
    for variant in top_variants:
        y_pos = variant_y_pos[variant]
        vdf = variant_frames[variant]
        color = variant_colors[variant]
        activity_node_ids = {}

        # Nodes
        for activity, x_pos in variant_node_positions[variant].items():
            node_id = f"n{uuid.uuid4().hex[:12]}"
            activity_node_ids[activity] = node_id
            # Escape first, then turn spaces into a literal backslash-n so
            # Graphviz breaks the label across lines.
            label = _escape_dot_label(activity).replace(" ", "\\n")
            lines.append(
                f' {node_id} [label="{label}", shape=box, style="filled,rounded", '
                f'fillcolor="{color}", width={node_width}, height={node_height}, '
                f'pos="{x_pos},{y_pos}!", fontsize="8pt", fixedsize=true];'
            )

        # Edges
        for src, tgt, ftime in zip(vdf[activity_key], vdf[activity_key_2], vdf[flow_time_column]):
            src_id = activity_node_ids.get(src)
            tgt_id = activity_node_ids.get(tgt)
            if src_id is None or tgt_id is None:
                # The positioning pass skipped this step, so there is no node to connect
                continue
            label_time = _format_duration(ftime)
            lines.append(
                f' {src_id} -- {tgt_id} [label="{label_time}", fontsize="7pt", '
                f'color="{color}", penwidth={edge_penwidth}];'
            )

    lines.append("}")

    # Write the .gv file. delete=False keeps the file alive after closing so
    # that neato can read it; UTF-8 is the encoding Graphviz expects by default.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".gv", delete=False, encoding="utf-8"
    ) as gv_file:
        gv_file.write("\n".join(lines))
        gv_path = gv_file.name

    try:
        with tempfile.NamedTemporaryFile(suffix="." + image_format, delete=False) as img_file:
            img_path = img_file.name
        try:
            # neato -n2 respects the exact coordinates given in the pos attributes.
            # An argument list (no shell) keeps the format string and the paths
            # from being interpreted by a command interpreter.
            subprocess.run(
                ["neato", "-n2", f"-T{image_format}", "-o", img_path, gv_path],
                check=True,
            )
        except (OSError, subprocess.CalledProcessError):
            # Do not leave an empty image behind when rendering failed
            if os.path.exists(img_path):
                os.remove(img_path)
            raise
    finally:
        # The intermediate DOT source is not returned to the caller
        if os.path.exists(gv_path):
            os.remove(gv_path)

    return img_path