'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from typing import Any, Dict, List, Tuple, Union, Optional, Collection
import networkx as nx
import numpy as np
from pm4py.objects.process_tree.obj import ProcessTree, Operator
from pm4py.objects.process_tree.utils.generic import is_leaf, is_tau_leaf
from pm4py.util.lp import solver
from pm4py.utils import project_on_event_attribute
import pandas as pd
from pm4py.util import exec_utils
from enum import Enum
from pm4py.util import constants, xes_constants
from pm4py.objects.log.obj import EventLog
import importlib.util
from functools import lru_cache
from collections import defaultdict
class Parameters(Enum):
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
SHOW_PROGRESS_BAR = "show_progress_bar"
class ProcessTreeAligner:
"""
Implementation of the approach described in:
Schwanen, Christopher T., Wied Pakusa, and Wil MP van der Aalst. "Process tree alignments." Enterprise Design, Operations, and Computing, ser. LNCS, Cham: Springer International Publishing (2024).
"""
def __init__(self, tree: ProcessTree):
self.tree = tree
self.graph = nx.MultiDiGraph()
self.node_id_counter = 0 # Initialize node ID counter
# Cache to store activity-to-edges mapping
self.activity_to_edges = defaultdict(list)
# Flag for tracking if the tree has been processed
self._tree_processed = False
self._build_process_tree_graph(self.tree)
self._precompute_activity_edges()
def _precompute_activity_edges(self):
"""Precompute activity to edges mapping for faster lookups during alignment"""
for edge in self.graph.edges(keys=True, data=True):
u, v, k, data = edge
label = data.get('label')
if label is not None:
self.activity_to_edges[label].append((u, v, k))
def _get_new_node_id(self) -> int:
node_id = self.node_id_counter
self.node_id_counter += 1
return node_id
def _build_process_tree_graph(self, tree: ProcessTree) -> None:
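        """
        Translate the process tree into a multigraph with designated source and sink
        nodes. A loop at the root is handled specially: if its "do" child is a tau
        leaf, the source doubles as the sink and the "redo" child loops back to it.
        """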
start_node = self._get_new_node_id()
self.graph.add_node(start_node, source=True)
if is_tau_leaf(tree):
self.graph.nodes[start_node]["sink"] = True
return
if tree.operator == Operator.LOOP:
if len(tree.children) != 2:
raise Exception(f"Loop {tree} does not have exactly two children")
if is_tau_leaf(tree.children[0]):
# Special handling when the first child of loop is a tau transition
self.graph.nodes[start_node]["sink"] = True
self._build_process_tree_subgraph(tree.children[1], start_node, start_node)
else:
end_node = self._get_new_node_id()
self.graph.add_node(end_node, sink=True)
self._build_process_tree_subgraph(tree.children[0], start_node, end_node)
self._build_process_tree_subgraph(tree.children[1], end_node, start_node)
else:
end_node = self._get_new_node_id()
self.graph.add_node(end_node, sink=True)
self._build_process_tree_subgraph(tree, start_node, end_node)
self._tree_processed = True
def _build_process_tree_subgraph(self, tree: ProcessTree, start_node: Any, end_node: Any, iac: int = 1) -> None:
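        """
        Recursively add the subgraph for ``tree`` between ``start_node`` and ``end_node``,
        dispatching on the operator. ``iac`` scales edge capacities (1/iac) and
        model-move costs (iac); it is multiplied by the branch count of each enclosing
        parallel operator, so the total capacity across a parallel split stays at 1/iac.
        """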
if tree.operator is None:
self._build_process_tree_subgraph_leaf(tree, start_node, end_node, iac)
elif tree.operator == Operator.SEQUENCE:
self._build_process_tree_subgraph_sequence(tree, start_node, end_node, iac)
elif tree.operator == Operator.XOR:
self._build_process_tree_subgraph_xor(tree, start_node, end_node, iac)
elif tree.operator == Operator.PARALLEL:
self._build_process_tree_subgraph_parallel(tree, start_node, end_node, iac)
elif tree.operator == Operator.LOOP:
self._build_process_tree_subgraph_loop(tree, start_node, end_node, iac)
else:
raise Exception(f"Operator {tree.operator} is not supported")
def _build_process_tree_subgraph_leaf(self, tree: ProcessTree, start_node: Any, end_node: Any, iac: int) -> None:
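        """Add a single edge for the leaf, labeled with its activity (None for tau),
        with capacity 1/iac and model-move cost iac (tau leaves cost 0)."""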
if not is_leaf(tree):
raise Exception(f"Subtree {tree} is not a leaf")
self.graph.add_edge(
start_node,
end_node,
label=tree.label,
capacity=1. / iac,
cost=iac if tree.label is not None else 0
)
def _build_process_tree_subgraph_sequence(self, tree: ProcessTree, start_node: Any, end_node: Any,
iac: int) -> None:
if len(tree.children) == 1:
self._build_process_tree_subgraph(tree.children[0], start_node, end_node, iac)
return
nodes = [start_node] + [self._get_new_node_id() for _ in range(len(tree.children) - 1)] + [end_node]
for i in range(len(tree.children)):
self._build_process_tree_subgraph(tree.children[i], nodes[i], nodes[i + 1], iac)
def _build_process_tree_subgraph_xor(self, tree: ProcessTree, start_node: Any, end_node: Any, iac: int) -> None:
for child in tree.children:
self._build_process_tree_subgraph(child, start_node, end_node, iac)
def _build_process_tree_subgraph_parallel(self, tree: ProcessTree, start_node: Any, end_node: Any,
iac: int) -> None:
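        """
        Encode a parallel operator through "shuffle" nodes: each child is built between
        fresh spread nodes, which are tied to the shared split/join nodes by zero-cost
        shuffle edges. The shuffle edge groups recorded on the split/join nodes are
        later synchronized by the binary s variables of the alignment (M)ILP.
        """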
if tree.operator != Operator.PARALLEL:
raise Exception(f"Operator {tree.operator} is not a parallel")
# Ensure start_node and end_node are added to the graph
if start_node not in self.graph.nodes:
self.graph.add_node(start_node)
if end_node not in self.graph.nodes:
self.graph.add_node(end_node)
if 'shuffle' not in self.graph.nodes[start_node]:
self.graph.nodes[start_node]["shuffle"] = []
self.graph.nodes[start_node]["iac"] = iac
self.graph.nodes[start_node]["is_split"] = True
if 'shuffle' not in self.graph.nodes[end_node]:
self.graph.nodes[end_node]["shuffle"] = []
self.graph.nodes[end_node]["iac"] = iac
self.graph.nodes[end_node]["is_join"] = True
shuffle_split = []
shuffle_join = []
local_iac = iac * len(tree.children)
for child in tree.children:
spread_node_start = self._get_new_node_id()
spread_node_end = self._get_new_node_id()
self.graph.add_node(spread_node_start)
self.graph.add_node(spread_node_end)
# Record shuffle edges
shuffle_split.append((start_node, spread_node_start, 0))
shuffle_join.append((spread_node_end, end_node, 0))
# Add shuffle edges with appropriate capacity
self.graph.add_edge(
start_node, spread_node_start,
label=None, capacity=1. / local_iac, cost=0, shuffle=True
)
self.graph.add_edge(
spread_node_end, end_node,
label=None, capacity=1. / local_iac, cost=0, shuffle=True
)
# Build subgraph for the child
self._build_process_tree_subgraph(child, spread_node_start, spread_node_end, local_iac)
self.graph.nodes[start_node]["shuffle"].append(shuffle_split)
self.graph.nodes[end_node]["shuffle"].append(shuffle_join)
def _build_process_tree_subgraph_loop(self, tree: ProcessTree, start_node: Any, end_node: Any, iac: int) -> None:
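        """
        Encode a loop operator: fresh inner start/end nodes are attached to the outer
        ones by zero-cost tau edges; the "do" child is built forward between them and
        the "redo" child backward, closing the cycle.
        """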
old_start_node = start_node
start_node = self._get_new_node_id()
self.graph.add_node(start_node)
self.graph.add_edge(
old_start_node, start_node,
label=None, capacity=1. / iac, cost=0
)
old_end_node = end_node
end_node = self._get_new_node_id()
self.graph.add_node(end_node)
self.graph.add_edge(
end_node, old_end_node,
label=None, capacity=1. / iac, cost=0
)
self._build_process_tree_subgraph(tree.children[0], start_node, end_node, iac)
self._build_process_tree_subgraph(tree.children[1], end_node, start_node, iac)
@lru_cache(maxsize=128)
    def align(self, trace: Tuple[str, ...]) -> Tuple[float, List[Tuple[str, str]]]:
"""
Align a trace with the process tree. Using tuple for trace to enable caching.
Args:
trace: A tuple of activity labels (converted from list for caching)
Returns:
A tuple containing (alignment cost, alignment moves)
"""
# Convert back to list for processing
trace_list = list(trace)
return self._align_impl(trace_list)
def _align_impl(self, trace: List[str]) -> Tuple[float, List[Tuple[str, str]]]:
"""Implementation of the alignment algorithm"""
align_variant = "pulp"
num_steps = len(trace) + 1
graph = self.graph
# Pre-processed data
edges = list(graph.edges(keys=True))
nodes = list(graph.nodes())
        # Variable indexing
var_index = {}
var_counter = 0
        # Variable groups, stored in dictionaries keyed by step and graph element
x_vars = {} # Flow variables
y_vars = {} # Log move variables
z_vars = {} # Sync move variables
s_vars = {} # Shuffle variables
# x variables
for i in range(num_steps):
for e in edges:
x_vars[(i,) + e] = var_counter
var_index[var_counter] = ('x', i, e)
var_counter += 1
# y variables (continuous between 0 and 1)
for i in range(1, num_steps):
for v in nodes:
y_vars[(i, v)] = var_counter
var_index[var_counter] = ('y', i, v)
var_counter += 1
# z variables (continuous between 0 and capacity)
sync_edges = {}
for idx, activity in enumerate(trace):
step = idx + 1
# Use precomputed activity to edges mapping for better performance
matching_edges = self.activity_to_edges.get(activity, [])
sync_edges[step] = matching_edges
for e in matching_edges:
z_vars[(step,) + e] = var_counter
var_index[var_counter] = ('z', step, e)
var_counter += 1
# s variables (binary)
shuffles = {}
for v in nodes:
node_data = graph.nodes[v]
if node_data.get('shuffle'):
for idx, shuffle_edges in enumerate(node_data['shuffle']):
shuffles[(v, idx)] = shuffle_edges
for i in range(num_steps):
for key in shuffles.keys():
s_vars[(i, key)] = var_counter
var_index[var_counter] = ('s', i, key)
var_counter += 1
num_vars = var_counter
        # Objective function coefficients, one entry per variable
        c = np.zeros(num_vars)
        # For x variables: cost of the corresponding model edge
for key, idx in x_vars.items():
i, u, v, k = key
cost = graph.edges[(u, v, k)].get('cost', 0)
c[idx] = cost
# For y variables - all have cost 1
for idx in y_vars.values():
c[idx] = 1
# For z variables
for key, idx in z_vars.items():
step, u, v, k = key
edge_cost = graph.edges[(u, v, k)].get('cost', 0)
c[idx] = 1 - edge_cost if edge_cost > 1 else 0
        # Constraints, collected as sparse rows ({column index: coefficient}, rhs)
Aeq_rows = []
beq = []
        # Flow conservation constraints
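        # For every step i and node v: x inflow minus x outflow, plus incoming log/sync
        # moves at step i, minus outgoing log/sync moves at step i + 1, must balance.
        # The source emits one unit of flow at step 0 (rhs = -1) and the sink absorbs
        # it once the whole trace has been consumed (rhs = 1).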
for i in range(num_steps):
for v in nodes:
row = {}
rhs = 0
# Sum of inflow arcs
for e in edges:
if e[1] == v:
idx = x_vars.get((i,) + e)
if idx is not None:
row[idx] = row.get(idx, 0) + 1
# Sum of outflow arcs
for e in edges:
if e[0] == v:
idx = x_vars.get((i,) + e)
if idx is not None:
row[idx] = row.get(idx, 0) - 1
# Add y and z variables
if i > 0:
idx_y = y_vars.get((i, v))
if idx_y is not None:
row[idx_y] = row.get(idx_y, 0) + 1
for e in edges:
if e[1] == v:
idx_z = z_vars.get((i,) + e)
if idx_z is not None:
row[idx_z] = row.get(idx_z, 0) + 1
if i < num_steps - 1:
idx_y = y_vars.get((i + 1, v))
if idx_y is not None:
row[idx_y] = row.get(idx_y, 0) - 1
for e in edges:
if e[0] == v:
idx_z = z_vars.get((i + 1,) + e)
if idx_z is not None:
row[idx_z] = row.get(idx_z, 0) - 1
# Handle source and sink nodes
if i == 0 and graph.nodes[v].get('source'):
rhs = -1
elif i == len(trace) and graph.nodes[v].get('sink'):
rhs = 1
if row:
Aeq_rows.append((row, rhs))
# Shuffle constraints
for key, shuffle_edges in shuffles.items():
v, idx = key
iac = graph.nodes[v]['iac']
for i in range(num_steps):
row = {}
for u, w, _ in shuffle_edges:
edge = (u, w, 0)
idx_x = x_vars.get((i,) + edge)
if idx_x is not None:
row[idx_x] = row.get(idx_x, 0) + 1
idx_s = s_vars.get((i, key))
if idx_s is not None:
row[idx_s] = row.get(idx_s, 0) - (1.0 / iac)
Aeq_rows.append((row, 0))
        # Duplicate label constraints: among edges sharing the same activity label,
        # at most one cost-weighted synchronous move may fire per log position
Aub_rows = []
bub = []
for i in sync_edges:
if len(sync_edges[i]) > 1:
row = {}
for e in sync_edges[i]:
idx_z = z_vars.get((i,) + e)
if idx_z is not None:
edge_cost = graph.edges[e].get('cost', 0)
row[idx_z] = edge_cost
Aub_rows.append((row, 1))
# Variable bounds
lb = np.zeros(num_vars)
ub = np.full(num_vars, np.inf)
# x variables
for key, idx in x_vars.items():
i, u, v, k = key
capacity = graph.edges[(u, v, k)].get('capacity', np.inf)
ub[idx] = capacity
# y variables
for idx in y_vars.values():
ub[idx] = 1
# z variables
for key, idx in z_vars.items():
i, u, v, k = key
capacity = graph.edges[(u, v, k)].get('capacity', np.inf)
ub[idx] = capacity
# s variables
for idx in s_vars.values():
ub[idx] = 1
lb[idx] = 0
        # Variable types (0 = continuous, 1 = integer); only the s variables are integral
        vartype = [0] * num_vars
for idx in s_vars.values():
vartype[idx] = 1
# Build Aeq efficiently with COO format for sparse matrix creation
Aeq_data = []
Aeq_row_idx = []
Aeq_col_idx = []
for row_idx, (row, rhs_value) in enumerate(Aeq_rows):
for var_idx, coeff in row.items():
Aeq_data.append(coeff)
Aeq_row_idx.append(row_idx)
Aeq_col_idx.append(var_idx)
beq.append(rhs_value)
        # Assemble the sparse matrix in one pass, then densify it for the solver interface
from scipy import sparse
Aeq = sparse.csr_matrix((Aeq_data, (Aeq_row_idx, Aeq_col_idx)), shape=(len(Aeq_rows), num_vars))
Aeq = Aeq.toarray()
# Build Aub
Aub_data = []
Aub_row_idx = []
Aub_col_idx = []
for row_idx, (row, rhs_value) in enumerate(Aub_rows):
for var_idx, coeff in row.items():
Aub_data.append(coeff)
Aub_row_idx.append(row_idx)
Aub_col_idx.append(var_idx)
bub.append(rhs_value)
Aub = sparse.csr_matrix((Aub_data, (Aub_row_idx, Aub_col_idx)), shape=(len(Aub_rows), num_vars))
Aub = Aub.toarray()
# Solver parameters
bounds = [(lb[i], ub[i]) for i in range(num_vars)]
parameters = {
'bounds': bounds,
'integrality': vartype,
}
c = [float(x) for x in c]
bub = [float(x) for x in bub]
beq = [float(x) for x in beq]
if align_variant == "cvxopt_solver_custom_align_ilp":
del parameters["bounds"]
Aub_add = np.zeros((2 * len(bounds), len(c)))
for idx, b in enumerate(bounds):
Aub_add[2 * idx, idx] = -1.0
Aub_add[2 * idx + 1, idx] = 1.0
bub.append(-b[0])
bub.append(b[1])
Aub = np.vstack([Aub, Aub_add])
bub = np.asarray(bub)[:, np.newaxis]
beq = np.asarray(beq)[:, np.newaxis]
from cvxopt import matrix
Aub = matrix(Aub.astype(np.float64))
bub = matrix(bub)
Aeq = matrix(Aeq.astype(np.float64))
beq = matrix(beq)
c = matrix(c)
# Solve the LP problem
try:
sol = solver.apply(c, Aub, bub, Aeq, beq, variant=align_variant, parameters=parameters)
prim_obj = solver.get_prim_obj_from_sol(sol, variant=align_variant)
var_values = solver.get_points_from_sol(sol, variant=align_variant)
            # Reconstruct the alignment moves from the solution values
alignment_moves = []
i = 1 # Start from step 1
while i <= len(trace):
activity = trace[i - 1]
move_recorded = False
# Check for synchronous moves (z variables)
for e in sync_edges.get(i, []):
idx_z = z_vars.get((i,) + e)
if idx_z is not None and var_values[idx_z] > 1e-5:
# Synchronous move
alignment_moves.append((activity, activity))
move_recorded = True
break
if move_recorded:
i += 1
continue
# Check for moves on log (y variables)
for v in nodes:
idx_y = y_vars.get((i, v))
if idx_y is not None and var_values[idx_y] > 1e-5:
alignment_moves.append((activity, '>>'))
move_recorded = True
break
if move_recorded:
i += 1
continue
# If neither z nor y variables are active, it's a move on model
# Find active x variables at position i
model_moves = []
for e in edges:
idx_x = x_vars.get((i,) + e)
if idx_x is not None and var_values[idx_x] > 1e-5:
label = graph.edges[e].get('label')
if label is not None and label not in model_moves:
model_moves.append(label)
if model_moves:
for label in model_moves:
alignment_moves.append(('>>', label))
i += 1 # Advance to the next step
else:
# No move detected; advance to prevent infinite loop
alignment_moves.append((activity, '>>'))
i += 1
return prim_obj, alignment_moves
        except Exception as e:
            raise Exception(f"Optimization failed: {e}") from e
# Helper function to construct progress bar
def _construct_progress_bar(progress_length, parameters):
if exec_utils.get_param_value(Parameters.SHOW_PROGRESS_BAR, parameters,
constants.SHOW_PROGRESS_BAR) and importlib.util.find_spec("tqdm"):
if progress_length > 1:
from tqdm.auto import tqdm
return tqdm(total=progress_length, desc="aligning log, completed variants :: ")
return None
# Helper function to destroy progress bar
def _destroy_progress_bar(progress):
if progress is not None:
progress.close()
del progress
def apply_list_tuple_activities(list_tuple_activities: List[Collection[str]], aligner: ProcessTreeAligner,
parameters: Optional[Dict[Any, Any]] = None) -> List[Dict[str, Any]]:
"""
    Apply the alignment algorithm to a list of activity tuples.
    Each distinct trace variant is aligned only once (alignments are cached);
    results are then mapped back to the original list.
"""
if parameters is None:
parameters = {}
    # Deduplicate traces into variants so each distinct trace is aligned only once
variants = set(tuple(v) for v in list_tuple_activities)
variants_align = {}
progress = _construct_progress_bar(len(variants), parameters)
# Pre-compute empty trace alignment
empty_cost, empty_moves = aligner.align(())
empty_cost = round(empty_cost + 10 ** -14, 13)
# Process each variant only once
for v in variants:
alignment_cost, alignment_moves = aligner.align(v)
alignment_cost = round(alignment_cost + 10 ** -14, 13)
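        # Standard alignment-based fitness: cost normalized by the worst case of
        # turning every event into a log move plus the cheapest pure-model path
        # (the cost of aligning the empty trace)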
fitness = 1.0 - alignment_cost / (empty_cost + len(v)) if (empty_cost + len(v)) > 0 else 0.0
alignment = {"cost": alignment_cost, "alignment": alignment_moves, "fitness": fitness}
variants_align[v] = alignment
if progress is not None:
progress.update()
_destroy_progress_bar(progress)
# Map results back to original list
return [variants_align[tuple(t)] for t in list_tuple_activities]
def apply(log: Union[pd.DataFrame, EventLog], process_tree: ProcessTree, parameters: Optional[Dict[Any, Any]] = None) -> \
List[Dict[str, Any]]:
"""
Aligns an event log against a process tree model, using the approach described in:
    Schwanen, Christopher T., Wied Pakusa, and Wil M. P. van der Aalst. "Process Tree Alignments."
    Enterprise Design, Operations, and Computing, ser. LNCS, Cham: Springer International Publishing (2024).
Parameters
---------------
log
Event log or Pandas dataframe
process_tree
Process tree
parameters
Parameters of the algorithm, including:
- Parameters.ACTIVITY_KEY => the attribute to be used as activity
- Parameters.SHOW_PROGRESS_BAR => shows the progress bar
Returns
---------------
aligned_traces
List that contains the alignment for each trace
"""
if parameters is None:
parameters = {}
activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
list_tuple_activities = project_on_event_attribute(log, activity_key)
list_tuple_activities = [tuple(x) for x in list_tuple_activities]
aligner = ProcessTreeAligner(process_tree)
return apply_list_tuple_activities(list_tuple_activities, aligner, parameters=parameters)