Source code for pm4py.algo.simulation.playout.petri_net.variants.extensive
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import datetime
import sys
from collections import Counter, deque
from enum import Enum
from typing import Optional, Dict, Any, Union

from pm4py.objects import petri_net
from pm4py.objects.log import obj as log_instance
from pm4py.objects.log.obj import EventLog
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util.dt_parsing.variants import strpfromiso
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import xes_constants
[docs]
class Parameters(Enum):
    """
    Keys accepted in the ``parameters`` dictionary of the extensive playout.

    Each member's value is the dictionary key under which the corresponding
    option is looked up.
    """

    # Keys shared with the rest of the library (values come from
    # pm4py.util.constants).
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY

    # Keys specific to this playout variant.
    MAX_TRACE_LENGTH = "maxTraceLength"
    RETURN_ELEMENTS = "return_elements"
    MAX_MARKING_OCC = "max_marking_occ"
    PETRI_SEMANTICS = "petri_semantics"
# Indices into a search-state tuple, which is laid out as
# (marking, trace, elements).
POSITION_MARKING, POSITION_TRACE, POSITION_ELEMENTS = 0, 1, 2
[docs]
def apply(
    net: PetriNet,
    initial_marking: Marking,
    final_marking: Optional[Marking] = None,
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> EventLog:
    """
    Do the playout of a Petri net generating a log (extensive search; stop at
    the maximum trace length specified).

    The state space is explored breadth-first starting from the initial
    marking. A state is accepted when it equals the final marking or, if no
    final marking is given, when no transition is enabled in it.

    Parameters
    -----------
    net
        Petri net to play-out
    initial_marking
        Initial marking of the Petri net
    final_marking
        If provided, the final marking of the Petri net
    parameters
        Parameters of the algorithm:
            Parameters.ACTIVITY_KEY -> Event attribute that stores the activity
                (default: xes_constants.DEFAULT_NAME_KEY)
            Parameters.TIMESTAMP_KEY -> Event attribute that stores the timestamp
                (default: xes_constants.DEFAULT_TIMESTAMP_KEY)
            Parameters.CASE_ID_KEY -> Trace attribute that stores the case identifier
                (default: xes_constants.DEFAULT_TRACEID_KEY)
            Parameters.MAX_TRACE_LENGTH -> Maximum trace length, counted in
                labelled transitions (default: 10)
            Parameters.RETURN_ELEMENTS -> If True, return the raw explored paths
                instead of an event log (default: False)
            Parameters.MAX_MARKING_OCC -> Maximum number of times the same marking
                may occur on a single path (default: sys.maxsize)
            Parameters.PETRI_SEMANTICS -> Petri net semantics
                (default: ClassicSemantics)

    Returns
    -----------
    log
        Event log with one trace per accepted path. If
        Parameters.RETURN_ELEMENTS is set, a list of tuples is returned
        instead, each alternating the markings reached and the transitions
        fired along an accepted path.
    """
    if parameters is None:
        parameters = {}
    case_id_key = exec_utils.get_param_value(
        Parameters.CASE_ID_KEY, parameters, xes_constants.DEFAULT_TRACEID_KEY
    )
    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
    )
    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY,
        parameters,
        xes_constants.DEFAULT_TIMESTAMP_KEY,
    )
    max_trace_length = exec_utils.get_param_value(
        Parameters.MAX_TRACE_LENGTH, parameters, 10
    )
    return_elements = exec_utils.get_param_value(
        Parameters.RETURN_ELEMENTS, parameters, False
    )
    max_marking_occ = exec_utils.get_param_value(
        Parameters.MAX_MARKING_OCC, parameters, sys.maxsize
    )
    semantics = exec_utils.get_param_value(
        Parameters.PETRI_SEMANTICS,
        parameters,
        petri_net.semantics.ClassicSemantics(),
    )

    # assigns to each event an increased timestamp from 1970
    curr_timestamp = 10000000

    feasible_elements = []
    # FIFO frontier of (marking, trace, elements) states; a deque gives O(1)
    # removal from the left, unlike list.pop(0).
    to_visit = deque([(initial_marking, (), ())])
    # (marking, trace) pairs that have already been expanded.
    visited = set()

    while to_visit:
        state = to_visit.popleft()
        m = state[POSITION_MARKING]
        trace = state[POSITION_TRACE]
        elements = state[POSITION_ELEMENTS]

        if (m, trace) in visited:
            continue
        visited.add((m, trace))

        en_t = semantics.enabled_transitions(net, m)

        if final_marking is not None:
            is_accepting = m == final_marking
        else:
            # without a final marking, a deadlock ends the trace
            is_accepting = len(en_t) == 0
        if is_accepting and len(trace) <= max_trace_length:
            feasible_elements.append(elements)

        # How often m occurs on the path does not depend on the transition
        # fired next, so the bound is checked once per state rather than once
        # per enabled transition.
        path_with_m = elements + (m,)
        if Counter(path_with_m)[m] > max_marking_occ:
            continue

        for t in en_t:
            new_m = semantics.weak_execute(t, net, m)
            # invisible (unlabelled) transitions do not extend the trace
            new_trace = trace if t.label is None else trace + (t.label,)
            if len(new_trace) > max_trace_length:
                continue
            # The visited set is keyed on (marking, trace); testing the full
            # 3-tuple state against it could never match.
            if (new_m, new_trace) in visited:
                continue
            to_visit.append((new_m, new_trace, path_with_m + (t,)))

    if return_elements:
        return feasible_elements

    log = log_instance.EventLog()
    for elements in feasible_elements:
        log_trace = log_instance.Trace()
        log_trace.attributes[case_id_key] = str(len(log))
        for x in elements:
            # elements mixes markings and transitions; only labelled
            # transitions become events
            if not isinstance(x, PetriNet.Transition) or x.label is None:
                continue
            curr_timestamp = curr_timestamp + 1
            log_trace.append(
                log_instance.Event(
                    {
                        activity_key: x.label,
                        timestamp_key: strpfromiso.fix_naivety(
                            datetime.datetime.fromtimestamp(curr_timestamp)
                        ),
                    }
                )
            )
        log.append(log_trace)
    return log