# Source code for pm4py.streaming.algo.conformance.temporal.variants.classic

import logging
import sys
from copy import copy
from enum import Enum
from typing import Optional, Dict, Any, Tuple

from pm4py.objects.log.obj import Event
from pm4py.streaming.algo.interface import StreamingAlgorithm
from pm4py.streaming.util.dictio import generator
from pm4py.util import exec_utils, constants, xes_constants
from pm4py.util import typing
import json


class Parameters(Enum):
    """
    Parameter keys accepted by TemporalProfileStreamingConformance / apply.
    """

    # names of the event attributes read from each incoming event
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY

    # multiplier applied to the standard deviation of the temporal profile
    ZETA = "zeta"

    # configuration of the backing dictionaries (variant and identifiers)
    DICT_VARIANT = "dict_variant"
    DICT_ID = "dict_id"
    CASE_DICT_ID = "case_dict_id"
    DEV_DICT_ID = "dev_dict_id"
class TemporalProfileStreamingConformance(StreamingAlgorithm):
    """
    Streaming temporal conformance checking against a temporal profile.

    Every complete incoming event is compared with the previously stored
    events of the same case. For each pair (previous activity, current
    activity) contained in the temporal profile, the time elapsed between the
    end of the previous event and the start of the current one is checked
    against mean +/- zeta * std; pairs outside that range are recorded as
    deviations.
    """

    def __init__(
        self,
        temporal_profile: typing.TemporalProfile,
        parameters: Optional[Dict[Any, Any]] = None,
    ):
        """
        Initialize the streaming conformance checking.

        Implements the approach described in:
        Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).

        Parameters
        ---------------
        temporal_profile
            Temporal profile: maps (activity_a, activity_b) to a sequence
            whose first two items are used as (mean, std)
        parameters
            Parameters of the algorithm, including:
             - Parameters.ACTIVITY_KEY => the attribute to use as activity
             - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
             - Parameters.TIMESTAMP_KEY => the attribute to use as timestamp
             - Parameters.ZETA => multiplier for the standard deviation (default 6.0)
             - Parameters.CASE_ID_KEY => column to use as case identifier
             - Parameters.DICT_VARIANT => the variant of dictionary to use
             - Parameters.CASE_DICT_ID => the identifier of the case dictionary
             - Parameters.DEV_DICT_ID => the identifier of the deviations dictionary
        """
        if parameters is None:
            parameters = {}
        self.temporal_profile = temporal_profile
        self.activity_key = exec_utils.get_param_value(
            Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
        )
        self.timestamp_key = exec_utils.get_param_value(
            Parameters.TIMESTAMP_KEY,
            parameters,
            xes_constants.DEFAULT_TIMESTAMP_KEY,
        )
        self.start_timestamp_key = exec_utils.get_param_value(
            Parameters.START_TIMESTAMP_KEY,
            parameters,
            xes_constants.DEFAULT_TIMESTAMP_KEY,
        )
        self.case_id_key = exec_utils.get_param_value(
            Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
        )
        self.zeta = exec_utils.get_param_value(
            Parameters.ZETA, parameters, 6.0
        )

        dict_variant = exec_utils.get_param_value(
            Parameters.DICT_VARIANT, parameters, generator.Variants.THREAD_SAFE
        )

        # Case dictionary: case id -> JSON-encoded list of
        # [case, start_timestamp, end_timestamp, activity] records.
        parameters_gen = copy(parameters)
        case_dict_id = exec_utils.get_param_value(
            Parameters.CASE_DICT_ID, parameters, 0
        )
        parameters_gen[Parameters.DICT_ID] = case_dict_id
        self.case_dictionary = generator.apply(
            variant=dict_variant, parameters=parameters_gen
        )

        # Deviations dictionary: case id -> JSON-encoded list of
        # [case, prev_activity, activity, diff, zeta] records.
        parameters_dev = copy(parameters)
        dev_dict_id = exec_utils.get_param_value(
            Parameters.DEV_DICT_ID, parameters, 1
        )
        parameters_dev[Parameters.DICT_ID] = dev_dict_id
        self.deviations_dict = generator.apply(
            variant=dict_variant, parameters=parameters_dev
        )
        StreamingAlgorithm.__init__(self)

    def _process(self, event: Event):
        """
        Checks the incoming event against the events already stored for its
        case, then appends it to the case dictionary.

        Parameters
        ---------------
        event
            Event (must contain case id, start timestamp, timestamp and
            activity; otherwise message_event_is_not_complete is called)
        """
        if (
            self.case_id_key not in event
            or self.start_timestamp_key not in event
            or self.timestamp_key not in event
            or self.activity_key not in event
        ):
            self.message_event_is_not_complete(event)
            return

        case = str(event[self.case_id_key])
        # timestamps are converted to POSIX seconds via .timestamp()
        start_timestamp = event[self.start_timestamp_key].timestamp()
        end_timestamp = event[self.timestamp_key].timestamp()
        activity = str(event[self.activity_key])

        if case not in self.case_dictionary.keys():
            self.case_dictionary[case] = json.dumps([])
            self.deviations_dict[case] = json.dumps([])

        ev_red = (case, start_timestamp, end_timestamp, activity)
        # conformance is checked before the event is stored, so the event is
        # only compared with its predecessors
        self.check_conformance(ev_red)

        this_case = json.loads(self.case_dictionary[case])
        this_case.append(ev_red)
        self.case_dictionary[case] = json.dumps(this_case)

    def check_conformance(self, event: Tuple[str, float, float, str]):
        """
        Checks the conformance according to the temporal profile.

        Every stored event of the same case that ended no later than the start
        of the current event is paired with it. If the pair is in the temporal
        profile and the elapsed time falls outside mean +/- zeta * std, a
        deviation is recorded in the deviations dictionary and signalled via
        message_deviation.

        Parameters
        ---------------
        event
            Reduced event (case, start_timestamp, end_timestamp, activity)
        """
        case, start_timestamp, _, activity = event
        prev_events = json.loads(self.case_dictionary[case])

        new_deviations = []
        for _, _, prev_end_timestamp, prev_activity in prev_events:
            key = (prev_activity, activity)
            if start_timestamp >= prev_end_timestamp and key in self.temporal_profile:
                profile_entry = self.temporal_profile[key]
                mean, std = profile_entry[0], profile_entry[1]
                diff = start_timestamp - prev_end_timestamp
                if (
                    diff < mean - self.zeta * std
                    or diff > mean + self.zeta * std
                ):
                    # with a zero std any deviation is "infinitely" many
                    # standard deviations away; sys.maxsize stands in for that
                    this_zeta = (
                        abs(diff - mean) / std if std > 0 else sys.maxsize
                    )
                    new_deviations.append(
                        (case, prev_activity, activity, diff, this_zeta)
                    )

        if not new_deviations:
            return

        # persist all deviations of this event with a single JSON round-trip
        # instead of one load/dump per deviation
        this_dev = json.loads(self.deviations_dict[case])
        this_dev.extend(new_deviations)
        self.deviations_dict[case] = json.dumps(this_dev)

        for dev_descr in new_deviations:
            self.message_deviation(dev_descr)

    def message_event_is_not_complete(self, event: Event):
        """
        Method that is called when the event does not contain the case, the
        activity, the start timestamp or the timestamp.

        Parameters
        --------------
        event
            Incoming event
        """
        logging.error(
            "case or activities or timestamp are none! " + str(event)
        )

    def message_deviation(self, dev_descr: Tuple[str, str, str, float, float]):
        """
        Method that is called to signal a deviation according to the temporal profile

        Parameters
        --------------
        dev_descr
            Description of the deviation to be printed
            (case, prev_activity, activity, diff, zeta)
        """
        logging.error(
            "the temporal profile is broken in the following setting: "
            + str(dev_descr)
        )

    def _current_result(self) -> typing.TemporalProfileStreamingConfResults:
        """
        Gets the current deviations identified by conformance checking

        Returns
        -------------
        deviations_dict
            Deviations dictionary (only cases with at least one deviation)
        """
        dev_dict = {}
        for x in self.deviations_dict.keys():
            y = json.loads(self.deviations_dict[x])
            if y:
                dev_dict[x] = y
        return dev_dict
def apply(
    temporal_profile: typing.TemporalProfile,
    parameters: Optional[Dict[Any, Any]] = None,
):
    """
    Build a streaming temporal-profile conformance checker.

    Implements the approach described in:
    Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).

    Parameters
    ---------------
    temporal_profile
        Temporal profile
    parameters
        Parameters of the algorithm, including:
         - Parameters.ACTIVITY_KEY => the attribute to use as activity
         - Parameters.START_TIMESTAMP_KEY => the attribute to use as start timestamp
         - Parameters.TIMESTAMP_KEY => the attribute to use as timestamp
         - Parameters.ZETA => multiplier for the standard deviation
         - Parameters.CASE_ID_KEY => column to use as case identifier
         - Parameters.DICT_VARIANT => the variant of dictionary to use
         - Parameters.CASE_DICT_ID => the identifier of the case dictionary
         - Parameters.DEV_DICT_ID => the identifier of the deviations dictionary

    Returns
    ---------------
    TemporalProfileStreamingConformance
        A new checker instance
    """
    effective_parameters = {} if parameters is None else parameters
    return TemporalProfileStreamingConformance(
        temporal_profile, parameters=effective_parameters
    )