'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Optional, Dict, Any, List
import pandas as pd
from pm4py.objects.conversion.log import converter as log_converter
from pm4py.objects.log.obj import EventStream
from pm4py.util import constants
from pm4py.util import exec_utils
from pm4py.util import points_subset
from pm4py.util import xes_constants, pandas_utils
from pm4py.util.dt_parsing.variants import strpfromiso
import numpy as np
import random
LEGACY_PARQUET_TP_REPLACER = "AAA"
LEGACY_PARQUET_CASECONCEPTNAME = "caseAAAconceptAAAname"
class Parameters(Enum):
PARTITION_COLUMN = "partition_column"
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
CASE_PREFIX = constants.CASE_ATTRIBUTE_PREFIX
CASE_ATTRIBUTES = "case_attributes"
MANDATORY_ATTRIBUTES = "mandatory_attributes"
MAX_NO_CASES = "max_no_cases"
MIN_DIFFERENT_OCC_STR_ATTR = "min_different_occ_str_attr"
MAX_DIFFERENT_OCC_STR_ATTR = "max_different_occ_str_attr"
TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
PARAM_ARTIFICIAL_START_ACTIVITY = constants.PARAM_ARTIFICIAL_START_ACTIVITY
PARAM_ARTIFICIAL_END_ACTIVITY = constants.PARAM_ARTIFICIAL_END_ACTIVITY
INDEX_KEY = "index_key"
CASE_INDEX_KEY = "case_index_key"
USE_EXTREMES_TIMESTAMP = "use_extremes_timestamp"
ADD_CASE_IDENTIFIER_COLUMN = "add_case_identifier_column"
DETERMINISTIC = "deterministic"
COUNT_OCCURRENCES = "count_occurrences"
def insert_partitioning(df, num_partitions, parameters=None):
"""
Inserts a partitioning column in the given dataframe
Parameters
-------------
df
Dataframe
num_partitions
Number of partitions
parameters
Parameters of the algorithm
Returns
-------------
df
Partitioned dataframe
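Example
-------------
A minimal usage sketch (hypothetical data; assumes the default
"case:concept:name" case identifier column):
import pandas as pd
df = pd.DataFrame({"case:concept:name": ["c1", "c1", "c2", "c3"]})
df = insert_partitioning(df, 2)
# a "@@partitioning" column with values in {0, 1} has been added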
"""
if parameters is None:
parameters = {}
case_index_key = exec_utils.get_param_value(
Parameters.CASE_INDEX_KEY, parameters, constants.DEFAULT_CASE_INDEX_KEY
)
partition_column = exec_utils.get_param_value(
Parameters.PARTITION_COLUMN, parameters, "@@partitioning"
)
if case_index_key not in df.columns:
df = pandas_utils.insert_case_index(df, case_index_key)
df[partition_column] = df[case_index_key] % num_partitions
return df
def legacy_parquet_support(df, parameters=None):
"""
For legacy support: Parquet column names could not contain a ":",
which was replaced by an arbitrary replacer string.
This function substitutes the replacer string back with ":".
Parameters
---------------
df
Dataframe
parameters
Parameters of the algorithm
Returns
---------------
df
Dataframe with the original column names restored
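Example
---------------
A minimal sketch (hypothetical column names):
import pandas as pd
df = pd.DataFrame(columns=["caseAAAconceptAAAname", "conceptAAAname"])
df = legacy_parquet_support(df)
# columns are now ["case:concept:name", "concept:name"]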
"""
if parameters is None:
parameters = {}
df.columns = [
x.replace(LEGACY_PARQUET_TP_REPLACER, ":") for x in df.columns
]
return df
def table_to_stream(table, parameters=None):
"""
Converts a Pyarrow table to an event stream
Parameters
------------
table
Pyarrow table
parameters
Possible parameters of the algorithm
Returns
------------
stream
Event stream
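Example
------------
A minimal sketch (assumes the optional pyarrow package is installed):
import pyarrow as pa
table = pa.table({"concept:name": ["A", "B"],
"case:concept:name": ["c1", "c1"]})
stream = table_to_stream(table)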
"""
if parameters is None:
parameters = {}
dict0 = table.to_pydict()
keys = list(dict0.keys())
# for legacy format support
if LEGACY_PARQUET_CASECONCEPTNAME in keys:
for key in keys:
dict0[key.replace(LEGACY_PARQUET_TP_REPLACER, ":")] = dict0.pop(
key
)
stream = EventStream([dict(zip(dict0, i)) for i in zip(*dict0.values())])
return stream
def table_to_log(table, parameters=None):
"""
Converts a Pyarrow table to an event log
Parameters
------------
table
Pyarrow table
parameters
Possible parameters of the algorithm
Returns
------------
log
Event log
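Example
------------
A minimal sketch (assumes the optional pyarrow package is installed):
import pyarrow as pa
table = pa.table({"concept:name": ["A", "B"],
"case:concept:name": ["c1", "c1"]})
log = table_to_log(table)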
"""
if parameters is None:
parameters = {}
stream = table_to_stream(table, parameters=parameters)
return log_converter.apply(stream, parameters=parameters)
def convert_timestamp_columns_in_df(
df, timest_format=None, timest_columns=None
):
"""
Converts the timestamp columns of a dataframe into datetime values
Parameters
-----------
df
Dataframe
timest_format
(If provided) Format of the timestamp columns in the CSV file
timest_columns
Columns of the CSV that shall be converted into timestamp
Returns
------------
df
Dataframe with timestamp columns converted
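Example
------------
A minimal sketch (hypothetical data; timestamps given as ISO strings):
import pandas as pd
df = pd.DataFrame({"time:timestamp": ["2024-01-01T10:00:00",
"2024-01-01T11:30:00"]})
df = convert_timestamp_columns_in_df(df, timest_columns=["time:timestamp"])
# the "time:timestamp" column now has a datetime dtype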
"""
if timest_format is None:
timest_format = constants.DEFAULT_TIMESTAMP_PARSE_FORMAT
if timest_format is None:
timest_format = "mixed"
for col in df.columns:
if timest_columns is None or col in timest_columns:
if "obj" in str(df[col].dtype) or "str" in str(df[col].dtype):
try:
df[col] = pandas_utils.dataframe_column_string_to_datetime(
df[col], format=timest_format, utc=True
)
except BaseException:
try:
df[col] = (
pandas_utils.dataframe_column_string_to_datetime(
df[col],
format=timest_format,
exact=False,
utc=True,
)
)
except BaseException:
try:
df[col] = (
pandas_utils.dataframe_column_string_to_datetime(
df[col], format=timest_format))
except BaseException:
pass
for col in df.columns:
if "date" in str(df[col].dtype) or "time" in str(df[col].dtype):
df[col] = strpfromiso.fix_dataframe_column(df[col])
return df
def sample_dataframe(df, parameters=None):
"""
Samples a dataframe, retaining (at most) a given number of cases
Parameters
--------------
df
Dataframe
parameters
Parameters of the algorithm, including:
- Parameters.CASE_ID_KEY: the case identifier column
- Parameters.MAX_NO_CASES: the maximum number of cases to keep
- Parameters.DETERMINISTIC: if True, disables random shuffling of the cases
Returns
-------------
sampled_df
Sampled dataframe
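Example
-------------
A minimal sketch (hypothetical data; keeps the events of at most 10 cases):
import pandas as pd
df = pd.DataFrame({"case:concept:name": ["c%d" % i for i in range(100)]})
sampled_df = sample_dataframe(df, parameters={Parameters.MAX_NO_CASES: 10})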
"""
if parameters is None:
parameters = {}
deterministic = exec_utils.get_param_value(
Parameters.DETERMINISTIC, parameters, False
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
max_no_cases = exec_utils.get_param_value(
Parameters.MAX_NO_CASES, parameters, 100
)
case_ids = pandas_utils.format_unique(df[case_id_key].unique())
case_ids = list(case_ids)
if not deterministic:
random.shuffle(case_ids)
case_id_to_retain = points_subset.pick_chosen_points_list(
min(max_no_cases, len(case_ids)), case_ids
)
return df[df[case_id_key].isin(case_id_to_retain)]
def automatic_feature_selection_df(df, parameters=None):
"""
Performs an automatic feature selection on dataframes,
keeping the features useful for ML purposes
Parameters
---------------
df
Dataframe
parameters
Parameters of the algorithm
Returns
---------------
featured_df
Dataframe with only the features that have been selected
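Example
---------------
A minimal sketch (hypothetical data; "amount" is numeric and non-missing
in every case, so it is retained together with the mandatory attributes):
import pandas as pd
df = pd.DataFrame({
"case:concept:name": ["c1", "c1", "c2"],
"concept:name": ["A", "B", "A"],
"amount": [10.0, 20.0, 5.0],
})
featured_df = automatic_feature_selection_df(df)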
"""
if parameters is None:
parameters = {}
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY,
parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY,
)
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
)
mandatory_attributes = exec_utils.get_param_value(
Parameters.MANDATORY_ATTRIBUTES,
parameters,
set(df.columns).intersection(
{case_id_key, activity_key, timestamp_key}
),
)
min_different_occ_str_attr = exec_utils.get_param_value(
Parameters.MIN_DIFFERENT_OCC_STR_ATTR, parameters, 5
)
max_different_occ_str_attr = exec_utils.get_param_value(
Parameters.MAX_DIFFERENT_OCC_STR_ATTR, parameters, 50
)
cols_dtypes = {x: str(df[x].dtype) for x in df.columns}
other_attributes_to_retain = set()
no_all_cases = df[case_id_key].nunique()
for x, y in cols_dtypes.items():
attr_df = df.dropna(subset=[x])
this_cases = attr_df[case_id_key].nunique()
# keep only attributes that appear (non-missing) at least once in every case
if this_cases == no_all_cases:
if "float" in y or "int" in y:
# (as in the classic log version) always retain float/int attributes
other_attributes_to_retain.add(x)
elif "obj" in y or "str" in y:
# (as in the classic log version) keep string attributes if they have enough variability, but not too much
# (that would be hard to explain)
unique_val_count = df[x].nunique()
if (
min_different_occ_str_attr
<= unique_val_count
<= max_different_occ_str_attr
):
other_attributes_to_retain.add(x)
else:
# attributes of other types (for example, dates) are not
# retained by this feature selection
pass
attributes_to_retain = mandatory_attributes.union(
other_attributes_to_retain
)
return df[list(attributes_to_retain)]
def select_number_column(
df: pd.DataFrame,
fea_df: pd.DataFrame,
col: str,
case_id_key=constants.CASE_CONCEPT_NAME,
) -> pd.DataFrame:
"""
Extracts a column for the features dataframe for the given numeric attribute
(keeping the last non-missing value per case)
Parameters
--------------
df
Dataframe
fea_df
Feature dataframe
col
Numeric column
case_id_key
Case ID key
Returns
--------------
fea_df
Feature dataframe (desired output)
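Example
--------------
A minimal sketch (hypothetical data; fea_df already holds one row per case):
import pandas as pd
df = pd.DataFrame({"case:concept:name": ["c1", "c1", "c2"],
"amount": [10.0, 20.0, 5.0]})
fea_df = pd.DataFrame({"case:concept:name": ["c1", "c2"]})
fea_df = select_number_column(df, fea_df, "amount")
# fea_df["amount"] holds the last value per case: 20.0 and 5.0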
"""
df = (
df.dropna(subset=[col])
.groupby(case_id_key)
.last()
.reset_index()[[case_id_key, col]]
)
fea_df = fea_df.merge(
df, on=[case_id_key], how="left", suffixes=("", "_y")
)
fea_df[col] = fea_df[col].astype(np.float32)
return fea_df
def select_string_column(
df: pd.DataFrame,
fea_df: pd.DataFrame,
col: str,
case_id_key=constants.CASE_CONCEPT_NAME,
count_occurrences=False,
) -> pd.DataFrame:
"""
Extracts N columns (one per distinct attribute value; one-hot encoding) for the features dataframe for the given string attribute
Parameters
--------------
df
Dataframe
fea_df
Feature dataframe
col
String column
case_id_key
Case ID key
count_occurrences
If True, count the number of occurrences of the attribute value in each case.
If False (default), use binary encoding (1 if present, 0 if not present)
Returns
--------------
fea_df
Feature dataframe (desired output)
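Example
--------------
A minimal sketch (hypothetical data; one-hot encodes the "org:resource" column):
import pandas as pd
df = pd.DataFrame({"case:concept:name": ["c1", "c1", "c2"],
"org:resource": ["Mike", "Anna", "Mike"]})
fea_df = pd.DataFrame({"case:concept:name": ["c1", "c2"]})
fea_df = select_string_column(df, fea_df, "org:resource")
# fea_df gains binary columns "org:resource_Mike" and "org:resource_Anna"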
"""
vals = pandas_utils.format_unique(df[col].unique())
for val in vals:
if val is not None:
# Convert value to string first to handle all data types
val_str = str(val)
# Remove non-ASCII characters and spaces for column naming
new_col = (
col
+ "_"
+ val_str.encode("ascii", errors="ignore")
.decode("ascii")
.replace(" ", "")
)
if count_occurrences:
# Count the number of occurrences of this value per case
counts = df[df[col] == val].groupby(case_id_key).size().reset_index(name='count')
fea_df = fea_df.merge(
counts.rename(columns={'count': new_col}),
on=case_id_key,
how="left"
)
fea_df[new_col] = fea_df[new_col].fillna(0).astype(np.float32)
else:
# Binary encoding (original behavior)
filt_df_cases = pandas_utils.format_unique(
df[df[col] == val][case_id_key].unique()
)
fea_df[new_col] = fea_df[case_id_key].isin(filt_df_cases)
fea_df[new_col] = fea_df[new_col].astype(np.float32)
return fea_df
def get_features_df(
df: pd.DataFrame,
list_columns: List[str],
parameters: Optional[Dict[Any, Any]] = None,
) -> pd.DataFrame:
"""
Given a dataframe and a list of columns, performs an automatic feature extraction
Parameters
---------------
df
Dataframe
list_columns
List of columns to consider in the feature extraction
parameters
Parameters of the algorithm, including:
- Parameters.CASE_ID_KEY: the case ID
- Parameters.COUNT_OCCURRENCES: if True, count occurrences of string attributes instead of binary encoding
Returns
---------------
fea_df
Feature dataframe (desired output)
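Example
---------------
A minimal sketch (hypothetical data):
import pandas as pd
df = pd.DataFrame({
"case:concept:name": ["c1", "c1", "c2"],
"concept:name": ["A", "B", "A"],
"amount": [10.0, 20.0, 5.0],
})
fea_df = get_features_df(df, ["concept:name", "amount"])
# one row per case: one-hot columns for the activities, plus "amount"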
"""
if parameters is None:
parameters = {}
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
add_case_identifier_column = exec_utils.get_param_value(
Parameters.ADD_CASE_IDENTIFIER_COLUMN, parameters, False
)
count_occurrences = exec_utils.get_param_value(
Parameters.COUNT_OCCURRENCES, parameters, False
)
fea_df = pandas_utils.instantiate_dataframe(
{
case_id_key: sorted(
pandas_utils.format_unique(df[case_id_key].unique())
)
}
)
for col in list_columns:
if "obj" in str(df[col].dtype) or "str" in str(df[col].dtype):
fea_df = select_string_column(
df, fea_df, col, case_id_key=case_id_key, count_occurrences=count_occurrences
)
elif "float" in str(df[col].dtype) or "int" in str(df[col].dtype):
fea_df = select_number_column(
df, fea_df, col, case_id_key=case_id_key
)
fea_df = fea_df.sort_values(case_id_key)
if not add_case_identifier_column:
del fea_df[case_id_key]
return fea_df
def insert_artificial_start_end(
df0: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None
) -> pd.DataFrame:
"""
Inserts the artificial start/end activities in a Pandas dataframe
Parameters
------------------
df0
Dataframe
parameters
Parameters of the algorithm, including:
- Parameters.CASE_ID_KEY: the case identifier
- Parameters.TIMESTAMP_KEY: the timestamp
- Parameters.ACTIVITY_KEY: the activity
Returns
-----------------
enriched_df
Dataframe with artificial start/end activities
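Example
-----------------
A minimal sketch (hypothetical data; the timestamp column must already
contain datetime values):
import pandas as pd
df = pd.DataFrame({
"case:concept:name": ["c1", "c1"],
"concept:name": ["A", "B"],
"time:timestamp": pd.to_datetime(["2024-01-01 10:00:00",
"2024-01-01 11:00:00"]),
})
enriched_df = insert_artificial_start_end(df)
# each case now starts and ends with the artificial start/end activities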
"""
if parameters is None:
parameters = {}
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY,
parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY,
)
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY
)
use_extremes_timestamp = exec_utils.get_param_value(
Parameters.USE_EXTREMES_TIMESTAMP, parameters, False
)
artificial_start_activity = exec_utils.get_param_value(
Parameters.PARAM_ARTIFICIAL_START_ACTIVITY,
parameters,
constants.DEFAULT_ARTIFICIAL_START_ACTIVITY,
)
artificial_end_activity = exec_utils.get_param_value(
Parameters.PARAM_ARTIFICIAL_END_ACTIVITY,
parameters,
constants.DEFAULT_ARTIFICIAL_END_ACTIVITY,
)
index_key = exec_utils.get_param_value(
Parameters.INDEX_KEY, parameters, constants.DEFAULT_INDEX_KEY
)
df = df0.copy()
df = pandas_utils.insert_index(df, index_key)
df = df.sort_values([case_id_key, timestamp_key, index_key])
start_df = (
df[[case_id_key, timestamp_key]]
.groupby(case_id_key)
.first()
.reset_index()
)
end_df = (
df[[case_id_key, timestamp_key]]
.groupby(case_id_key)
.last()
.reset_index()
)
if use_extremes_timestamp:
# use the minimum/maximum representable pandas timestamps
start_df[timestamp_key] = pd.Timestamp.min
end_df[timestamp_key] = pd.Timestamp.max
start_df[timestamp_key] = start_df[timestamp_key].dt.tz_localize("utc")
end_df[timestamp_key] = end_df[timestamp_key].dt.tz_localize("utc")
else:
# stability trick: remove 1 ms from the artificial start activity
# timestamp, add 1 ms to the artificial end activity timestamp
start_df[timestamp_key] = start_df[timestamp_key] - pd.Timedelta(
"1 ms"
)
end_df[timestamp_key] = end_df[timestamp_key] + pd.Timedelta("1 ms")
start_df[activity_key] = artificial_start_activity
end_df[activity_key] = artificial_end_activity
df = pandas_utils.concat([start_df, df, end_df])
df = pandas_utils.insert_index(df, index_key)
df = df.sort_values([case_id_key, timestamp_key, index_key])
df.attrs = df0.attrs
return df
def dataframe_to_activity_case_table(
df: pd.DataFrame, parameters: Optional[Dict[Any, Any]] = None
):
"""
Transforms a Pandas dataframe into:
- an "activity" table, containing the events and their attributes
- a "case" table, containing the cases and their attributes
Parameters
--------------
df
Dataframe
parameters
Parameters of the algorithm that should be used, including:
- Parameters.CASE_ID_KEY => the column to be used as case ID (shall be included both in the activity table and the case table)
- Parameters.CASE_PREFIX => if a list of case-level attributes is not provided,
all the dataframe columns starting with this prefix are considered case attributes.
- Parameters.CASE_ATTRIBUTES => the attributes of the dataframe to be used as case columns
Returns
---------------
activity_table
Activity table
case_table
Case table
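Example
---------------
A minimal sketch (hypothetical data; "case:"-prefixed columns are treated
as case attributes by default):
import pandas as pd
df = pd.DataFrame({
"case:concept:name": ["c1", "c1"],
"case:customer": ["X", "X"],
"concept:name": ["A", "B"],
})
activity_table, case_table = dataframe_to_activity_case_table(df)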
"""
if parameters is None:
parameters = {}
# make sure we start from a dataframe object
df = log_converter.apply(
df, variant=log_converter.Variants.TO_DATA_FRAME, parameters=parameters
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
case_id_prefix = exec_utils.get_param_value(
Parameters.CASE_PREFIX, parameters, constants.CASE_ATTRIBUTE_PREFIX
)
case_attributes = exec_utils.get_param_value(
Parameters.CASE_ATTRIBUTES,
parameters,
set([x for x in df.columns if x.startswith(case_id_prefix)]),
)
event_attributes = set([x for x in df.columns if x not in case_attributes])
# select columns with lists (indexing with a set is unsupported in recent pandas versions)
activity_table = df[list(event_attributes.union({case_id_key}))]
case_table = (
df[list(case_attributes.union({case_id_key}))]
.groupby(case_id_key)
.first()
.reset_index()
)
return activity_table, case_table