Source code for pm4py.streaming.conversion.from_pandas

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Optional, Dict, Any

import numpy as np
import pandas as pd

from pm4py.objects.log.obj import Trace, Event
from pm4py.streaming.stream.live_trace_stream import LiveTraceStream
from pm4py.util import constants, xes_constants, exec_utils, pandas_utils


[docs] class Parameters(Enum): CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY INDEX_KEY = "index_key"
[docs] class PandasDataframeAsIterable(object): def __init__( self, dataframe: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None, ): if parameters is None: parameters = {} case_id_key = exec_utils.get_param_value( Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME ) activity_key = exec_utils.get_param_value( Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY ) timestamp_key = exec_utils.get_param_value( Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY, ) index_key = exec_utils.get_param_value( Parameters.INDEX_KEY, parameters, constants.DEFAULT_INDEX_KEY ) if not (hasattr(dataframe, "attrs") and dataframe.attrs): # dataframe has not been initialized through format_dataframe dataframe = pandas_utils.insert_index(dataframe, index_key) dataframe.sort_values([case_id_key, timestamp_key, index_key]) cases = dataframe[case_id_key].to_numpy() self.activities = dataframe[activity_key].to_numpy() self.timestamps = dataframe[timestamp_key].to_numpy() self.c_unq, self.c_ind, self.c_counts = np.unique( cases, return_index=True, return_counts=True ) self.no_traces = len(self.c_ind) self.i = 0
[docs] def read_trace(self) -> Trace: if self.i < self.no_traces: case_id = self.c_unq[self.i] si = self.c_ind[self.i] ei = si + self.c_counts[self.i] trace = Trace( attributes={xes_constants.DEFAULT_TRACEID_KEY: case_id} ) for j in range(si, ei): event = Event( { xes_constants.DEFAULT_NAME_KEY: self.activities[j], xes_constants.DEFAULT_TIMESTAMP_KEY: self.timestamps[ j ], } ) trace.append(event) self.i = self.i + 1 return trace
[docs] def reset(self): self.i = 0
def __iter__(self): """ Starts the iteration """ return self def __next__(self): """ Gets the next trace """ trace = self.read_trace() if trace is None: raise StopIteration return trace
[docs] def to_trace_stream(self, trace_stream: LiveTraceStream): """ Sends the content of the dataframe to a trace stream Parameters -------------- trace_stream Trace stream """ trace = self.read_trace() while trace is not None: trace_stream.append(trace) trace = self.read_trace()
[docs] def apply(dataframe, parameters=None) -> PandasDataframeAsIterable: """ Transforms the Pandas dataframe object to an iterable Parameters ---------------- dataframe Pandas dataframe parameters Parameters of the algorithm, including: - Parameters.CASE_ID_KEY => the attribute to be used as case identifier (default: constants.CASE_CONCEPT_NAME) - Parameters.ACTIVITY_KEY => the attribute to be used as activity (default: xes_constants.DEFAULT_NAME_KEY) - Parameters.TIMESTAMP_KEY => the attribute to be used as timestamp (default: xes_constants.DEFAULT_TIMESTAMP_KEY) Returns ---------------- log_iterable Iterable log object, which can be iterated directly or added to a live trace stream (using the method to_trace_stream). """ return PandasDataframeAsIterable(dataframe, parameters=parameters)