Source code for pm4py.objects.petri_net.saw_net.semantics

import copy
import itertools
import random
from abc import ABC, abstractclassmethod
from collections import Counter
from typing import Counter as TCounter
from typing import Generic, List, Optional, Tuple, TypeVar

from pm4py.objects.petri_net.saw_net.obj import StochasticArcWeightNet
from pm4py.objects.petri_net.stochastic.semantics import (
    StochasticPetriNetSemantics,
)

N = TypeVar("N", bound=StochasticArcWeightNet)
T = TypeVar("T", bound=StochasticArcWeightNet.Transition)
P = TypeVar("P", bound=StochasticArcWeightNet.Place)
A = TypeVar("A", bound=StochasticArcWeightNet.Arc)
B = TypeVar("B", bound=StochasticArcWeightNet.Binding)


[docs] class StochasticArcWeightNetSemantics( StochasticPetriNetSemantics[N], Generic[N], ABC ):
[docs] @classmethod def is_enabled(cls, pn: N, transition: T, marking: TCounter[P]) -> bool: """ Checks whether a given transition is enabled in a given Petri net and marking. Every place should at least have the same number of tokens as the minimum binding that has a weight above 0 Parameters ---------- :param pn: Petri net :param transition: transition to check :param marking: marking to check Returns ------- :return: true if enabled, false otherwise """ if transition not in pn.transitions: return False for a in transition.in_arcs: if marking[a.source] < min( [k for k, v in a.weight_distribution.items() if v > 0] ): return False return True
[docs] @classmethod def fire(cls, pn: N, binding: B, marking: TCounter[P]) -> TCounter[P]: """fires the binding in the given marking. Does not check if the the binding is feasible (this should be handled by the invoking code) Args: pn (N): saw net to use marking (TCounter[P]): marking to use binding (B): binding to use Returns: TCounter[P]: _description_ """ m_out = copy.copy(marking) for a, w in binding: if isinstance(a.source, StochasticArcWeightNet.Place): m_out[a.source] -= w else: m_out[a.target] += w return m_out
[docs] @classmethod def all_enabled_bindings( cls, pn: N, transition: T, marking: TCounter[P] ) -> List[List[Tuple[A, int]]]: """ Creates all possible feasible bindings for a given input transition in a given marking Parameters ---------- :param pn: Petri net :param marking: marking to use :param transition: transition to genereate all feasible bindings for Returns ------- :return: list containing all posible feasible bindings """ return list( filter( lambda b: cls.is_enabled_binding(pn, transition, b, marking), cls.all_legal_bindings(pn, transition), ) )
[docs] @classmethod def is_enabled_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> bool: """ Checks if the provided binding is enabled Parameters ---------- :param pn: Petri net :param marking: marking to use :param transition: transition to genereate all feasible bindings for Returns ------- :return: bool indicates if the binding is enabled """ if transition not in pn.transitions: return False places_in_bindings = set( filter( lambda x: isinstance(x, StochasticArcWeightNet.Place), [ x for x in list( itertools.chain( *[(a.source, a.target) for (a, w) in binding] ) ) ], ) ) for a in transition.in_arcs: if a.source not in places_in_bindings: return False for a in transition.out_arcs: if a.target not in places_in_bindings: return False for a, w in binding: if a.weight_distribution[w] == 0.0: return False if transition not in {a.source, a.target}: return False if transition == a.target: if w > marking[a.source]: return False return True
[docs] @classmethod def amortized_priority( cls, binding: StochasticArcWeightNet.Binding ) -> float: """ Computes the amortized priority (a.k.a weight) of a binding. The amortized priority is equal to the product of all individual weights of the arc weights includec in the binding. Args: binding (StochasticArcWeightNet.Binding): input binding Returns: float: amortized weight """ prod = 1 for a, w in binding: prod *= a.weight_distribution[w] return prod
[docs] @abstractclassmethod def probability_of_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> float: """ Calculates the probability of firing a transition t under binding b in the net, in the given marking. Parameters ---------- :param pn: Petri net :param transition: transition to fire :param binding: binding to consider :param marking: marking to use Returns ------- :return: firing probability of transition t under binding b """ pass
[docs] class LocalStochasticArcWeightNetSemantics( StochasticArcWeightNetSemantics[N], Generic[N] ):
[docs] @classmethod def probability_of_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> float: """ Calculates the probability of firing a transition t under binding b in the net, in the given marking. Parameters ---------- :param pn: Petri net :param transition: transition to fire :param binding: binding to consider :param marking: marking to use Returns ------- :return: firing probability of transition t under binding b """ s = 0 for t in pn.transitions: if cls.is_enabled(pn, t, marking): for b in cls.all_enabled_bindings(pn, transition, marking): s += t.weight * cls.amortized_priority(b) print(cls.amortized_priority(binding)) return transition.weight * cls.amortized_priority(binding) / s
[docs] class GlobalStochasticArcWeightNetSemantics( StochasticArcWeightNetSemantics[N], Generic[N] ):
[docs] @classmethod def probability_of_transition( cls, pn: N, transition: T, marking: TCounter[P] ) -> float: """ Compute the probability of firing a transition in the net and marking. Args: pn (N): Stochastic net transition (T): transition to fire marking (Counter[P]): marking to use Returns: float: _description_ """ return ( 0.0 if transition not in pn.transitions or not cls.is_enabled(pn, transition, marking) else sum( [ transition.weight * cls.amortized_priority(b) for b in cls.all_enabled_bindings(pn, transition, marking) ] ) / sum( [ t.weight * cls.amortized_priority(b) for t in pn.transitions if cls.is_enabled(pn, t, marking) for b in cls.all_enabled_bindings(pn, t, marking) ] ) )
[docs] @classmethod def sample_enabled_transition( cls, pn: N, marking: TCounter[P], seed: int = None ) -> Optional[Tuple[T, B]]: if seed is not None: random.seed(seed) bindings = list() probs = list() for t in [t for t in pn.transitions if cls.is_enabled(pn, t, marking)]: for b in cls.all_enabled_bindings(pn, t, marking): bindings.append((t, b)) probs.append(cls.probability_of_binding(pn, t, b, marking)) return ( None if len(bindings) == 0 else random.choices(bindings, probs)[0] )
[docs] @classmethod def probability_of_binding( cls, pn: N, transition: T, binding: StochasticArcWeightNet.Binding, marking: TCounter[P], ) -> float: """ Calculates the probability of firing a transition t under binding b in the net, in the given marking. Parameters ---------- :param pn: Petri net :param transition: transition to fire :param binding: binding to consider :param marking: marking to use Returns ------- :return: firing probability of transition t under binding b """ return ( 0.0 if transition not in pn.transitions or not cls.is_enabled_binding(pn, transition, binding, marking) else (transition.weight * cls.amortized_priority(binding)) / sum( [ t.weight * cls.amortized_priority(b) for t in pn.transitions if cls.is_enabled(pn, t, marking) for b in cls.all_enabled_bindings(pn, t, marking) ] ) )