Source code for pm4py.objects.log.importer.xes.variants.rustxes

from enum import Enum
from pm4py.util import exec_utils
from typing import Optional, Dict, Any, Union
from pm4py.objects.log.obj import EventLog
from pm4py.objects.conversion.log import converter as log_converter
import pandas as pd
from copy import copy
from pm4py.util.dt_parsing.variants import strpfromiso
import importlib.util


[docs] class Parameters(Enum): RETURN_LEGACY_LOG_OBJECT = "return_legacy_log_object"
[docs] def apply( log_path: str, parameters: Optional[Dict[Any, Any]] = None ) -> Union[EventLog, pd.DataFrame]: if parameters is None: parameters = {} return_legacy_log_object = exec_utils.get_param_value( Parameters.RETURN_LEGACY_LOG_OBJECT, parameters, True ) import rustxes log = rustxes.import_xes(log_path) log = log[0] log = log.to_pandas() for col in log.columns: if "date" in str(log[col].dtype) or "time" in str(log[col].dtype): log[col] = strpfromiso.fix_dataframe_column(log[col]) if importlib.util.find_spec("cudf"): import cudf log = cudf.DataFrame(log) if return_legacy_log_object: this_parameters = copy(parameters) this_parameters["stream_postprocessing"] = True log = log_converter.apply( log, variant=log_converter.Variants.TO_EVENT_LOG, parameters=this_parameters, ) return log