Source code for pm4py.statistics.attributes.polars.get

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.statistics.attributes.common import get as attributes_common
from pm4py.util.xes_constants import DEFAULT_TIMESTAMP_KEY
from pm4py.util import exec_utils
from pm4py.util import constants
from enum import Enum
from collections import Counter
import polars as pl
import sys
from typing import Optional, Dict, Any, Union, Tuple, List


[docs] class Parameters(Enum): ATTRIBUTE_KEY = constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY MAX_NO_POINTS_SAMPLE = "max_no_of_points_to_sample" KEEP_ONCE_PER_CASE = "keep_once_per_case"
def __add_left_0(stri: str, target_length: int) -> str: """ Adds left 0s to the current string until the target length is reached Parameters ---------------- stri String target_length Target length Returns ---------------- stri Revised string """ while len(stri) < target_length: stri = "0" + stri return stri
[docs] def get_events_distribution( lf: pl.LazyFrame, distr_type: str = "days_month", parameters: Optional[Dict[Union[str, Parameters], Any]] = None, ) -> Tuple[List[str], List[int]]: """ Gets the distribution of the events in the specified dimension Parameters ---------------- lf Polars LazyFrame distr_type Type of distribution: - days_month => Gets the distribution of the events among the days of a month (from 1 to 31) - months => Gets the distribution of the events among the months (from 1 to 12) - years => Gets the distribution of the events among the years of the event log - hours => Gets the distribution of the events among the hours of a day (from 0 to 23) - days_week => Gets the distribution of the events among the days of a week (from Monday to Sunday) - weeks => Distribution of the events among the weeks of a year (from 0 to 52) parameters Parameters of the algorithm, including: - Parameters.TIMESTAMP_KEY Returns ---------------- x Points (of the X-axis) y Points (of the Y-axis) """ if parameters is None: parameters = {} timestamp_key = exec_utils.get_param_value( Parameters.TIMESTAMP_KEY, parameters, DEFAULT_TIMESTAMP_KEY ) values = None all_values = None if distr_type == "days_month": serie = lf.select(pl.col(timestamp_key).dt.day().alias("value")).group_by("value").count() values_df = serie.collect() values = Counter(dict(zip(values_df["value"].to_list(), values_df["count"].to_list()))) all_values = Counter({i: 0 for i in range(1, 32)}) elif distr_type == "months": serie = lf.select(pl.col(timestamp_key).dt.month().alias("value")).group_by("value").count() values_df = serie.collect() values = Counter(dict(zip(values_df["value"].to_list(), values_df["count"].to_list()))) all_values = Counter({i: 0 for i in range(1, 13)}) elif distr_type == "years": serie = lf.select(pl.col(timestamp_key).dt.year().alias("value")).group_by("value").count() values_df = serie.collect() values = Counter(dict(zip(values_df["value"].to_list(), values_df["count"].to_list()))) all_values = Counter( {i: 0 for i in range(min(values), max(values) + 1)} ) elif distr_type == "hours": serie = lf.select(pl.col(timestamp_key).dt.hour().alias("value")).group_by("value").count() values_df = serie.collect() values = Counter(dict(zip(values_df["value"].to_list(), values_df["count"].to_list()))) all_values = Counter({i: 0 for i in range(0, 24)}) elif distr_type == "days_week": serie = lf.select(pl.col(timestamp_key).dt.weekday().alias("value")).group_by("value").count() values_df = serie.collect() # Convert from Monday=1 to Monday=0 format to match pandas values_dict = dict(zip(values_df["value"].to_list(), values_df["count"].to_list())) values = Counter({k-1: v for k, v in values_dict.items()}) all_values = Counter({i: 0 for i in range(0, 7)}) elif distr_type == "weeks": serie = lf.select(pl.col(timestamp_key).dt.week().alias("value")).group_by("value").count() values_df = serie.collect() values = Counter(dict(zip(values_df["value"].to_list(), values_df["count"].to_list()))) all_values = Counter({i: 0 for i in range(0, 53)}) # make sure that all the possible values appear for v in all_values: if v not in values: values[v] = all_values[v] values = sorted([(__add_left_0(str(x), 2), y) for x, y in values.items()]) if distr_type == "days_week": mapping = { "00": "Monday", "01": "Tuesday", "02": "Wednesday", "03": "Thursday", "04": "Friday", "05": "Saturday", "06": "Sunday", } values = [(mapping[x[0]], x[1]) for x in values] return [x[0] for x in values], [x[1] for x in values]
[docs] def get_attribute_values( lf: pl.LazyFrame, attribute_key: str, parameters: Optional[Dict[Union[str, Parameters], Any]] = None, ) -> Dict[Any, int]: """ Return list of attribute values contained in the specified column of the LazyFrame Parameters ----------- lf Polars LazyFrame attribute_key Attribute for which we want to known the values and the count parameters Possible parameters of the algorithm Returns ----------- attributes_values_dict Attributes in the specified column, along with their count """ if parameters is None: parameters = {} case_id_glue = exec_utils.get_param_value( Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME ) keep_once_per_case = exec_utils.get_param_value( Parameters.KEEP_ONCE_PER_CASE, parameters, False ) if keep_once_per_case: processed_lf = lf.group_by([case_id_glue, attribute_key]).agg(pl.first()) else: processed_lf = lf value_counts = processed_lf.select(pl.col(attribute_key)).group_by(attribute_key).count().collect() attributes_values_dict = dict(zip(value_counts[attribute_key].to_list(), value_counts["count"].to_list())) return attributes_values_dict
[docs] def get_kde_numeric_attribute( lf: pl.LazyFrame, attribute: str, parameters: Optional[Dict[Union[str, Parameters], Any]] = None, ) -> Dict[Any, int]: """ Gets the KDE estimation for the distribution of a numeric attribute values Parameters ------------- lf Polars LazyFrame attribute Numeric attribute to analyse parameters Possible parameters of the algorithm, including: graph_points -> number of points to include in the graph Returns -------------- x X-axis values to represent y Y-axis values to represent """ if parameters is None: parameters = {} max_no_of_points_to_sample = exec_utils.get_param_value( Parameters.MAX_NO_POINTS_SAMPLE, parameters, sys.maxsize ) red_lf = lf.filter(pl.col(attribute).is_not_null()) # Get sample if needed if max_no_of_points_to_sample < sys.maxsize: red_df = red_lf.collect().sample(n=min(max_no_of_points_to_sample, len(red_lf.collect()))) values = sorted(red_df[attribute].to_list()) else: values = sorted(red_lf.select(pl.col(attribute)).collect()[attribute].to_list()) return attributes_common.get_kde_numeric_attribute( values, parameters=parameters )
[docs] def get_kde_numeric_attribute_json(lf, attribute, parameters=None): """ Gets the KDE estimation for the distribution of a numeric attribute values (expressed as JSON) Parameters -------------- lf Polars LazyFrame attribute Numeric attribute to analyse parameters Possible parameters of the algorithm, including: graph_points -> number of points to include in the graph Returns -------------- json JSON representing the graph points """ values = lf.filter(pl.col(attribute).is_not_null()).select(pl.col(attribute)).collect()[attribute].to_list() return attributes_common.get_kde_numeric_attribute_json( values, parameters=parameters )
[docs] def get_kde_date_attribute( lf, attribute=DEFAULT_TIMESTAMP_KEY, parameters=None ): """ Gets the KDE estimation for the distribution of a date attribute values Parameters ------------- lf Polars LazyFrame attribute Date attribute to analyse parameters Possible parameters of the algorithm, including: graph_points -> number of points to include in the graph Returns -------------- x X-axis values to represent y Y-axis values to represent """ if parameters is None: parameters = {} max_no_of_points_to_sample = exec_utils.get_param_value( Parameters.MAX_NO_POINTS_SAMPLE, parameters, sys.maxsize ) red_lf = lf.filter(pl.col(attribute).is_not_null()) # Get sample if needed if max_no_of_points_to_sample < sys.maxsize: red_df = red_lf.collect().sample(n=min(max_no_of_points_to_sample, len(red_lf.collect()))) date_values = sorted(red_df[attribute].to_list()) else: date_values = sorted(red_lf.select(pl.col(attribute)).collect()[attribute].to_list()) return attributes_common.get_kde_date_attribute( date_values, parameters=parameters )
[docs] def get_kde_date_attribute_json( lf, attribute=DEFAULT_TIMESTAMP_KEY, parameters=None ): """ Gets the KDE estimation for the distribution of a date attribute values (expressed as JSON) Parameters -------------- lf Polars LazyFrame attribute Date attribute to analyse parameters Possible parameters of the algorithm, including: graph_points -> number of points to include in the graph Returns -------------- json JSON representing the graph points """ values = lf.filter(pl.col(attribute).is_not_null()).select(pl.col(attribute)).collect()[attribute].to_list() return attributes_common.get_kde_date_attribute_json( values, parameters=parameters )