from copy import copy
from datetime import datetime
from enum import Enum
from typing import Optional, Dict, Any, Tuple
from numpy.random import choice, exponential
from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.util import exec_utils, constants, xes_constants
from pm4py.util.dt_parsing.variants import strpfromiso
[docs]
class Parameters(Enum):
    """
    Keys of the ``parameters`` dictionary that are read by ``apply``.
    """

    # Number of traces of the simulated log.
    NUM_TRACES = "num_traces"
    # Attribute key under which the activity name is stored in each event.
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # Attribute key under which the timestamp is stored in each event.
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # Trace attribute key under which the case identifier is stored.
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
    # Distance (in seconds) between the start of two consecutive cases.
    CASE_ARRIVAL_RATE = "case_arrival_rate"
    # Performance DFG providing the time deltas between activities.
    PERFORMANCE_DFG = "performance_dfg"
    # Name of the artificial node placed before the start activities.
    PARAM_ARTIFICIAL_START_ACTIVITY = constants.PARAM_ARTIFICIAL_START_ACTIVITY
    # Name of the artificial node placed after the end activities.
    PARAM_ARTIFICIAL_END_ACTIVITY = constants.PARAM_ARTIFICIAL_END_ACTIVITY
[docs]
def dict_based_choice(dct: Dict[str, float]) -> str:
    """
    Performs a weighted choice, given a dictionary associating
    a weight to each possible choice

    Parameters
    -----------------
    dct
        Dictionary associating a weight to each choice

    Returns
    -----------------
    choice
        Choice (one of the keys of the dictionary, returned as-is)

    Raises
    -----------------
    ValueError
        If the dictionary is empty or the sum of the weights is not positive
    """
    keys = list(dct)
    if not keys:
        raise ValueError(
            "cannot perform a weighted choice on an empty dictionary"
        )

    weights = [dct[key] for key in keys]
    total = sum(weights)
    # "not total > 0" (rather than "total <= 0") also rejects a NaN total
    if not total > 0:
        raise ValueError(
            "cannot perform a weighted choice: the sum of the weights must be positive"
        )

    probabilities = [weight / total for weight in weights]
    # Sample a position instead of the key itself: handing the keys to numpy
    # would convert them to a numpy array, so the result would be a numpy
    # scalar (e.g. numpy.str_) and non-scalar keys such as tuples would fail.
    return keys[choice(len(keys), p=probabilities)]
[docs]
def apply(
    frequency_dfg: Dict[Tuple[str, str], int],
    start_activities: Dict[str, int],
    end_activities: Dict[str, int],
    parameters: Optional[Dict[Any, Any]] = None,
) -> EventLog:
    """
    Simulates a log out with the transition probabilities provided by the frequency DFG,
    and the time deltas provided by the performance DFG

    Parameters
    ---------------
    frequency_dfg
        Frequency DFG
    start_activities
        Start activities
    end_activities
        End activities
    parameters
        Parameters of the algorithm, including:
        - Parameters.NUM_TRACES: the number of traces of the simulated log (default: 1000)
        - Parameters.ACTIVITY_KEY: the activity key to be used in the simulated log
        - Parameters.TIMESTAMP_KEY: the timestamp key to be used in the simulated log
        - Parameters.CASE_ID_KEY: the case identifier key to be used in the simulated log
        - Parameters.CASE_ARRIVAL_RATE: the distance (in seconds) added between the start of two
          consecutive cases (default: 1)
        - Parameters.PERFORMANCE_DFG: (mandatory) the performance DFG that is used for the time deltas.
          Each value is either a number or a dictionary whose "mean" entry is used.
        - Parameters.PARAM_ARTIFICIAL_START_ACTIVITY: name of the artificial node preceding the start activities
        - Parameters.PARAM_ARTIFICIAL_END_ACTIVITY: name of the artificial node following the end activities

    Returns
    ---------------
    simulated_log
        Simulated log

    Raises
    ---------------
    Exception
        If Parameters.PERFORMANCE_DFG is not provided
    """
    if parameters is None:
        parameters = {}

    num_traces = exec_utils.get_param_value(
        Parameters.NUM_TRACES, parameters, 1000
    )
    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
    )
    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY,
        parameters,
        xes_constants.DEFAULT_TIMESTAMP_KEY,
    )
    case_id_key = exec_utils.get_param_value(
        Parameters.CASE_ID_KEY, parameters, xes_constants.DEFAULT_TRACEID_KEY
    )
    case_arrival_rate = exec_utils.get_param_value(
        Parameters.CASE_ARRIVAL_RATE, parameters, 1
    )
    performance_dfg = exec_utils.get_param_value(
        Parameters.PERFORMANCE_DFG, parameters, None
    )
    # Validate the mandatory parameter BEFORE it is used: the artificial
    # edges are written into the performance DFG below, which would fail
    # with an unrelated TypeError if it were None.
    if performance_dfg is None:
        raise Exception(
            "performance DFG simulation requires the Parameters.PERFORMANCE_DFG ('performance_dfg') parameter specification."
        )
    artificial_start_activity = exec_utils.get_param_value(
        Parameters.PARAM_ARTIFICIAL_START_ACTIVITY,
        parameters,
        constants.DEFAULT_ARTIFICIAL_START_ACTIVITY,
    )
    artificial_end_activity = exec_utils.get_param_value(
        Parameters.PARAM_ARTIFICIAL_END_ACTIVITY,
        parameters,
        constants.DEFAULT_ARTIFICIAL_END_ACTIVITY,
    )

    # Work on shallow copies so the caller's dictionaries do not receive
    # the artificial start/end edges added below.
    performance_dfg = copy(performance_dfg)
    frequency_dfg = copy(frequency_dfg)

    # Connect the artificial start/end nodes with zero-duration edges.
    for sa, count in start_activities.items():
        frequency_dfg[(artificial_start_activity, sa)] = count
        performance_dfg[(artificial_start_activity, sa)] = 0
    for ea, count in end_activities.items():
        frequency_dfg[(ea, artificial_end_activity)] = count
        performance_dfg[(ea, artificial_end_activity)] = 0

    # source activity -> {target activity -> frequency}
    choices = {}
    for edge, count in frequency_dfg.items():
        choices.setdefault(edge[0], {})[edge[1]] = count

    log = EventLog()
    # Base value (in seconds) from which the case start times are counted;
    # it is interpreted by datetime.fromtimestamp below.
    curr_st = 10000000
    for i in range(num_traces):
        curr_st += case_arrival_rate
        curr_t = curr_st
        trace = Trace(attributes={case_id_key: str(i)})
        log.append(trace)
        curr_act = artificial_start_activity
        while True:
            successors = choices.get(curr_act)
            if not successors:
                # Dead end: the activity has no outgoing edge in the DFG
                # (and is not an end activity), so the trace ends here.
                break
            next_act = dict_based_choice(successors)
            if next_act == artificial_end_activity:
                break
            perf = performance_dfg[(curr_act, next_act)]
            if isinstance(perf, dict):
                # Aggregated performance values: use the mean.
                perf = perf["mean"]
            # The waiting time is drawn from an exponential distribution
            # whose scale is the edge's performance value; a value of 0
            # means no delay.
            perf = 0 if perf == 0 else exponential(perf)
            curr_t += perf
            curr_act = next_act
            eve = Event(
                {
                    activity_key: curr_act,
                    # NOTE(review): fix_naivety presumably normalises the
                    # timezone-awareness of the timestamp - confirm.
                    timestamp_key: strpfromiso.fix_naivety(
                        datetime.fromtimestamp(curr_t)
                    ),
                }
            )
            trace.append(eve)
    return log