Source code for pm4py.objects.petri_net.saw_net.semantics

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or 
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''

import copy
import itertools
import random
from abc import ABC, abstractclassmethod
from collections import Counter
from typing import Counter as TCounter
from typing import Generic, List, Optional, Tuple, TypeVar

from pm4py.objects.petri_net.saw_net.obj import StochasticArcWeightNet
from pm4py.objects.petri_net.stochastic.semantics import (
    StochasticPetriNetSemantics,
)

N = TypeVar("N", bound=StochasticArcWeightNet)
T = TypeVar("T", bound=StochasticArcWeightNet.Transition)
P = TypeVar("P", bound=StochasticArcWeightNet.Place)
A = TypeVar("A", bound=StochasticArcWeightNet.Arc)
B = TypeVar("B", bound=StochasticArcWeightNet.Binding)


[docs] class StochasticArcWeightNetSemantics( StochasticPetriNetSemantics[N], Generic[N], ABC ):
[docs] @classmethod def is_enabled(cls, pn: N, transition: T, marking: TCounter[P]) -> bool: """ Checks whether a given transition is enabled in a given Petri net and marking. Every place should at least have the same number of tokens as the minimum binding that has a weight above 0 Parameters ---------- :param pn: Petri net :param transition: transition to check :param marking: marking to check Returns ------- :return: true if enabled, false otherwise """ if transition not in pn.transitions: return False for a in transition.in_arcs: if marking[a.source] < min( [k for k, v in a.weight_distribution.items() if v > 0] ): return False return True
[docs] @classmethod def fire(cls, pn: N, binding: B, marking: TCounter[P]) -> TCounter[P]: """fires the binding in the given marking. Does not check if the the binding is feasible (this should be handled by the invoking code) Args: pn (N): saw net to use marking (TCounter[P]): marking to use binding (B): binding to use Returns: TCounter[P]: _description_ """ m_out = copy.copy(marking) for a, w in binding: if isinstance(a.source, StochasticArcWeightNet.Place): m_out[a.source] -= w else: m_out[a.target] += w return m_out
[docs] @classmethod def all_enabled_bindings( cls, pn: N, transition: T, marking: TCounter[P] ) -> List[List[Tuple[A, int]]]: """ Creates all possible feasible bindings for a given input transition in a given marking Parameters ---------- :param pn: Petri net :param marking: marking to use :param transition: transition to genereate all feasible bindings for Returns ------- :return: list containing all posible feasible bindings """ return list( filter( lambda b: cls.is_enabled_binding(pn, transition, b, marking), cls.all_legal_bindings(pn, transition), ) )
[docs] @classmethod def is_enabled_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> bool: """ Checks if the provided binding is enabled Parameters ---------- :param pn: Petri net :param marking: marking to use :param transition: transition to genereate all feasible bindings for Returns ------- :return: bool indicates if the binding is enabled """ if transition not in pn.transitions: return False places_in_bindings = set( filter( lambda x: isinstance(x, StochasticArcWeightNet.Place), [ x for x in list( itertools.chain( *[(a.source, a.target) for (a, w) in binding] ) ) ], ) ) for a in transition.in_arcs: if a.source not in places_in_bindings: return False for a in transition.out_arcs: if a.target not in places_in_bindings: return False for a, w in binding: if a.weight_distribution[w] == 0.0: return False if transition not in {a.source, a.target}: return False if transition == a.target: if w > marking[a.source]: return False return True
[docs] @classmethod def amortized_priority( cls, binding: StochasticArcWeightNet.Binding ) -> float: """ Computes the amortized priority (a.k.a weight) of a binding. The amortized priority is equal to the product of all individual weights of the arc weights includec in the binding. Args: binding (StochasticArcWeightNet.Binding): input binding Returns: float: amortized weight """ prod = 1 for a, w in binding: prod *= a.weight_distribution[w] return prod
[docs] @abstractclassmethod def probability_of_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> float: """ Calculates the probability of firing a transition t under binding b in the net, in the given marking. Parameters ---------- :param pn: Petri net :param transition: transition to fire :param binding: binding to consider :param marking: marking to use Returns ------- :return: firing probability of transition t under binding b """ pass
[docs] class LocalStochasticArcWeightNetSemantics( StochasticArcWeightNetSemantics[N], Generic[N] ):
[docs] @classmethod def probability_of_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> float: """ Calculates the probability of firing a transition t under binding b in the net, in the given marking. Parameters ---------- :param pn: Petri net :param transition: transition to fire :param binding: binding to consider :param marking: marking to use Returns ------- :return: firing probability of transition t under binding b """ s = 0 for t in pn.transitions: if cls.is_enabled(pn, t, marking): for b in cls.all_enabled_bindings(pn, transition, marking): s += t.weight * cls.amortized_priority(b) print(cls.amortized_priority(binding)) return transition.weight * cls.amortized_priority(binding) / s
[docs] class GlobalStochasticArcWeightNetSemantics( StochasticArcWeightNetSemantics[N], Generic[N] ):
[docs] @classmethod def probability_of_transition( cls, pn: N, transition: T, marking: TCounter[P] ) -> float: """ Compute the probability of firing a transition in the net and marking. Args: pn (N): Stochastic net transition (T): transition to fire marking (Counter[P]): marking to use Returns: float: _description_ """ return ( 0.0 if transition not in pn.transitions or not cls.is_enabled(pn, transition, marking) else sum( [ transition.weight * cls.amortized_priority(b) for b in cls.all_enabled_bindings(pn, transition, marking) ] ) / sum( [ t.weight * cls.amortized_priority(b) for t in pn.transitions if cls.is_enabled(pn, t, marking) for b in cls.all_enabled_bindings(pn, t, marking) ] ) )
[docs] @classmethod def sample_enabled_transition( cls, pn: N, marking: TCounter[P], seed: int = None ) -> Optional[Tuple[T, B]]: if seed is not None: random.seed(seed) bindings = list() probs = list() for t in [t for t in pn.transitions if cls.is_enabled(pn, t, marking)]: for b in cls.all_enabled_bindings(pn, t, marking): bindings.append((t, b)) probs.append(cls.probability_of_binding(pn, t, b, marking)) return ( None if len(bindings) == 0 else random.choices(bindings, probs)[0] )
[docs] @classmethod def probability_of_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> float: """ Calculates the probability of firing a transition t under binding b in the net, in the given marking. Parameters ---------- :param pn: Petri net :param transition: transition to fire :param binding: binding to consider :param marking: marking to use Returns ------- :return: firing probability of transition t under binding b """ return ( 0.0 if transition not in pn.transitions or not cls.is_enabled_binding(pn, transition, binding, marking) else (transition.weight * cls.amortized_priority(binding)) / sum( [ t.weight * cls.amortized_priority(b) for t in pn.transitions if cls.is_enabled(pn, t, marking) for b in cls.all_enabled_bindings(pn, t, marking) ] ) )