Source code for pm4py.objects.petri_net.utils.murata

from pm4py.util.lp import solver
from pm4py.util import constants
from pm4py.objects.petri_net.utils.petri_utils import remove_place
from copy import copy
from typing import Tuple
import warnings
import importlib.util
from pm4py.objects.petri_net.obj import PetriNet, Marking


[docs] def apply_reduction( net: PetriNet, im: Marking, fm: Marking ) -> Tuple[PetriNet, Marking, Marking]: """ Apply the Murata reduction to an accepting Petri net, removing the structurally redundant places. The implementation follows the Berthelot algorithm as in: https://svn.win.tue.nl/repos/prom/Packages/Murata/Trunk/src/org/processmining/algorithms/BerthelotAlgorithm.java Parameters --------------- net Petri net im Initial marking fm Final marking Returns -------------- net Petri net im Initial marking fm Final marking """ places = sorted(list(net.places), key=lambda x: x.name) redundant = set() for place in places: # Skip places in the initial or final markings if place in im or place in fm: continue Aeq = [] Aub = [] beq = [] bub = [] # first constraint constraint = [0] * (len(net.places) + 1) for p2 in im: if p2 not in redundant: if p2 == place: constraint[places.index(p2)] = im[p2] else: constraint[places.index(p2)] = -im[p2] constraint[-1] = -1 Aeq.append(constraint) beq.append(0) # second constraints for trans in net.transitions: constraint = [0] * (len(net.places) + 1) for arc in trans.in_arcs: p2 = arc.source if p2 not in redundant: if p2 == place: constraint[places.index(p2)] = arc.weight else: constraint[places.index(p2)] = -arc.weight constraint[-1] = -1 Aub.append(constraint) bub.append(0) # third constraints for trans in net.transitions: constraint = [0] * (len(net.places) + 1) for arc in trans.out_arcs: p2 = arc.target if p2 not in redundant: if p2 == place: constraint[places.index(p2)] = -arc.weight else: constraint[places.index(p2)] = arc.weight Aub.append(constraint) bub.append(0) # fourth constraint for p2 in net.places: if p2 not in redundant: constraint = [0] * (len(net.places) + 1) constraint[places.index(p2)] = -1 Aub.append(constraint) if p2 == place: bub.append(-1) else: bub.append(0) # fifth constraint constraint = [0] * (len(net.places) + 1) constraint[-1] = -1 Aub.append(constraint) bub.append(0) c = [1] * (len(net.places) + 1) integrality = [1] * (len(net.places) + 1) proposed_solver = solver.SCIPY if importlib.util.find_spec("pulp"): proposed_solver = solver.PULP else: if constants.SHOW_INTERNAL_WARNINGS: warnings.warn( "solution from scipy may be unstable. Please install PuLP (pip install pulp) for fully reliable results." ) xx = solver.apply( c, Aub, bub, Aeq, beq, variant=proposed_solver, parameters={"integrality": integrality}, ) if (hasattr(xx, "success") and xx.success) or ( hasattr(xx, "sol_status") and xx.sol_status > -1 ): redundant.add(place) for place in redundant: # Remove the redundant place from the net net = remove_place(net, place) return net, im, fm