# Source code for pm4py.objects.conversion.log.variants.to_event_stream

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import math
from copy import deepcopy, copy
from enum import Enum

from pm4py.objects.conversion.log import constants
from pm4py.objects.log import obj as log_instance
from pm4py.objects.log.obj import EventLog, Event, XESExtension
from pm4py.util import constants as pmutil
from pm4py.util import exec_utils, pandas_utils, xes_constants
import pandas
import math


class Parameters(Enum):
    """
    Parameter keys accepted by the log -> event stream conversion.

    DEEP_COPY
        Deep-copies each event (avoids shared references between the
        input log and the output stream). Default in apply(): True.
    STREAM_POST_PROCESSING
        Removes NaN/NaT/None values from the produced events.
        Default in apply(): False.
    CASE_ATTRIBUTE_PREFIX
        Prefix prepended to case attributes copied onto events.
        Default in apply(): "case:".
    INCLUDE_CASE_ATTRIBUTES
        Copies the trace attributes onto each event. Default: True.
    COMPRESS
        Shares one object instance among equal keys/values to reduce
        memory usage. Default in apply(): False.
    EXTENSIONS
        XES extensions to attach to the stream; when None they are
        auto-detected from the dataframe columns.
    """
    DEEP_COPY = constants.DEEPCOPY
    STREAM_POST_PROCESSING = constants.STREAM_POSTPROCESSING
    CASE_ATTRIBUTE_PREFIX = "case_attribute_prefix"
    INCLUDE_CASE_ATTRIBUTES = "include_case_attributes"
    COMPRESS = "compress"
    EXTENSIONS = "extensions"
def __postprocess_stream(list_events):
    """
    Postprocess the list of events of the stream in order to make sure
    that there are no NaN/NaT/None values in the events.

    Parameters
    -------------
    list_events
        List of events (dictionaries), modified in place

    Returns
    -------------
    list_events
        Postprocessed stream
    """
    NaTType = pandas._libs.tslibs.nattype.NaTType
    for event in list_events:
        # collect first, delete after: a dict cannot be mutated while iterated.
        # isinstance (instead of an exact type check) also catches numpy float
        # subclasses such as numpy.float64, which dataframe conversion produces.
        keys_to_delete = [
            k
            for k, v in event.items()
            if v is None
            or isinstance(v, NaTType)
            or (isinstance(v, float) and math.isnan(v))
        ]
        for k in keys_to_delete:
            del event[k]
    return list_events


def __compress(list_events):
    """
    Compress a list of events, sharing a single object instance among
    equal keys and equal values (stream-level interning) to reduce the
    memory footprint after the conversion.

    Parameters
    --------------
    list_events
        List of events of the stream, modified in place

    Returns
    --------------
    list_events
        Compressed list of events
    """
    interned = {}
    for i, event in enumerate(list_events):
        comp_ev = {}
        for k, v in event.items():
            # reuse the first instance ever seen for an equal key/value
            k = interned.setdefault(k, k)
            try:
                v = interned.setdefault(v, v)
            except TypeError:
                # unhashable value (e.g. a list cell): keep it as-is
                pass
            comp_ev[k] = v
        list_events[i] = comp_ev
    return list_events
def apply(log, parameters=None):
    """
    Converts an event log (EventLog object or Pandas dataframe) to an
    event stream.

    Parameters
    ----------
    log
        An EventLog, or a Pandas dataframe
    parameters
        Conversion parameters (see the Parameters enum):
        - Parameters.STREAM_POST_PROCESSING (default False)
        - Parameters.CASE_ATTRIBUTE_PREFIX (default "case:")
        - Parameters.DEEP_COPY (default True)
        - Parameters.INCLUDE_CASE_ATTRIBUTES (default True)
        - Parameters.COMPRESS (default False)
        - Parameters.EXTENSIONS (default None: auto-detect)

    Returns
    -------
    stream : :class:`pm4py.objects.log.obj.EventStream`
        An event stream; objects of any other type are returned unchanged
    """
    if parameters is None:
        parameters = {}

    stream_post_processing = exec_utils.get_param_value(
        Parameters.STREAM_POST_PROCESSING, parameters, False
    )
    case_pref = exec_utils.get_param_value(
        Parameters.CASE_ATTRIBUTE_PREFIX, parameters, "case:"
    )
    enable_deepcopy = exec_utils.get_param_value(
        Parameters.DEEP_COPY, parameters, True
    )
    include_case_attributes = exec_utils.get_param_value(
        Parameters.INCLUDE_CASE_ATTRIBUTES, parameters, True
    )
    compress = exec_utils.get_param_value(Parameters.COMPRESS, parameters, False)
    extensions = exec_utils.get_param_value(Parameters.EXTENSIONS, parameters, None)

    if pandas_utils.check_is_pandas_dataframe(log):
        return __transform_dataframe_to_event_stream(
            log,
            stream_post_processing=stream_post_processing,
            compress=compress,
            extensions=extensions,
        )
    if isinstance(log, EventLog):
        return __transform_event_log_to_event_stream(
            log,
            include_case_attributes=include_case_attributes,
            case_attribute_prefix=case_pref,
            enable_deepcopy=enable_deepcopy,
        )
    # not a dataframe and not an EventLog: assume it is already a stream
    return log
def __detect_extensions(df):
    """
    Detects the XES extensions to include in the stream from the
    dataframe column names (a column like "concept:name" implies the
    Concept extension).

    Parameters
    ------------------
    df
        Pandas dataframe

    Returns
    ------------------
    extensions
        Set of XESExtension members whose prefix matches a column-name part
    """
    extensions = set()
    for col in df.columns:
        for single_key in col.split(":"):
            for ext in XESExtension:
                if single_key == ext.prefix:
                    extensions.add(ext)
    return extensions


def __finalize_event_stream(dataframe, list_events, stream_post_processing, compress, extensions):
    """
    Shared tail of the two dataframe->stream conversions: optionally
    post-processes and compresses the raw event dictionaries, wraps them
    into Event objects, and builds the EventStream carrying the dataframe
    properties and the XES extensions.
    """
    if extensions is None:
        extensions = __detect_extensions(dataframe)
    if stream_post_processing:
        list_events = __postprocess_stream(list_events)
    if compress:
        list_events = __compress(list_events)
    for i in range(len(list_events)):
        list_events[i] = Event(list_events[i])
    # propagate dataframe-level properties, except the case ID key,
    # which is not meaningful on a flat stream of events
    if hasattr(dataframe, "attrs"):
        properties = copy(dataframe.attrs)
        if pmutil.PARAMETER_CONSTANT_CASEID_KEY in properties:
            del properties[pmutil.PARAMETER_CONSTANT_CASEID_KEY]
    else:
        properties = {}
    stream = log_instance.EventStream(
        list_events, attributes={"origin": "csv"}, properties=properties
    )
    for ex in extensions:
        stream.extensions[ex.name] = {
            xes_constants.KEY_PREFIX: ex.prefix,
            xes_constants.KEY_URI: ex.uri,
        }
    return stream


def __transform_dataframe_to_event_stream(
    dataframe, stream_post_processing=False, compress=True, extensions=None
):
    """
    Transforms a dataframe to an event stream.

    Parameters
    ------------------
    dataframe
        Pandas dataframe
    stream_post_processing
        Boolean value that enables the post processing to remove NaN / NaT values
    compress
        Compresses the stream in order to reduce the memory utilization
        after the conversion
    extensions
        Provided extensions (to be included in the log); auto-detected when None

    Returns
    ------------------
    stream
        Event stream
    """
    list_events = pandas_utils.to_dict_records(dataframe)
    return __finalize_event_stream(
        dataframe, list_events, stream_post_processing, compress, extensions
    )


def __transform_dataframe_to_event_stream_new(
    dataframe, stream_post_processing=False, compress=False, extensions=None
):
    """
    Transforms a dataframe to an event stream (column-wise variant,
    materializing each column as a numpy array).

    Parameters
    ------------------
    dataframe
        Pandas dataframe
    stream_post_processing
        Boolean value that enables the post processing to remove NaN / NaT values
    compress
        Compresses the stream in order to reduce the memory utilization
        after the conversion
    extensions
        Provided extensions (to be included in the log); auto-detected when None

    Returns
    ------------------
    stream
        Event stream
    """
    columns_names = list(dataframe.columns)
    columns_corr = [dataframe[c].to_numpy() for c in columns_names]
    # guard against a dataframe without columns (columns_corr[-1] would raise)
    length = columns_corr[-1].size if columns_corr else 0
    list_events = []
    for i in range(length):
        eve = {}
        for j in range(len(columns_names)):
            eve[columns_names[j]] = columns_corr[j][i]
        list_events.append(eve)
    return __finalize_event_stream(
        dataframe, list_events, stream_post_processing, compress, extensions
    )


def __transform_event_log_to_event_stream(
    log,
    include_case_attributes=True,
    case_attribute_prefix=pmutil.CASE_ATTRIBUTE_PREFIX,
    enable_deepcopy=False,
):
    """
    Converts the event log to an event stream.

    Parameters
    ----------
    log: :class:`pm4py.objects.log.obj.EventLog`
        An Event log
    include_case_attributes
        Copies the trace attributes onto each event, prefixed; when False,
        only a glue attribute with the trace index is set
    case_attribute_prefix
        Prefix for the copied case attributes (default 'case:')
    enable_deepcopy
        Enables deepcopy (avoid references between input and output objects)

    Returns
    -------
    stream : :class:`pm4py.objects.log.obj.EventStream`
        An Event stream
    """
    event_stream = log_instance.EventStream(
        [],
        attributes=log.attributes,
        classifiers=log.classifiers,
        omni_present=log.omni_present,
        extensions=log.extensions,
        properties=log.properties,
    )
    for index, trace in enumerate(log):
        for event in trace:
            new_event = deepcopy(event) if enable_deepcopy else event
            if include_case_attributes:
                for key, value in trace.attributes.items():
                    new_event[case_attribute_prefix + key] = value
            else:
                # no case attributes: keep only the glue to the originating trace
                new_event[pmutil.CASE_ATTRIBUTE_GLUE] = str(index)
            event_stream.append(new_event)
    return event_stream