Source code for pm4py.objects.log.util.interval_lifecycle

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.util.business_hours import BusinessHours
from pm4py.objects.log.util import sorting
from pm4py.util import constants
from pm4py.util import xes_constants as xes
from pm4py.objects.log.obj import EventLog, Trace, Event
from copy import copy
from enum import Enum
from pm4py.util import exec_utils


[docs] class Parameters(Enum): TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY TRANSITION_KEY = constants.PARAMETER_CONSTANT_TRANSITION_KEY ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY LIFECYCLE_INSTANCE_KEY = "pm4py:param:lifecycle:instance:key" BUSINESS_HOURS = "business_hours" BUSINESS_HOUR_SLOTS = "business_hour_slots" WORKCALENDAR = "workcalendar"
[docs] def to_interval(log, parameters=None): """ Converts a log to interval format (e.g. an event has two timestamps) from lifecycle format (an event has only a timestamp, and a transition lifecycle) Parameters ------------- log Log (expressed in the lifecycle format) parameters Possible parameters of the method (activity, timestamp key, start timestamp key, transition ...) Returns ------------- log Interval event log """ if parameters is None: parameters = {} timestamp_key = exec_utils.get_param_value( Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY ) start_timestamp_key = exec_utils.get_param_value( Parameters.START_TIMESTAMP_KEY, parameters, xes.DEFAULT_START_TIMESTAMP_KEY, ) transition_key = exec_utils.get_param_value( Parameters.TRANSITION_KEY, parameters, xes.DEFAULT_TRANSITION_KEY ) activity_key = exec_utils.get_param_value( Parameters.ACTIVITY_KEY, parameters, xes.DEFAULT_NAME_KEY ) lifecycle_instance_key = exec_utils.get_param_value( Parameters.LIFECYCLE_INSTANCE_KEY, parameters, xes.DEFAULT_INSTANCE_KEY ) business_hours = exec_utils.get_param_value( Parameters.BUSINESS_HOURS, parameters, False ) business_hours_slots = exec_utils.get_param_value( Parameters.BUSINESS_HOUR_SLOTS, parameters, constants.DEFAULT_BUSINESS_HOUR_SLOTS, ) if log is not None and len(log) > 0: if ( "PM4PY_TYPE" in log.attributes and log.attributes["PM4PY_TYPE"] == "interval" ): return log if log[0] is not None and len(log[0]) > 0: first_event = log[0][0] if start_timestamp_key in first_event: return log new_log = EventLog( attributes=copy(log.attributes), extensions=copy(log.extensions), classifiers=copy(log.classifiers), omni_present=copy(log.omni_present), properties=copy(log.properties), ) new_log.attributes["PM4PY_TYPE"] = "interval" new_log.properties[ constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY ] = xes.DEFAULT_START_TIMESTAMP_KEY for trace in log: new_trace = Trace() for attr in trace.attributes: new_trace.attributes[attr] = trace.attributes[attr] activities_start = {} for event in trace: activity = event[activity_key] instance = ( event[lifecycle_instance_key] if lifecycle_instance_key in event else None ) activity = (activity, instance) transition = ( event[transition_key] if transition_key in event else "complete" ) timestamp = event[timestamp_key] if transition.lower() == "start": if activity not in activities_start: activities_start[activity] = list() activities_start[activity].append(event) elif transition.lower() == "complete": start_event = None start_timestamp = event[timestamp_key] if ( activity in activities_start and len(activities_start[activity]) > 0 ): start_event = activities_start[activity].pop(0) start_timestamp = start_event[timestamp_key] new_event = Event() for attr in event: if ( not attr == timestamp_key and not attr == transition_key ): new_event[attr] = event[attr] if start_event is not None: for attr in start_event: if ( not attr == timestamp_key and not attr == transition_key ): new_event["@@startevent_" + attr] = ( start_event[attr] ) new_event[start_timestamp_key] = start_timestamp new_event[timestamp_key] = timestamp new_event["@@duration"] = ( timestamp - start_timestamp ).total_seconds() if business_hours: bh = BusinessHours( start_timestamp, timestamp, business_hour_slots=business_hours_slots, ) new_event["@@approx_bh_duration"] = bh.get_seconds() new_trace.append(new_event) new_trace = sorting.sort_timestamp_trace( new_trace, start_timestamp_key ) new_log.append(new_trace) return new_log return log
[docs] def to_lifecycle(log, parameters=None): """ Converts a log from interval format (e.g. an event has two timestamps) to lifecycle format (an event has only a timestamp, and a transition lifecycle) Parameters ------------- log Log (expressed in the interval format) parameters Possible parameters of the method (activity, timestamp key, start timestamp key, transition ...) Returns ------------- log Lifecycle event log """ if parameters is None: parameters = {} timestamp_key = exec_utils.get_param_value( Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY ) start_timestamp_key = exec_utils.get_param_value( Parameters.START_TIMESTAMP_KEY, parameters, xes.DEFAULT_START_TIMESTAMP_KEY, ) transition_key = exec_utils.get_param_value( Parameters.TRANSITION_KEY, parameters, xes.DEFAULT_TRANSITION_KEY ) if log is not None and len(log) > 0: if ( "PM4PY_TYPE" in log.attributes and log.attributes["PM4PY_TYPE"] == "lifecycle" ): return log if log[0] is not None and len(log[0]) > 0: first_event = log[0][0] if transition_key in first_event: return log new_log = EventLog( attributes=copy(log.attributes), extensions=copy(log.extensions), classifiers=copy(log.classifiers), omni_present=copy(log.omni_present), properties=copy(log.properties), ) new_log.attributes["PM4PY_TYPE"] = "lifecycle" for trace in log: new_trace = Trace() for attr in trace.attributes: new_trace.attributes[attr] = trace.attributes[attr] list_events = [] for index, event in enumerate(trace): new_event_start = Event() new_event_complete = Event() for attr in event: if ( not attr == timestamp_key and not attr == start_timestamp_key ): new_event_start[attr] = event[attr] new_event_complete[attr] = event[attr] new_event_start[timestamp_key] = event[start_timestamp_key] new_event_start[transition_key] = "start" new_event_start["@@custom_lif_id"] = 0 new_event_start["@@origin_ev_idx"] = index new_event_complete[timestamp_key] = event[timestamp_key] new_event_complete[transition_key] = "complete" new_event_complete["@@custom_lif_id"] = 1 new_event_complete["@@origin_ev_idx"] = index list_events.append(new_event_start) list_events.append(new_event_complete) list_events = sorted( list_events, key=lambda x: ( x[timestamp_key], x["@@origin_ev_idx"], x["@@custom_lif_id"], ), ) for ev in list_events: new_trace.append(ev) new_log.append(new_trace) return new_log return log
[docs] def assign_lead_cycle_time(log, parameters=None): """ Assigns the lead and cycle time to an interval log Parameters ------------- log Interval log parameters Parameters of the algorithm, including: start_timestamp_key, timestamp_key, business_hour_slots """ if parameters is None: parameters = {} timestamp_key = exec_utils.get_param_value( Parameters.TIMESTAMP_KEY, parameters, xes.DEFAULT_TIMESTAMP_KEY ) start_timestamp_key = exec_utils.get_param_value( Parameters.START_TIMESTAMP_KEY, parameters, xes.DEFAULT_START_TIMESTAMP_KEY, ) business_hours_slots = exec_utils.get_param_value( Parameters.BUSINESS_HOUR_SLOTS, parameters, constants.DEFAULT_BUSINESS_HOUR_SLOTS, ) interval_log = to_interval(log, parameters=parameters) for trace in interval_log: approx_partial_lead_time = 0 approx_partial_cycle_time = 0 approx_wasted_time = 0 max_et = None max_et_seconds = 0 for i in range(len(trace)): this_wasted_time = 0 st = trace[i][start_timestamp_key] st_seconds = st.timestamp() et = trace[i][timestamp_key] et_seconds = et.timestamp() if max_et_seconds > 0 and st_seconds > max_et_seconds: bh_unworked = BusinessHours( max_et, st, business_hour_slots=business_hours_slots ) unworked_sec = bh_unworked.get_seconds() approx_partial_lead_time = ( approx_partial_lead_time + unworked_sec ) approx_wasted_time = approx_wasted_time + unworked_sec this_wasted_time = unworked_sec if st_seconds > max_et_seconds: bh = BusinessHours( st, et, business_hour_slots=business_hours_slots ) approx_bh_duration = bh.get_seconds() approx_partial_cycle_time = ( approx_partial_cycle_time + approx_bh_duration ) approx_partial_lead_time = ( approx_partial_lead_time + approx_bh_duration ) elif st_seconds < max_et_seconds and et_seconds > max_et_seconds: bh = BusinessHours( max_et, et, business_hour_slots=business_hours_slots ) approx_bh_duration = bh.get_seconds() approx_partial_cycle_time = ( approx_partial_cycle_time + approx_bh_duration ) approx_partial_lead_time = ( approx_partial_lead_time + approx_bh_duration ) if et_seconds > max_et_seconds: max_et_seconds = et_seconds max_et = et ratio_cycle_lead_time = 1 if approx_partial_lead_time > 0: ratio_cycle_lead_time = ( approx_partial_cycle_time / approx_partial_lead_time ) trace[i][ "@@approx_bh_partial_cycle_time" ] = approx_partial_cycle_time trace[i][ "@@approx_bh_partial_lead_time" ] = approx_partial_lead_time trace[i]["@@approx_bh_overall_wasted_time"] = approx_wasted_time trace[i]["@@approx_bh_this_wasted_time"] = this_wasted_time trace[i][ "@approx_bh_ratio_cycle_lead_time" ] = ratio_cycle_lead_time return interval_log