'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from typing import Optional, Dict, Any
from pm4py.util import exec_utils, constants
from enum import Enum
import shutil
import json
import tempfile
import importlib.resources
from pm4py.util import vis_utils
[docs]
class Parameters(Enum):
ENCODING = "encoding"
IFRAME_WIDTH = "iframe_width"
IFRAME_HEIGHT = "iframe_height"
LOCAL_JUPYTER_FILE_NAME = "local_jupyter_file_name"
PERFORMANCE_AGGREGATION_MEASURE = "aggregationMeasure"
ANNOTATION = "annotation"
ACT_METRIC = "act_metric"
EDGE_METRIC = "edge_metric"
ACT_THRESHOLD = "act_threshold"
EDGE_THRESHOLD = "edge_threshold"
[docs]
def wrap_text(text: str, max_length: int = 15) -> str:
words = text.split()
current_line = []
current_length = 0
result_lines = []
for word in words:
# If adding this word to the current line would exceed max_length
if (
current_length + len(word) + (1 if current_line else 0)
> max_length
):
# Join the current line into a string and append to results
result_lines.append(" ".join(current_line))
# Start a new line with the current word
current_line = [word]
current_length = len(word)
else:
# Add this word to the current line
if current_line:
current_length += len(word) + 1 # plus one for the space
else:
current_length += len(word)
current_line.append(word)
# Append the last line if there is any leftover
if current_line:
result_lines.append(" ".join(current_line))
return "\n".join(result_lines)
[docs]
def get_html_file_contents():
with importlib.resources.path(
"pm4py.visualization.ocel.ocdfg.util", "elkjs_ocdfg.html"
) as p:
with open(str(p), "r") as html_file:
return html_file.read()
[docs]
def apply(
ocdfg: Dict[str, Any], parameters: Optional[Dict[Any, Any]] = None
) -> str:
"""
Visualizes an OC-DFG using ELK.JS
Parameters
---------------
ocdfg
OC-DFG
parameters
Parameters of the algorithm:
- Parameters.ACT_METRIC => the metric to use for the activities. Available values:
- "events" => number of events (default)
- "unique_objects" => number of unique objects
- "total_objects" => number of total objects
- Parameters.EDGE_METRIC => the metric to use for the edges. Available values:
- "event_couples" => number of event couples (default)
- "unique_objects" => number of unique objects
- "total_objects" => number of total objects
- Parameters.ANNOTATION => the annotation to use for the visualization. Values:
- "frequency": frequency annotation
- "performance": performance annotation
- Parameters.PERFORMANCE_AGGREGATION_MEASURE => the aggregation measure to use for the performance:
- mean
- median
- min
- max
- sum
Returns
---------------
viz
Visualization file
"""
if parameters is None:
parameters = {}
from statistics import mean, median
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
)
aggregation_measure = exec_utils.get_param_value(
Parameters.PERFORMANCE_AGGREGATION_MEASURE, parameters, "mean"
)
act_key = exec_utils.get_param_value(
Parameters.ACT_METRIC, parameters, "events"
)
edge_key = exec_utils.get_param_value(
Parameters.EDGE_METRIC, parameters, "event_couples"
)
annotation = exec_utils.get_param_value(
Parameters.ANNOTATION, parameters, "frequency"
)
act_threshold = exec_utils.get_param_value(
Parameters.ACT_THRESHOLD, parameters, 0
)
edge_threshold = exec_utils.get_param_value(
Parameters.EDGE_THRESHOLD, parameters, 0
)
pref_act = (
" E="
if act_key == "events"
else " UO=" if act_key == "unique_objects" else " TO="
)
pref_edge = (
" EC="
if edge_key == "event_couples"
else " UO=" if edge_key == "unique_objects" else " TO="
)
data = {"objectTypes": [], "overallActivityStats": {}}
added_activities = set()
for act, ent in ocdfg["activities_indep"][act_key].items():
if len(ent) >= act_threshold:
data["overallActivityStats"][wrap_text(act)] = {
"totalFrequency": pref_act + str(len(ent))
}
added_activities.add(act)
counter = 0
for ot, content in ocdfg["activities_ot"][act_key].items():
counter += 1
list_item = {}
list_item["objType"] = str(counter)
list_item["headerLabel"] = wrap_text(ot)
list_item["activities"] = []
for act, ent in content.items():
if act in added_activities:
list_item["activities"].append(
{"name": wrap_text(act), "frequency": pref_act + str(len(ent))}
)
content2 = ocdfg["edges"][edge_key][ot]
content3 = ocdfg["edges_performance"][edge_key][ot]
content4 = ocdfg["start_activities"][act_key][ot]
content5 = ocdfg["end_activities"][act_key][ot]
list_edges = []
for tup, ent in content2.items():
perf = content3[tup]
if aggregation_measure == "median":
perf = median(perf)
elif aggregation_measure == "min":
perf = min(perf)
elif aggregation_measure == "max":
perf = max(perf)
elif aggregation_measure == "sum":
perf = sum(perf)
else:
perf = mean(perf)
if tup[0] in added_activities and tup[1] in added_activities:
if len(ent) >= edge_threshold:
list_edges.append(
{
"source": wrap_text(tup[0]),
"target": wrap_text(tup[1]),
"frequency": pref_edge + str(len(ent)),
"performance": vis_utils.human_readable_stat(perf),
}
)
for act, ent in content4.items():
if act in added_activities:
if len(ent) >= edge_threshold:
list_edges.append(
{
"source": "Start",
"target": wrap_text(act),
"frequency": pref_edge + str(len(ent)),
"performance": vis_utils.human_readable_stat(0.0),
}
)
for act, ent in content5.items():
if act in added_activities:
if len(ent) >= edge_threshold:
list_edges.append(
{
"source": wrap_text(act),
"target": "End",
"frequency": pref_edge + str(len(ent)),
"performance": vis_utils.human_readable_stat(0.0),
}
)
list_item["edges"] = list_edges
data["objectTypes"].append(list_item)
stru = json.dumps(data, indent=2)
if annotation == "frequency":
suffix = (
"drawGraph(data, {showFrequency: true, showPerformance: false});\n"
)
else:
suffix = (
"drawGraph(data, {showFrequency: false, showPerformance: true});\n"
)
stru = "const data = " + stru + ";\n\n" + suffix
F = tempfile.NamedTemporaryFile(suffix=".html")
F.close()
F = open(F.name, "w", encoding=encoding)
F.write(get_html_file_contents().replace("REPLACE", stru))
F.close()
return F.name
[docs]
def view(temp_file_name, parameters=None):
"""
View the SNA visualization on the screen
Parameters
-------------
temp_file_name
Temporary file name
parameters
Possible parameters of the algorithm
"""
if parameters is None:
parameters = {}
if constants.DEFAULT_ENABLE_VISUALIZATIONS_VIEW:
iframe_width = exec_utils.get_param_value(
Parameters.IFRAME_WIDTH, parameters, 900
)
iframe_height = exec_utils.get_param_value(
Parameters.IFRAME_HEIGHT, parameters, 600
)
local_jupyter_file_name = exec_utils.get_param_value(
Parameters.LOCAL_JUPYTER_FILE_NAME,
parameters,
"jupyter_bpmn_vis.html",
)
if vis_utils.check_visualization_inside_jupyter():
from IPython.display import IFrame
shutil.copyfile(temp_file_name, local_jupyter_file_name)
iframe = IFrame(
local_jupyter_file_name,
width=iframe_width,
height=iframe_height,
)
from IPython.display import display
return display(iframe)
else:
vis_utils.open_opsystem_image_viewer(temp_file_name)
[docs]
def save(temp_file_name, dest_file, parameters=None):
"""
Save the SNA visualization from a temporary file to a well-defined destination file
Parameters
-------------
temp_file_name
Temporary file name
dest_file
Destination file
parameters
Possible parameters of the algorithm
"""
if parameters is None:
parameters = {}
shutil.copyfile(temp_file_name, dest_file)