'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.util.business_hours import BusinessHours
from pm4py.objects.log.util import sorting
from pm4py.util import constants
from pm4py.util import xes_constants as xes
from pm4py.objects.log.obj import EventLog, Trace, Event
from copy import copy
from enum import Enum
from pm4py.util import exec_utils
[docs]
class Parameters(Enum):
    """
    Keys recognised in the ``parameters`` dictionary accepted by the
    functions of this module (looked up through exec_utils.get_param_value).
    """

    # Event attribute holding the timestamp (completion timestamp in interval logs)
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # Event attribute holding the start timestamp of an interval event
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    # Event attribute holding the lifecycle transition ("start" / "complete")
    TRANSITION_KEY = constants.PARAMETER_CONSTANT_TRANSITION_KEY
    # Event attribute holding the activity name
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    # Event attribute used by to_interval to pair start and complete events
    LIFECYCLE_INSTANCE_KEY = "pm4py:param:lifecycle:instance:key"
    # Flag read by to_interval: when truthy, "@@approx_bh_duration" is computed
    BUSINESS_HOURS = "business_hours"
    # Slots forwarded to BusinessHours(..., business_hour_slots=...)
    BUSINESS_HOUR_SLOTS = "business_hour_slots"
    # NOTE(review): not read by any function visible in this module
    WORKCALENDAR = "workcalendar"
[docs]
def to_interval(log, parameters=None):
    """
    Converts a log to interval format (e.g. an event has two timestamps)
    from lifecycle format (an event has only a timestamp, and a transition lifecycle)

    Every "complete" event is paired with the oldest still-unmatched "start"
    event of the same (activity, lifecycle instance) inside the same trace.
    A "complete" event without a matching "start" becomes an interval whose
    start timestamp equals its own timestamp. Events without a transition
    attribute are treated as "complete"; events with any other transition,
    and "start" events that are never completed, do not appear in the result.

    Parameters
    -------------
    log
        Log (expressed in the lifecycle format)
    parameters
        Possible parameters of the method:
        - Parameters.TIMESTAMP_KEY => attribute holding the event timestamp
        - Parameters.START_TIMESTAMP_KEY => attribute written with the start timestamp
        - Parameters.TRANSITION_KEY => attribute holding the lifecycle transition
        - Parameters.ACTIVITY_KEY => attribute holding the activity
        - Parameters.LIFECYCLE_INSTANCE_KEY => attribute holding the lifecycle instance
        - Parameters.BUSINESS_HOURS => if truthy, also sets "@@approx_bh_duration"
        - Parameters.BUSINESS_HOUR_SLOTS => slots forwarded to BusinessHours

    Returns
    -------------
    log
        Interval event log. The input object itself is returned, untouched,
        when it is None, empty, already tagged as "interval", or when its
        first event already carries the start timestamp attribute.
    """
    # Nothing to convert: hand the input back as-is (this also covers None).
    if log is None or len(log) == 0:
        return log

    if parameters is None:
        parameters = {}

    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY
    )
    start_timestamp_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY,
        parameters,
        xes.DEFAULT_START_TIMESTAMP_KEY,
    )
    transition_key = exec_utils.get_param_value(
        Parameters.TRANSITION_KEY, parameters, xes.DEFAULT_TRANSITION_KEY
    )
    activity_key = exec_utils.get_param_value(
        Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY
    )
    lifecycle_instance_key = exec_utils.get_param_value(
        Parameters.LIFECYCLE_INSTANCE_KEY, parameters, xes.DEFAULT_INSTANCE_KEY
    )
    business_hours = exec_utils.get_param_value(
        Parameters.BUSINESS_HOURS, parameters, False
    )
    business_hours_slots = exec_utils.get_param_value(
        Parameters.BUSINESS_HOUR_SLOTS,
        parameters,
        constants.DEFAULT_BUSINESS_HOUR_SLOTS,
    )

    # The log declares itself to be in interval format already.
    if (
        "PM4PY_TYPE" in log.attributes
        and log.attributes["PM4PY_TYPE"] == "interval"
    ):
        return log

    # Heuristic: only the first event of the first trace is inspected.
    if log[0] is not None and len(log[0]) > 0:
        if start_timestamp_key in log[0][0]:
            return log

    new_log = EventLog(
        attributes=copy(log.attributes),
        extensions=copy(log.extensions),
        classifiers=copy(log.classifiers),
        omni_present=copy(log.omni_present),
        properties=copy(log.properties),
    )
    new_log.attributes["PM4PY_TYPE"] = "interval"
    # Record the key that is actually written on the events below, so that a
    # custom start timestamp key is advertised correctly by the log.
    new_log.properties[
        constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    ] = start_timestamp_key

    for trace in log:
        new_trace = Trace()
        for attr in trace.attributes:
            new_trace.attributes[attr] = trace.attributes[attr]

        # Pending "start" events, keyed by (activity, lifecycle instance),
        # kept in arrival order so they are matched first-in-first-out.
        activities_start = {}

        for event in trace:
            instance = (
                event[lifecycle_instance_key]
                if lifecycle_instance_key in event
                else None
            )
            activity = (event[activity_key], instance)
            transition = (
                event[transition_key]
                if transition_key in event
                else "complete"
            ).lower()
            timestamp = event[timestamp_key]

            if transition == "start":
                activities_start.setdefault(activity, []).append(event)
            elif transition == "complete":
                start_event = None
                # Without a matching start, the interval has zero duration.
                start_timestamp = timestamp
                if (
                    activity in activities_start
                    and len(activities_start[activity]) > 0
                ):
                    start_event = activities_start[activity].pop(0)
                    start_timestamp = start_event[timestamp_key]

                new_event = Event()
                # Attributes of the complete event, except the ones that are
                # re-expressed by the interval representation.
                for attr in event:
                    if attr != timestamp_key and attr != transition_key:
                        new_event[attr] = event[attr]
                # Attributes of the start event are preserved under a prefix.
                if start_event is not None:
                    for attr in start_event:
                        if attr != timestamp_key and attr != transition_key:
                            new_event["@@startevent_" + attr] = (
                                start_event[attr]
                            )
                new_event[start_timestamp_key] = start_timestamp
                new_event[timestamp_key] = timestamp
                new_event["@@duration"] = (
                    timestamp - start_timestamp
                ).total_seconds()

                if business_hours:
                    bh = BusinessHours(
                        start_timestamp,
                        timestamp,
                        business_hour_slots=business_hours_slots,
                    )
                    new_event["@@approx_bh_duration"] = bh.get_seconds()

                new_trace.append(new_event)

        # Interval events are ordered by their start timestamp.
        new_trace = sorting.sort_timestamp_trace(
            new_trace, start_timestamp_key
        )
        new_log.append(new_trace)

    return new_log
[docs]
def to_lifecycle(log, parameters=None):
    """
    Converts a log from interval format (e.g. an event has two timestamps)
    to lifecycle format (an event has only a timestamp, and a transition lifecycle)

    Each interval event is split into a "start" and a "complete" event that
    share all attributes except the two timestamps. The events of a trace are
    then ordered by timestamp, position of the originating interval event and
    start-before-complete. The helper attributes "@@custom_lif_id" and
    "@@origin_ev_idx" used for that ordering stay on the produced events.

    Parameters
    -------------
    log
        Log (expressed in the interval format)
    parameters
        Possible parameters of the method:
        - Parameters.TIMESTAMP_KEY => attribute holding the completion timestamp
        - Parameters.START_TIMESTAMP_KEY => attribute holding the start timestamp
        - Parameters.TRANSITION_KEY => attribute written with the lifecycle transition

    Returns
    -------------
    log
        Lifecycle event log. The input object itself is returned when it is
        None, empty, already tagged as "lifecycle", or when its first event
        already carries the transition attribute.
    """
    if parameters is None:
        parameters = {}

    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY
    )
    start_timestamp_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY,
        parameters,
        xes.DEFAULT_START_TIMESTAMP_KEY,
    )
    transition_key = exec_utils.get_param_value(
        Parameters.TRANSITION_KEY, parameters, xes.DEFAULT_TRANSITION_KEY
    )

    # Guard clauses: inputs that need no conversion are returned unchanged.
    if log is None or len(log) == 0:
        return log
    if (
        "PM4PY_TYPE" in log.attributes
        and log.attributes["PM4PY_TYPE"] == "lifecycle"
    ):
        return log
    if log[0] is not None and len(log[0]) > 0:
        # Only the first event of the first trace is inspected.
        if transition_key in log[0][0]:
            return log

    lifecycle_log = EventLog(
        attributes=copy(log.attributes),
        extensions=copy(log.extensions),
        classifiers=copy(log.classifiers),
        omni_present=copy(log.omni_present),
        properties=copy(log.properties),
    )
    lifecycle_log.attributes["PM4PY_TYPE"] = "lifecycle"

    # Attributes that are replaced by the single lifecycle timestamp.
    interval_only_keys = (timestamp_key, start_timestamp_key)

    for trace in log:
        lifecycle_trace = Trace()
        for name in trace.attributes:
            lifecycle_trace.attributes[name] = trace.attributes[name]

        split_events = []
        for position, interval_event in enumerate(trace):
            start_part = Event()
            complete_part = Event()

            # Both halves inherit every non-timestamp attribute.
            for name in interval_event:
                if name in interval_only_keys:
                    continue
                start_part[name] = interval_event[name]
                complete_part[name] = interval_event[name]

            start_part[timestamp_key] = interval_event[start_timestamp_key]
            start_part[transition_key] = "start"
            start_part["@@custom_lif_id"] = 0
            start_part["@@origin_ev_idx"] = position

            complete_part[timestamp_key] = interval_event[timestamp_key]
            complete_part[transition_key] = "complete"
            complete_part["@@custom_lif_id"] = 1
            complete_part["@@origin_ev_idx"] = position

            split_events.extend((start_part, complete_part))

        # Ties on the timestamp are broken by original position, then by
        # putting the "start" half (id 0) before the "complete" half (id 1).
        split_events.sort(
            key=lambda ev: (
                ev[timestamp_key],
                ev["@@origin_ev_idx"],
                ev["@@custom_lif_id"],
            )
        )
        for ev in split_events:
            lifecycle_trace.append(ev)

        lifecycle_log.append(lifecycle_trace)

    return lifecycle_log
[docs]
def assign_lead_cycle_time(log, parameters=None):
    """
    Assigns the lead and cycle time to an interval log

    The log is first passed through to_interval. Then, for every trace, the
    events are visited in order while tracking the latest completion
    timestamp seen so far. For each event:

    - the span between that latest completion and the event start (when the
      event starts strictly later) counts as wasted time and as lead time;
    - the part of the event that extends beyond that latest completion counts
      as cycle time and as lead time.

    All spans are measured with BusinessHours(...).get_seconds().

    The following attributes are written on every event (running totals up to
    and including that event, except "this wasted time"):

    - "@@approx_bh_partial_cycle_time"
    - "@@approx_bh_partial_lead_time"
    - "@@approx_bh_overall_wasted_time"
    - "@@approx_bh_this_wasted_time"
    - "@approx_bh_ratio_cycle_lead_time" (cycle / lead, 1 when lead is 0)

    Parameters
    -------------
    log
        Interval log (a lifecycle log is converted through to_interval)
    parameters
        Parameters of the algorithm, including: start_timestamp_key, timestamp_key, business_hour_slots
        (the dictionary is also forwarded to to_interval)

    Returns
    -------------
    interval_log
        The log returned by to_interval, with its events annotated in place.
        When the input is already in interval format, to_interval returns the
        input object itself, so the input events are the ones modified.
    """
    if parameters is None:
        parameters = {}

    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY
    )
    start_timestamp_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY,
        parameters,
        xes.DEFAULT_START_TIMESTAMP_KEY,
    )
    business_hours_slots = exec_utils.get_param_value(
        Parameters.BUSINESS_HOUR_SLOTS,
        parameters,
        constants.DEFAULT_BUSINESS_HOUR_SLOTS,
    )

    interval_log = to_interval(log, parameters=parameters)

    for trace in interval_log:
        approx_partial_lead_time = 0
        approx_partial_cycle_time = 0
        approx_wasted_time = 0
        # Latest completion timestamp seen so far in this trace; None until
        # the first event has been processed.
        max_et = None
        max_et_seconds = 0

        for event in trace:
            this_wasted_time = 0
            st = event[start_timestamp_key]
            st_seconds = st.timestamp()
            et = event[timestamp_key]
            et_seconds = et.timestamp()

            starts_after_covered = (
                max_et is None or st_seconds > max_et_seconds
            )

            # Idle gap between the covered period and the start of this event.
            if max_et is not None and st_seconds > max_et_seconds:
                bh_unworked = BusinessHours(
                    max_et, st, business_hour_slots=business_hours_slots
                )
                unworked_sec = bh_unworked.get_seconds()
                approx_partial_lead_time = (
                    approx_partial_lead_time + unworked_sec
                )
                approx_wasted_time = approx_wasted_time + unworked_sec
                this_wasted_time = unworked_sec

            # Portion of this event not yet covered by earlier events.
            # An event starting exactly at the latest completion
            # (st_seconds == max_et_seconds) must be counted as well, hence
            # the second branch only checks the end of the event.
            worked_from = None
            if starts_after_covered:
                worked_from = st
            elif et_seconds > max_et_seconds:
                worked_from = max_et

            if worked_from is not None:
                bh = BusinessHours(
                    worked_from, et, business_hour_slots=business_hours_slots
                )
                approx_bh_duration = bh.get_seconds()
                approx_partial_cycle_time = (
                    approx_partial_cycle_time + approx_bh_duration
                )
                approx_partial_lead_time = (
                    approx_partial_lead_time + approx_bh_duration
                )

            if max_et is None or et_seconds > max_et_seconds:
                max_et_seconds = et_seconds
                max_et = et

            ratio_cycle_lead_time = 1
            if approx_partial_lead_time > 0:
                ratio_cycle_lead_time = (
                    approx_partial_cycle_time / approx_partial_lead_time
                )

            event["@@approx_bh_partial_cycle_time"] = approx_partial_cycle_time
            event["@@approx_bh_partial_lead_time"] = approx_partial_lead_time
            event["@@approx_bh_overall_wasted_time"] = approx_wasted_time
            event["@@approx_bh_this_wasted_time"] = this_wasted_time
            # NOTE(review): this key has a single leading "@", unlike the
            # other attributes; kept unchanged because consumers may read it.
            event["@approx_bh_ratio_cycle_lead_time"] = ratio_cycle_lead_time

    return interval_log