Source code for pm4py.streaming.algo.discovery.dfg.variants.frequency

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from collections import Counter
from pm4py.util import exec_utils, constants, xes_constants
from pm4py.streaming.util.dictio import generator
from pm4py.streaming.algo.interface import StreamingAlgorithm
from enum import Enum
from copy import copy
import logging


[docs] class Parameters(Enum): DICT_VARIANT = "dict_variant" DICT_ID = "dict_id" CASE_DICT_ID = "case_dict_id" DFG_DICT_ID = "dfg_dict_id" ACT_DICT_ID = "act_dict_id" START_ACT_DICT_ID = "start_act_dict_id" ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
[docs] class StreamingDfgDiscovery(StreamingAlgorithm): def __init__(self, parameters=None): """ Initialize the StreamingDFGDiscovery object Parameters --------------- parameters of the algorithm, including: - Parameters.ACTIVITY_KEY: the key of the event to use as activity - Parameters.CASE_ID_KEY: the key of the event to use as case identifier """ if parameters is None: parameters = {} self.parameters = parameters self.activity_key = exec_utils.get_param_value( Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY ) self.case_id_key = exec_utils.get_param_value( Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME ) self.build_dictionaries(parameters) StreamingAlgorithm.__init__(self)
[docs] def build_dictionaries(self, parameters): """ Builds the dictionaries that are needed by the discovery operation Parameters --------------- parameters Parameters: - Parameters.DICT_VARIANT: type of dictionary to use - Parameters.CASE_DICT_ID: identifier of the case dictionary (hosting the last activity of a case) (0) - Parameters.DFG_DICT_ID: identifier of the DFG dictionary (1) - Parameters.ACT_ID: identifier of the dictionary hosting the count of the activities (2) - Parameters.START_ACT_DICT_ID: identifier of the dictionary hosting the count of the start activities (3) """ dict_variant = exec_utils.get_param_value( Parameters.DICT_VARIANT, parameters, generator.Variants.THREAD_SAFE ) case_dict_id = exec_utils.get_param_value( Parameters.CASE_DICT_ID, parameters, 0 ) dfg_dict_id = exec_utils.get_param_value( Parameters.DFG_DICT_ID, parameters, 1 ) act_dict_id = exec_utils.get_param_value( Parameters.ACT_DICT_ID, parameters, 2 ) start_act_dict_id = exec_utils.get_param_value( Parameters.START_ACT_DICT_ID, parameters, 3 ) parameters_case_dict = copy(parameters) parameters_case_dict[Parameters.DICT_ID] = case_dict_id parameters_dfg = copy(parameters) parameters_dfg[Parameters.DICT_ID] = dfg_dict_id parameters_activities = copy(parameters) parameters_activities[Parameters.DICT_ID] = act_dict_id parameters_start_activities = copy(parameters) parameters_start_activities[Parameters.DICT_ID] = start_act_dict_id self.case_dict = generator.apply( variant=dict_variant, parameters=parameters_case_dict ) self.dfg = generator.apply( variant=dict_variant, parameters=parameters_dfg ) self.activities = generator.apply( variant=dict_variant, parameters=parameters_activities ) self.start_activities = generator.apply( variant=dict_variant, parameters=parameters_start_activities )
[docs] def event_without_activity_or_case(self, event): """ Print an error message when an event is without the activity or the case identifier Parameters ---------------- event Event """ logging.warning("event without activity or case: " + str(event))
[docs] def encode_str(self, stru): """ Encodes a string for storage in generic dictionaries """ return str(stru)
[docs] def encode_tuple(self, tup): """ Encodes a tuple for storage in generic dictionaries """ return str(tup)
def _process(self, event): """ Receives an event from the live event stream, and appends it to the current DFG discovery Parameters --------------- event Event """ if self.case_id_key in event and self.activity_key in event: case = self.encode_str(event[self.case_id_key]) activity = self.encode_str(event[self.activity_key]) if case not in self.case_dict: if activity not in self.start_activities: self.start_activities[activity] = 1 else: self.start_activities[activity] = ( int(self.start_activities[activity]) + 1 ) else: df = self.encode_tuple((self.case_dict[case], activity)) if df not in self.dfg: self.dfg[df] = 1 else: self.dfg[df] = int(self.dfg[df]) + 1 if activity not in self.activities: self.activities[activity] = 1 else: self.activities[activity] = int(self.activities[activity]) + 1 self.case_dict[case] = activity else: self.event_without_activity_or_case(event) def _current_result(self): """ Gets the current state of the DFG Returns ---------------- dfg Directly-Follows Graph activities Activities start_activities Start activities end_activities End activities """ dfg = {eval(x): int(self.dfg[x]) for x in self.dfg} activities = {x: int(self.activities[x]) for x in self.activities} start_activities = { x: int(self.start_activities[x]) for x in self.start_activities } end_activities = dict( Counter(self.case_dict[x] for x in self.case_dict) ) return dfg, activities, start_activities, end_activities
[docs] def apply(parameters=None): """ Creates a StreamingDFGDiscovery object Parameters -------------- parameters Parameters of the algorithm """ if parameters is None: parameters = {} return StreamingDfgDiscovery(parameters=parameters)