# Source code for pm4py.algo.discovery.correlation_mining.util
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import numpy as np
from pm4py.util.lp import solver
# [docs]
def get_c_matrix(PS_matrix, duration_matrix, activities, activities_counter):
    """
    Calculates the C-matrix out of the PS matrix and the duration matrix

    For every ordered pair of activities (i, j) with PS_matrix[i, j] > 0 the
    entry is duration_matrix[i, j] / PS_matrix[i, j] divided by the smaller
    of the two activities' occurrence counts. Every entry that would
    otherwise be 0 (PS value not positive, or computed value equal to 0) is
    replaced by a very large constant.

    Parameters
    --------------
    PS_matrix
        PS matrix
    duration_matrix
        Duration matrix
    activities
        Ordered list of activities of the log
    activities_counter
        Counter of activities

    Returns
    --------------
    c_matrix
        C matrix
    """
    # Value stored in place of a zero entry; presumably acts as a
    # prohibitive cost when the matrix is minimised over (see resolve_LP).
    zero_replacement = 100000000000
    size = len(activities)
    C_matrix = np.zeros((size, size))
    for i in range(size):
        for j in range(size):
            if PS_matrix[i, j] > 0:
                # The counter is only consulted for pairs that have a
                # positive PS value, exactly as the division requires.
                smaller_count = min(
                    activities_counter[activities[i]],
                    activities_counter[activities[j]],
                )
                val = duration_matrix[i, j] / PS_matrix[i, j] / smaller_count
            else:
                val = 0
            if val == 0:
                val = zero_replacement
            C_matrix[i, j] = val
    return C_matrix
# [docs]
def resolve_LP(C_matrix, duration_matrix, activities, activities_counter):
    """
    Formulates and solve the LP problem

    One variable is created per ordered pair of activities (i, j), with
    cost C_matrix[i, j]. The constraints are:

    * equality: for each activity, the variables of the pairs having it as
      first element sum to its number of occurrences, and likewise for the
      pairs having it as second element;
    * inequality: every variable lies between 0 and the number of
      occurrences of the activity the constraint is generated for.

    The solver result is rounded and every pair with a positive value is
    reported.

    Parameters
    --------------
    C_matrix
        C_matrix
    duration_matrix
        Duration matrix
    activities
        Ordered list of activities of the log
    activities_counter
        Counter of activities

    Returns
    -------------
    dfg
        Directly-Follows Graph
    performance_dfg
        Performance DFG (containing the estimated performance for the arcs)
    """
    n = len(activities)

    # Edges are enumerated row-major, so pair (i, j) has index i * n + j.
    edges = [(i, j) for i in range(n) for j in range(n)]
    c = [C_matrix[i, j] for i, j in edges]

    # Indices of the edges leaving / entering each activity, in ascending
    # order. Derived from the row-major layout instead of scanning `edges`.
    edges_sources = {i: list(range(i * n, (i + 1) * n)) for i in range(n)}
    edges_targets = {j: list(range(j, n * n, n)) for j in range(n)}
    activities_occurrences = {
        i: activities_counter[activities[i]] for i in range(n)
    }

    # Equality constraints: source rows first, then target rows.
    Aeq = []
    beq = []
    for incident_edges in (edges_sources, edges_targets):
        for act in range(n):
            rec = [0] * len(edges)
            for e in incident_edges[act]:
                rec[e] = 1
            Aeq.append(rec)
            beq.append(activities_occurrences[act])
    Aeq = np.asmatrix(Aeq).astype(np.float64)
    beq = np.asmatrix(beq).transpose().astype(np.float64)

    # Inequality constraints: for each edge, x <= occurrences and -x <= 0.
    Aub = []
    bub = []
    for incident_edges in (edges_sources, edges_targets):
        for act in range(n):
            for e in incident_edges[act]:
                rec = [0] * len(edges)
                rec[e] = 1
                Aub.append(rec)
                bub.append(activities_occurrences[act])
                Aub.append([-x for x in rec])
                bub.append(0)
    Aub = np.asmatrix(Aub).astype(np.float64)
    bub = np.asmatrix(bub).transpose().astype(np.float64)

    use_cvxopt = solver.DEFAULT_LP_SOLVER_VARIANT in (
        solver.CVXOPT_SOLVER_CUSTOM_ALIGN,
        solver.CVXOPT_SOLVER_CUSTOM_ALIGN_ILP,
    )
    if use_cvxopt:
        # The cvxopt variants are handed cvxopt matrices instead of
        # numpy objects.
        from cvxopt import matrix

        c = matrix(c)
        Aub = matrix(Aub)
        bub = matrix(bub)
        Aeq = matrix(Aeq)
        beq = matrix(beq)

    res = solver.apply(
        c, Aub, bub, Aeq, beq, variant=solver.DEFAULT_LP_SOLVER_VARIANT
    )
    points = solver.get_points_from_sol(
        res, variant=solver.DEFAULT_LP_SOLVER_VARIANT
    )
    points = [round(p) for p in points]

    dfg = {}
    performance_dfg = {}
    for idx, p in enumerate(points):
        if p > 0:
            src, tgt = edges[idx]
            arc = (activities[src], activities[tgt])
            dfg[arc] = p
            performance_dfg[arc] = duration_matrix[src, tgt]
    return dfg, performance_dfg
# [docs]
def match_return_avg_time(ai, aj, exact=False):
    """
    Matches two list of times (exact or greedy)
    and returns the average.

    Parameters
    --------------
    ai
        First list
    aj
        Second list
    exact
        If True, the matching is obtained from
        times_bipartite_matching.exact_match_minimum_average;
        otherwise (default) the greedy matching is used

    Returns
    ---------------
    times_mean
        Mean of the differences (second - first) over the matched pairs,
        0 if the exact matching is empty
    """
    if not exact:
        return greedy_match_return_avg_time(ai, aj)

    # Imported lazily: only the exact variant needs these.
    from statistics import mean
    from pm4py.statistics.util import times_bipartite_matching

    matching = times_bipartite_matching.exact_match_minimum_average(ai, aj)
    return mean([x[1] - x[0] for x in matching]) if matching else 0
# [docs]
def greedy_match_return_avg_time(ai, aj):
    """
    Matches two list of times with a greedy method
    and returns the average.

    Two matchings are computed (FIFO, and LIFO starting from the end); the
    mean difference of each is taken (0 for an empty matching) and the
    smaller of the two means is returned.

    Parameters
    --------------
    ai
        First list
    aj
        Second list

    Returns
    ---------------
    times_mean
        Mean of times
    """
    from statistics import mean

    def _mean_gap(pairs):
        # Mean of (second - first) over the matched pairs; 0 if none.
        return mean([x[1] - x[0] for x in pairs]) if pairs else 0

    td0 = _mean_gap(calculate_time_match_fifo(ai, aj))
    td1 = _mean_gap(calculate_time_match_rlifo(ai, aj))
    return min(td0, td1)
# [docs]
def calculate_time_match_fifo(ai, aj, times0=None):
    """
    Associate the times between
    two lists of timestamps using FIFO

    Both lists are walked from the start. Each element of ai is paired with
    the first not-yet-consumed element of aj that is strictly greater;
    elements of aj skipped during the search are not reconsidered.

    Parameters
    --------------
    ai
        First list of timestamps
    aj
        Second list of timestamps
    times0
        Correspondence between execution times (list to append to;
        a new list is created when None)

    Returns
    --------------
    times0
        Correspondence between execution times
    """
    if times0 is None:
        times0 = []
    n_second = len(aj)
    z = 0
    for first in ai:
        # Skip candidates that are not strictly after `first`.
        while z < n_second and not first < aj[z]:
            z += 1
        if z >= n_second:
            # No candidates left: nothing further can be matched.
            break
        times0.append((first, aj[z]))
        z += 1
    return times0
# [docs]
def calculate_time_match_rlifo(ai, aj, times1=None):
    """
    Associate the times between
    two lists of timestamps using LIFO (start from end)

    Both lists are walked from the end. Each element of aj is paired with
    the last not-yet-consumed element of ai that is strictly smaller;
    elements of ai skipped during the search are not reconsidered.

    Parameters
    --------------
    ai
        First list of timestamps
    aj
        Second list of timestamps
    times1
        Correspondence between execution times (list to append to;
        a new list is created when None)

    Returns
    --------------
    times1
        Correspondence between execution times
    """
    if times1 is None:
        times1 = []
    k = len(ai) - 1
    for z in range(len(aj) - 1, -1, -1):
        # Skip candidates that are not strictly before aj[z].
        while k >= 0 and not ai[k] < aj[z]:
            k -= 1
        if k < 0:
            # No candidates left: nothing further can be matched.
            break
        times1.append((ai[k], aj[z]))
        k -= 1
    return times1