Source code for pm4py.algo.discovery.dfg.variants.performance
from collections import Counter
from enum import Enum
from pm4py.util import constants, exec_utils
from pm4py.util import xes_constants as xes_util
from pm4py.util.business_hours import BusinessHours
from typing import Optional, Dict, Any, Union, Tuple
from pm4py.objects.log.obj import EventLog, EventStream
[docs]
class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of this DFG variant."""

    # Event attribute read as the activity label of each event.
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # Event attribute read on the *target* event of an arc (its start time).
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    # Event attribute read on the *source* event of an arc (its completion time).
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # Name of the aggregation applied to the durations collected per arc.
    AGGREGATION_MEASURE = "aggregationMeasure"
    # Truthy value => durations are computed through BusinessHours.
    BUSINESS_HOURS = "business_hours"
    # Forwarded to BusinessHours as ``business_hour_slots``.
    BUSINESS_HOUR_SLOTS = "business_hour_slots"
    # Forwarded to BusinessHours as ``workcalendar``.
    WORKCALENDAR = "workcalendar"
[docs]
def apply(
    log: Union[EventLog, EventStream],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Dict[Tuple[str, str], float]:
    """
    Entry point of the variant: delegates to :func:`performance`.

    Parameters
    ----------
    log
        Log
    parameters
        Parameters of the algorithm, forwarded unchanged to ``performance``

    Returns
    -------
    dfg
        Whatever ``performance`` returns for the given log and parameters
    """
    performance_dfg = performance(log, parameters=parameters)
    return performance_dfg
[docs]
def performance(
    log: Union[EventLog, EventStream],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Dict[Tuple[str, str], float]:
    """
    Measure performance between couples of attributes in the DFG graph

    For every pair of consecutive events of a trace, the time elapsed between
    the timestamp of the first event and the start timestamp of the second
    event is collected (negative values are clamped to 0), and the values
    collected for each couple of activities are then aggregated.

    Parameters
    ----------
    log
        Log
    parameters
        Possible parameters passed to the algorithms:
            - Parameters.AGGREGATION_MEASURE => performance aggregation measure:
                "mean", "median", "min", "max", "stdev" (0 when fewer than two
                values are available), "sum", "raw_values" (the list of the
                collected values) or "all" (a dictionary holding median, min,
                max, stdev, sum and mean). Any other value is treated as "mean".
                Default: "mean"
            - Parameters.ACTIVITY_KEY => Attribute to use as activity
            - Parameters.TIMESTAMP_KEY => Attribute to use as timestamp
                (read on the source event of each couple)
            - Parameters.START_TIMESTAMP_KEY => Attribute to use as start
                timestamp (read on the target event of each couple)
            - Parameters.BUSINESS_HOURS => calculates the difference of time based on the business hours, not the total time.
                Default: False
            - Parameters.BUSINESS_HOUR_SLOTS =>
                work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business
                hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g.
                [
                    (7 * 60 * 60, 17 * 60 * 60),
                    ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60),
                    ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),
                ]
                meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00
            - Parameters.WORKCALENDAR => work calendar forwarded to BusinessHours
                (only used when Parameters.BUSINESS_HOURS is enabled)

    Returns
    -------
    dfg
        DFG graph: a Counter mapping each (source activity, target activity)
        couple to the aggregated value
    """
    if parameters is None:
        parameters = {}

    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes_util.DEFAULT_NAME_KEY
    )
    start_timestamp_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY,
        parameters,
        xes_util.DEFAULT_TIMESTAMP_KEY,
    )
    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes_util.DEFAULT_TIMESTAMP_KEY
    )
    aggregation_measure = exec_utils.get_param_value(
        Parameters.AGGREGATION_MEASURE, parameters, "mean"
    )
    business_hours = exec_utils.get_param_value(
        Parameters.BUSINESS_HOURS, parameters, False
    )
    business_hours_slots = exec_utils.get_param_value(
        Parameters.BUSINESS_HOUR_SLOTS,
        parameters,
        constants.DEFAULT_BUSINESS_HOUR_SLOTS,
    )
    workcalendar = exec_utils.get_param_value(
        Parameters.WORKCALENDAR,
        parameters,
        constants.DEFAULT_BUSINESS_HOURS_WORKCALENDAR,
    )

    # Only the way a single duration is measured depends on the business
    # hours switch; pick it once so that the traversal below is shared.
    if business_hours:

        def elapsed_seconds(source_event, target_event):
            return BusinessHours(
                source_event[timestamp_key],
                target_event[start_timestamp_key],
                business_hour_slots=business_hours_slots,
                workcalendar=workcalendar,
            ).get_seconds()

    else:

        def elapsed_seconds(source_event, target_event):
            return (
                target_event[start_timestamp_key]
                - source_event[timestamp_key]
            ).total_seconds()

    # Collect, for each directly-follows couple, every observed duration.
    durations_per_arc = {}
    for trace in log:
        for i in range(1, len(trace)):
            source_event = trace[i - 1]
            target_event = trace[i]
            arc = (source_event[activity_key], target_event[activity_key])
            # Overlapping events would give a negative duration: clamp to 0.
            duration = max(0, elapsed_seconds(source_event, target_event))
            durations_per_arc.setdefault(arc, []).append(duration)

    # Resolve the aggregation once instead of re-testing the name per arc.
    aggregate = _select_performance_aggregator(aggregation_measure)
    ret = Counter()
    for arc, durations in durations_per_arc.items():
        ret[arc] = aggregate(durations)
    return ret


def _select_performance_aggregator(aggregation_measure):
    """Return the callable turning a list of durations into the value stored for an arc."""
    from statistics import mean, median, stdev

    def stdev_or_zero(values):
        # statistics.stdev needs at least two data points.
        return stdev(values) if len(values) > 1 else 0

    def all_measures(values):
        return {
            "median": median(values),
            "min": min(values),
            "max": max(values),
            "stdev": stdev_or_zero(values),
            "sum": sum(values),
            "mean": mean(values),
        }

    def raw_values(values):
        return values

    if aggregation_measure == "median":
        return median
    if aggregation_measure == "min":
        return min
    if aggregation_measure == "max":
        return max
    if aggregation_measure == "stdev":
        return stdev_or_zero
    if aggregation_measure == "sum":
        return sum
    if aggregation_measure == "raw_values":
        return raw_values
    if aggregation_measure == "all":
        return all_measures
    # "mean" and any unrecognised measure fall back to the arithmetic mean.
    return mean