'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Optional, Dict, Any, Tuple, List
import numpy as np
from pm4py.objects.petri_net.utils import (
align_utils,
petri_utils as petri_utils,
)
from pm4py.objects.petri_net.utils.incidence_matrix import IncidenceMatrix
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import exec_utils, constants
from pm4py.util.lp import solver
[docs]
class Parameters(Enum):
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
COSTS = "costs"
INCIDENCE_MATRIX = "incidence_matrix"
A = "A_matrix"
FULL_BOOTSTRAP_REQUIRED = "full_bootstrap_required"
[docs]
class MarkingEquationSolver(object):
def __init__(
self,
net: PetriNet,
im: Marking,
fm: Marking,
parameters: Optional[Dict[Any, Any]] = None,
):
"""
Constructor
Parameters
---------------
net
Petri net
im
Initial marking
fm
Final marking
parameters
Parameters of the algorithm, including:
- Parameters.CASE_ID_KEY => attribute to use as case identifier
- Parameters.ACTIVITY_KEY => attribute to use as activity
- Parameters.COSTS => (if provided) the cost function (otherwise the default cost function is applied)
- Parameters.INCIDENCE_MATRIX => (if provided) the incidence matrix of the sync product net
- Parameters.A => (if provided) the A numpy matrix of the incidence matrix
- Parameters.FULL_BOOTSTRAP_REQUIRED => The preset/postset of places/transitions need to be inserted
"""
if parameters is None:
parameters = {}
costs = exec_utils.get_param_value(Parameters.COSTS, parameters, None)
if costs is None:
costs = align_utils.construct_standard_cost_function(
net, align_utils.SKIP
)
self.net = net
self.ini = im
self.fin = fm
self.costs = costs
self.incidence_matrix = exec_utils.get_param_value(
Parameters.INCIDENCE_MATRIX, parameters, IncidenceMatrix(self.net)
)
self.Aeq = exec_utils.get_param_value(
Parameters.A,
parameters,
np.asmatrix(self.incidence_matrix.a_matrix),
)
self.full_bootstrap_required = exec_utils.get_param_value(
Parameters.FULL_BOOTSTRAP_REQUIRED, parameters, True
)
self.__build_entities()
self.__build_problem_components()
def __build_entities(self):
"""
Builds entities useful to define the marking equation
"""
transitions = self.incidence_matrix.transitions
self.inv_indices = {y: x for x, y in transitions.items()}
self.inv_indices = [
self.inv_indices[i] for i in range(len(self.inv_indices))
]
self.ini_vec = np.matrix(
self.incidence_matrix.encode_marking(self.ini)
).transpose()
self.fin_vec = np.matrix(
self.incidence_matrix.encode_marking(self.fin)
).transpose()
if self.full_bootstrap_required:
petri_utils.decorate_transitions_prepostset(self.net)
petri_utils.decorate_places_preset_trans(self.net)
def __build_problem_components(self):
"""
Builds the components needed to solve the marking equation
"""
self.beq = self.fin_vec - self.ini_vec
self.Aub = -np.eye(self.Aeq.shape[1])
self.bub = np.zeros((self.Aeq.shape[1], 1))
self.c = [
self.costs[self.inv_indices[i]]
for i in range(len(self.inv_indices))
]
if (
solver.DEFAULT_LP_SOLVER_VARIANT
== solver.CVXOPT_SOLVER_CUSTOM_ALIGN
):
from cvxopt import matrix
self.Aeq_transf = matrix(self.Aeq.astype(np.float64))
self.Aub_transf = matrix(self.Aub.astype(np.float64))
self.c_transf = matrix([1.0 * x for x in self.c])
else:
self.Aeq_transf = self.Aeq
self.Aub_transf = self.Aub
self.c_transf = self.c
[docs]
def get_components(self) -> Tuple[Any, Any, Any, Any, Any]:
"""
Retrieve the components (Numpy matrixes) of the problem
Returns
---------------
c
objective function
Aub
Inequalities matrix
bub
Inequalities vector
Aeq
Equalities matrix
beq
Equalities vector
"""
if (
solver.DEFAULT_LP_SOLVER_VARIANT
== solver.CVXOPT_SOLVER_CUSTOM_ALIGN
):
from cvxopt import matrix
self.beq_transf = matrix(self.beq.astype(np.float64))
self.bub_transf = matrix(self.bub.astype(np.float64))
else:
self.beq_transf = self.beq
self.bub_transf = self.bub
return (
self.c_transf,
self.Aub_transf,
self.bub_transf,
self.Aeq_transf,
self.beq_transf,
)
[docs]
def change_ini_vec(self, ini: Marking):
"""
Changes the initial marking of the synchronous product net
Parameters
--------------
ini
Initial marking
"""
self.ini = ini
self.ini_vec = np.matrix(
self.incidence_matrix.encode_marking(ini)
).transpose()
self.beq = self.fin_vec - self.ini_vec
[docs]
def get_x_vector(self, sol_points: List[int]) -> List[int]:
"""
Returns the x vector of the solution
Parameters
--------------
sol_points
Solution of the integer problem
Returns
---------------
x
X vector
"""
return sol_points
[docs]
def get_h(self, sol_points: List[int]) -> int:
"""
Returns the value of the heuristics
Parameters
--------------
sol_points
Solution of the integer problem
Returns
--------------
h
Heuristics value
"""
return int(np.dot(sol_points, self.c))
[docs]
def get_activated_transitions(
self, sol_points: List[int]
) -> List[PetriNet.Transition]:
"""
Gets the transitions of the synchronous product net that are non-zero
in the solution of the marking equation
Parameters
--------------
sol_points
Solution of the integer problem
Returns
--------------
act_trans
Activated transitions
"""
act_trans = []
for i in range(len(sol_points)):
for j in range(sol_points[i]):
act_trans.append(self.inv_indices[i])
return act_trans
[docs]
def solve(self) -> Tuple[int, List[int]]:
"""
Solves the marking equation, returning the heuristics and the x vector
Returns
-------------
h
Heuristics value
x
X vector
"""
c, Aub, bub, Aeq, beq = self.get_components()
return self.solve_given_components(c, Aub, bub, Aeq, beq)
[docs]
def solve_given_components(self, c, Aub, bub, Aeq, beq):
"""
Solves the linear problem given the components
Parameters
--------------
c
Objective vector
Aub
Inequalities matrix
bub
Inequalities vector
Aeq
Equalities matrix
beq
Equalities vector
Returns
-------------
h
Heuristics value
x
X vector
"""
if (
solver.DEFAULT_LP_SOLVER_VARIANT
== solver.CVXOPT_SOLVER_CUSTOM_ALIGN
and type(c) is list
):
from cvxopt import matrix
Aub = matrix(Aub.astype(np.float64))
bub = matrix(bub.astype(np.float64))
Aeq = matrix(Aeq.astype(np.float64))
beq = matrix(beq.astype(np.float64))
c = matrix([1.0 * x for x in c])
sol = solver.apply(
c, Aub, bub, Aeq, beq, variant=solver.DEFAULT_LP_SOLVER_VARIANT
)
sol_points = solver.get_points_from_sol(
sol, variant=solver.DEFAULT_LP_SOLVER_VARIANT
)
if sol_points is not None:
x = self.get_x_vector(sol_points)
x = [int(y) for y in x]
h = self.get_h(sol_points)
return h, x
return None, None
[docs]
def get_firing_sequence(
self, x: List[int]
) -> Tuple[List[PetriNet.Transition], bool, int]:
"""
Gets a firing sequence from the X vector
Parameters
----------------
x
X vector
Returns
----------------
firing_sequence
Firing sequence
reach_fm
Boolean value that is true whether the firing sequence reaches the final marking
explained_events
Number of explaned events by the firing sequence
"""
activated_transitions = self.get_activated_transitions(x)
firing_sequence, reach_fm, explained_events = (
align_utils.search_path_among_sol(
self.net, self.ini, self.fin, activated_transitions
)
)
return firing_sequence, reach_fm, explained_events
[docs]
def build(
net: PetriNet,
im: Marking,
fm: Marking,
parameters: Optional[Dict[Any, Any]] = None,
) -> MarkingEquationSolver:
"""
Builds the marking equation out of a Petri net
Parameters
---------------
net
Petri net
im
Initial marking
fm
Final marking
parameters
Parameters of the algorithm, including:
- Parameters.CASE_ID_KEY => attribute to use as case identifier
- Parameters.ACTIVITY_KEY => attribute to use as activity
- Parameters.COSTS => (if provided) the cost function (otherwise the default cost function is applied)
- Parameters.INCIDENCE_MATRIX => (if provided) the incidence matrix of the Petri net
- Parameters.A => (if provided) the A numpy matrix of the incidence matrix
- Parameters.FULL_BOOTSTRAP_REQUIRED => The preset/postset of places/transitions need to be inserted
"""
if parameters is None:
parameters = {}
return MarkingEquationSolver(net, im, fm, parameters=parameters)
[docs]
def get_h_value(
solver: MarkingEquationSolver, parameters: Optional[Dict[Any, Any]] = None
) -> int:
"""
Gets the heuristics value from the marking equation
Parameters
--------------
solver
Marking equation solver (class in this file)
parameters
Possible parameters of the algorithm
"""
return solver.solve()[0]