Source code for pm4py.objects.petri_net.utils.murata
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.util.lp import solver
from pm4py.util import constants
from pm4py.objects.petri_net.utils.petri_utils import remove_place
from copy import copy
from typing import Tuple
import warnings
import importlib.util
from pm4py.objects.petri_net.obj import PetriNet, Marking
[docs]
def apply_reduction(
net: PetriNet, im: Marking, fm: Marking
) -> Tuple[PetriNet, Marking, Marking]:
"""
Apply the Murata reduction to an accepting Petri net, removing the structurally redundant places.
The implementation follows the Berthelot algorithm as in:
https://svn.win.tue.nl/repos/prom/Packages/Murata/Trunk/src/org/processmining/algorithms/BerthelotAlgorithm.java
Parameters
---------------
net
Petri net
im
Initial marking
fm
Final marking
Returns
--------------
net
Petri net
im
Initial marking
fm
Final marking
"""
places = sorted(list(net.places), key=lambda x: x.name)
redundant = set()
for place in places:
# Skip places in the initial or final markings
if place in im or place in fm:
continue
Aeq = []
Aub = []
beq = []
bub = []
# first constraint
constraint = [0] * (len(net.places) + 1)
for p2 in im:
if p2 not in redundant:
if p2 == place:
constraint[places.index(p2)] = im[p2]
else:
constraint[places.index(p2)] = -im[p2]
constraint[-1] = -1
Aeq.append(constraint)
beq.append(0)
# second constraints
for trans in net.transitions:
constraint = [0] * (len(net.places) + 1)
for arc in trans.in_arcs:
p2 = arc.source
if p2 not in redundant:
if p2 == place:
constraint[places.index(p2)] = arc.weight
else:
constraint[places.index(p2)] = -arc.weight
constraint[-1] = -1
Aub.append(constraint)
bub.append(0)
# third constraints
for trans in net.transitions:
constraint = [0] * (len(net.places) + 1)
for arc in trans.out_arcs:
p2 = arc.target
if p2 not in redundant:
if p2 == place:
constraint[places.index(p2)] = -arc.weight
else:
constraint[places.index(p2)] = arc.weight
Aub.append(constraint)
bub.append(0)
# fourth constraint
for p2 in net.places:
if p2 not in redundant:
constraint = [0] * (len(net.places) + 1)
constraint[places.index(p2)] = -1
Aub.append(constraint)
if p2 == place:
bub.append(-1)
else:
bub.append(0)
# fifth constraint
constraint = [0] * (len(net.places) + 1)
constraint[-1] = -1
Aub.append(constraint)
bub.append(0)
c = [1] * (len(net.places) + 1)
integrality = [1] * (len(net.places) + 1)
proposed_solver = solver.SCIPY
if importlib.util.find_spec("pulp"):
proposed_solver = solver.PULP
else:
if constants.SHOW_INTERNAL_WARNINGS:
warnings.warn(
"solution from scipy may be unstable. Please install PuLP (pip install pulp) for fully reliable results."
)
xx = solver.apply(
c,
Aub,
bub,
Aeq,
beq,
variant=proposed_solver,
parameters={"integrality": integrality},
)
if (hasattr(xx, "success") and xx.success) or (
hasattr(xx, "sol_status") and xx.sol_status > -1
):
redundant.add(place)
for place in redundant:
# Remove the redundant place from the net
net = remove_place(net, place)
return net, im, fm