# Source code for pm4py.algo.simulation.montecarlo.variants.petri_semaph_fifo

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.statistics.traces.generic.log import case_arrival
from pm4py.algo.simulation.montecarlo.utils import replay
from pm4py.objects.petri_net.semantics import enabled_transitions, weak_execute
from pm4py.objects.log.obj import Trace, Event
from pm4py.util import xes_constants, exec_utils, constants
from pm4py.objects.stochastic_petri import utils as stochastic_utils
from pm4py.util.dt_parsing.variants import strpfromiso
from pm4py.objects.petri_net.obj import PetriNet, Marking

from intervaltree import Interval, IntervalTree
from statistics import median
import datetime
import heapq
from enum import Enum


# -- Retained original Parameters class --
class Parameters(Enum):
    """
    Keys of the parameters accepted by this Monte Carlo simulation variant.

    Each member's value is the string (or shared constant) under which the
    corresponding setting is looked up in the ``parameters`` dict passed
    to ``apply``.
    """
    # key of the attribute holding the activity name in the log
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # key of the attribute holding the event timestamp in the log
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # variant of token-based replay used to build the stochastic map
    TOKEN_REPLAY_VARIANT = "token_replay_variant"
    # number of cases to simulate
    PARAM_NUM_SIMULATIONS = "num_simulations"
    # force a specific probability distribution for all transitions
    PARAM_FORCE_DISTRIBUTION = "force_distribution"
    # enable/disable diagnostics output
    PARAM_ENABLE_DIAGNOSTICS = "enable_diagnostics"
    # interval between diagnostics outputs
    PARAM_DIAGN_INTERVAL = "diagn_interval"
    # inter-arrival time between consecutive simulated cases
    PARAM_CASE_ARRIVAL_RATIO = "case_arrival_ratio"
    # user-provided stochastic map (skips replay-based estimation)
    PARAM_PROVIDED_SMAP = "provided_stochastic_map"
    # per-place resource capacities (map place -> number of resources)
    PARAM_MAP_RESOURCES_PER_PLACE = "map_resources_per_place"
    # default capacity for places not present in the map above
    PARAM_DEFAULT_NUM_RESOURCES_PER_PLACE = "default_num_resources_per_place"
    # scale factor (retained from the threaded variant; see simulate_case)
    PARAM_SMALL_SCALE_FACTOR = "small_scale_factor"
    # retained for interface compatibility; unused in this single-threaded variant
    PARAM_MAX_THREAD_EXECUTION_TIME = "max_thread_exec_time"
# -- Simulation Case Generator (replacing threads) --
def simulate_case(case_id, net, im, fm, smap, start_time, places_interval_trees,
                  transitions_interval_trees, cases_ex_time, list_cases,
                  small_scale_factor):
    """
    Generator that simulates a single case through the Petri net.

    Instead of sleeping (as the thread-based variant did), the generator
    yields its current virtual (simulation) time after each fired
    transition, so a scheduler can interleave many cases in virtual time.

    Parameters
    ----------
    case_id
        Integer identifier of the case; index into ``list_cases``.
    net
        The Petri net (``PetriNet``).
    im, fm
        Initial and final markings. The simulation fires transitions until
        the final marking is contained in the current marking or no
        transition is enabled.
    smap
        Stochastic map: transition -> random variable with ``get_value()``.
    start_time
        Virtual timestamp (epoch seconds) at which this case starts.
    places_interval_trees
        Map place -> IntervalTree; occupancy intervals are added here
        (shared across cases, mutated in place).
    transitions_interval_trees
        Map transition -> IntervalTree; waiting intervals are added here
        (shared across cases, mutated in place).
    cases_ex_time
        Shared list; this case's total execution duration (seconds) is
        appended on completion.
    list_cases
        Map case_id -> Trace; simulated events are appended to
        ``list_cases[case_id]``.
    small_scale_factor
        Retained from the threaded variant's interface; not used in this
        generator's body.

    Yields
    ------
    float
        The case's current virtual time after each transition firing.
    """
    current_time = start_time
    # Use a copy of the initial marking if necessary
    current_marking = im.copy()
    # Places whose resource counter we decremented; released at the end.
    acquired_places = set()
    source = list(im)[0]
    sink = list(fm)[0]  # NOTE(review): computed but never used below
    # Use a simple resource counter instead of threading.Semaphore.
    if hasattr(source, "sem_value"):
        if source.sem_value > 0:
            source.sem_value -= 1
        else:
            # Counter exhausted: clamp at 0 rather than going negative.
            source.sem_value = 0
    # Record when this case occupied the source place (FIFO queue of times).
    source.assigned_time.append(current_time)
    first_event = None
    last_event = None
    # Fire transitions until the final marking is reached or the net deadlocks.
    while not (fm <= current_marking) and (len(enabled_transitions(net, current_marking)) > 0):
        et = list(enabled_transitions(net, current_marking))
        # Pick the next transition according to the stochastic map weights.
        ct = stochastic_utils.pick_transition(et, smap)
        simulated_execution_plus_waiting_time = smap[ct].get_value() if ct in smap else 0.0
        # Re-sample until non-negative (distributions may yield negative values).
        while simulated_execution_plus_waiting_time < 0:
            simulated_execution_plus_waiting_time = smap[ct].get_value() if ct in smap else 0.0
        waiting_time = 0
        # For each output arc, attempt to "acquire" the target resource.
        for arc in ct.out_arcs:
            place = arc.target
            sem_value = getattr(place, "sem_value", 0)
            if sem_value > 0:
                place.sem_value -= 1
                acquired_places.add(place)
            else:
                # No free resource: wait until the earliest (FIFO) occupant frees it.
                if place.assigned_time:
                    waiting_interval = place.assigned_time.pop(0) - current_time
                    waiting_time = max(waiting_time, waiting_interval)
        if waiting_time > 0:
            # Record the interval this case spent waiting on the transition.
            transitions_interval_trees[ct].add(Interval(current_time, current_time + waiting_time))
        # Waiting eats into the sampled total; execution time never negative.
        execution_time = max(simulated_execution_plus_waiting_time - waiting_time, 0)
        current_time += waiting_time + execution_time
        # Register this case's occupation time on every output place.
        for arc in ct.out_arcs:
            place = arc.target
            place.assigned_time.append(current_time)
            place.assigned_time.sort()
        current_marking = weak_execute(ct, current_marking)
        # Visible (labelled) transitions produce an event in the trace.
        if ct.label is not None:
            event = Event({
                xes_constants.DEFAULT_NAME_KEY: ct.label,
                xes_constants.DEFAULT_TIMESTAMP_KEY: strpfromiso.fix_naivety(
                    datetime.datetime.fromtimestamp(current_time)
                )
            })
            last_event = event
            if first_event is None:
                first_event = event
            list_cases[case_id].append(event)
        # For each input arc, record occupancy and "release" the resource.
        for arc in ct.in_arcs:
            place = arc.source
            if place.assigned_time:
                p_ex_time = place.assigned_time.pop(0)
                if current_time - p_ex_time > 0:
                    # Interval during which the input place was occupied.
                    places_interval_trees[place].add(Interval(p_ex_time, current_time))
            place.assigned_time.append(current_time)
            place.assigned_time.sort()
            place.sem_value += 1
        # Yield the current simulation time to allow scheduler interleaving.
        yield current_time
    if first_event is not None and last_event is not None:
        # Case duration = span between first and last visible event.
        exec_duration = (
            last_event[xes_constants.DEFAULT_TIMESTAMP_KEY].timestamp()
            - first_event[xes_constants.DEFAULT_TIMESTAMP_KEY].timestamp()
        )
        cases_ex_time.append(exec_duration)
    else:
        # Case produced no visible events (e.g. only silent transitions fired).
        cases_ex_time.append(0)
    # Release any remaining acquired resources.
    for place in set(current_marking).union(acquired_places):
        place.sem_value += 1
# -- Scheduler to interleave simulation generators --
def run_simulation_generators(sim_gens):
    """
    Interleave the case-simulation generators in virtual time.

    Every generator yields its current simulation timestamp. A min-heap
    keyed on ``(timestamp, case_id)`` always resumes the generator that is
    earliest in virtual time, emulating concurrent cases without threads.
    Generators that finish (raise ``StopIteration``) are simply dropped.

    :param sim_gens: dict mapping case_id -> simulation generator
    """
    pending = []
    # Prime each generator: advance it to its first yield to obtain an
    # initial virtual timestamp for heap ordering.
    for cid, gen in sim_gens.items():
        try:
            heapq.heappush(pending, (next(gen), cid, gen))
        except StopIteration:
            pass
    # Repeatedly resume the generator with the smallest virtual time; its
    # next yielded timestamp re-enters the heap until it is exhausted.
    while pending:
        sim_time, cid, gen = heapq.heappop(pending)
        try:
            heapq.heappush(pending, (gen.send(sim_time), cid, gen))
        except StopIteration:
            pass
# -- Main apply function --
def apply(log, net, im, fm, parameters=None):
    """
    Performs a Monte Carlo simulation of an accepting Petri net using a
    generator-based scheduler (single-threaded, virtual time).

    Parameters
    ----------
    log
        Event log (used to estimate the case arrival ratio and, when no
        stochastic map is provided, to fit one via replay).
    net
        Petri net.
    im
        Initial marking.
    fm
        Final marking.
    parameters
        Dictionary of simulation parameters; see :class:`Parameters`.

    Returns
    -------
    tuple
        ``(simulated_log, simulation_result)`` where ``simulation_result``
        is a dictionary::

            {
                "output_places_interval_trees": places_interval_trees,
                "output_transitions_interval_trees": transitions_interval_trees_named,
                "cases_ex_time": cases_ex_time,
                "median_cases_ex_time": median(cases_ex_time),
                "case_arrival_ratio": case_arrival_ratio,
                "total_cases_time": max_timestamp - min_timestamp,
            }
    """
    if parameters is None:
        parameters = {}
    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY
    )
    no_simulations = exec_utils.get_param_value(
        Parameters.PARAM_NUM_SIMULATIONS, parameters, 100
    )
    force_distribution = exec_utils.get_param_value(
        Parameters.PARAM_FORCE_DISTRIBUTION, parameters, None
    )
    case_arrival_ratio = exec_utils.get_param_value(
        Parameters.PARAM_CASE_ARRIVAL_RATIO, parameters, None
    )
    smap = exec_utils.get_param_value(
        Parameters.PARAM_PROVIDED_SMAP, parameters, None
    )
    resources_per_places = exec_utils.get_param_value(
        Parameters.PARAM_MAP_RESOURCES_PER_PLACE, parameters, None
    )
    default_num_resources_per_places = exec_utils.get_param_value(
        Parameters.PARAM_DEFAULT_NUM_RESOURCES_PER_PLACE, parameters, 1
    )
    small_scale_factor = exec_utils.get_param_value(
        Parameters.PARAM_SMALL_SCALE_FACTOR, parameters, 864000
    )
    # The PARAM_MAX_THREAD_EXECUTION_TIME parameter is not used in this single-threaded version.
    if case_arrival_ratio is None:
        # Estimate the average inter-arrival time between cases from the log.
        case_arrival_ratio = case_arrival.get_case_arrival_avg(log, parameters=parameters)
    places_interval_trees = {}
    transitions_interval_trees = {}
    cases_ex_time = []
    list_cases = {}
    # Initialize places: assign resource counters and an empty assigned_time list.
    # NOTE: attributes are set directly on the shared net's place objects,
    # so repeated calls to apply() reset (and overwrite) this state.
    for place in net.places:
        if resources_per_places is not None and place in resources_per_places:
            place.sem_value = resources_per_places[place]
        else:
            place.sem_value = default_num_resources_per_places
        place.assigned_time = []
        places_interval_trees[place] = IntervalTree()
    for trans in net.transitions:
        transitions_interval_trees[trans] = IntervalTree()
    # If no stochastic map was provided, fit one from the log via replay.
    if smap is None:
        if force_distribution is not None:
            smap = replay.get_map_from_log_and_net(
                log, net, im, fm, force_distribution=force_distribution, parameters=parameters
            )
        else:
            smap = replay.get_map_from_log_and_net(
                log, net, im, fm, parameters=parameters
            )
    start_time = 1000000  # Avoid using 0 for timestamp issues.
    sim_generators = {}
    # Stagger case start times by the arrival ratio (FIFO arrival order).
    for i in range(no_simulations):
        list_cases[i] = Trace()
        sim_generators[i] = simulate_case(
            i, net, im, fm, smap, start_time, places_interval_trees,
            transitions_interval_trees, cases_ex_time, list_cases, small_scale_factor
        )
        start_time += case_arrival_ratio
    # Run all case generators interleaved in virtual time.
    run_simulation_generators(sim_generators)
    valid_ids = [i for i in range(no_simulations) if i in list_cases]
    valid_traces = [list_cases[i] for i in valid_ids]
    # NOTE(review): assumes the first trace is non-empty and (since case 0
    # starts earliest and timestamps only grow) holds the global minimum
    # timestamp; a first case with only silent transitions would raise
    # IndexError here — confirm against callers.
    min_timestamp = valid_traces[0][0][timestamp_key].timestamp()
    max_timestamp = max(
        event[timestamp_key].timestamp() for trace in valid_traces for event in trace
    )
    # Expose transition trees keyed by transition name (JSON-friendly keys).
    transitions_interval_trees_named = {
        t.name: tree for t, tree in transitions_interval_trees.items()
    }
    # Build the simulated log with the same container class as the input log.
    simulated_log = log.__class__(valid_traces)
    simulation_result = {
        "output_places_interval_trees": places_interval_trees,
        "output_transitions_interval_trees": transitions_interval_trees_named,
        "cases_ex_time": cases_ex_time,
        "median_cases_ex_time": median(cases_ex_time),
        "case_arrival_ratio": case_arrival_ratio,
        "total_cases_time": max_timestamp - min_timestamp,
    }
    return simulated_log, simulation_result