Source code for pm4py.statistics.traces.cycle_time.polars.get

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Dict, Optional, Any, Union

import polars as pl

from pm4py.statistics.traces.cycle_time.util import compute
from pm4py.util import exec_utils, constants, xes_constants


[docs] class Parameters(Enum): TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
[docs] def apply( lf: pl.LazyFrame, parameters: Optional[Dict[Union[str, Parameters], Any]] = None, ) -> float: """ Computes the cycle time starting from a Polars LazyFrame The definition that has been followed is the one proposed in: https://www.presentationeze.com/presentations/lean-manufacturing-just-in-time/lean-manufacturing-just-in-time-full-details/process-cycle-time-analysis/calculate-cycle-time/#:~:text=Cycle%20time%20%3D%20Average%20time%20between,is%2024%20minutes%20on%20average. So: Cycle time = Average time between completion of units. Example taken from the website: Consider a manufacturing facility, which is producing 100 units of product per 40 hour week. The average throughput rate is 1 unit per 0.4 hours, which is one unit every 24 minutes. Therefore the cycle time is 24 minutes on average. Parameters ------------------ lf LazyFrame parameters Parameters of the algorithm, including: - Parameters.START_TIMESTAMP_KEY => the attribute acting as start timestamp - Parameters.TIMESTAMP_KEY => the attribute acting as timestamp - Parameters.CASE_ID_KEY => the attribute acting as case identifier Returns ------------------ cycle_time Cycle time """ if parameters is None: parameters = {} timestamp_key = exec_utils.get_param_value( Parameters.TIMESTAMP_KEY, parameters, xes_constants.DEFAULT_TIMESTAMP_KEY, ) start_timestamp_key = exec_utils.get_param_value( Parameters.START_TIMESTAMP_KEY, parameters, timestamp_key ) case_id_key = exec_utils.get_param_value( Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME ) # Handle case where start_timestamp_key and timestamp_key are the same if start_timestamp_key == timestamp_key: # Select unique columns and rename for clarity events_df = lf.select([ pl.col(timestamp_key).alias("start_ts"), pl.col(timestamp_key).alias("end_ts"), pl.col(case_id_key) ]).collect() start_col_name = "start_ts" end_col_name = "end_ts" else: # Select different timestamp columns events_df = lf.select([start_timestamp_key, timestamp_key, case_id_key]).collect() start_col_name = start_timestamp_key end_col_name = timestamp_key # Convert timestamps to events tuples events = [] for row in events_df.iter_rows(): start_idx = events_df.columns.index(start_col_name) end_idx = events_df.columns.index(end_col_name) start_ts = row[start_idx].timestamp() if hasattr(row[start_idx], 'timestamp') else row[start_idx] end_ts = row[end_idx].timestamp() if hasattr(row[end_idx], 'timestamp') else row[end_idx] events.append((start_ts, end_ts)) # Get number of unique cases num_instances = events_df[case_id_key].n_unique() return compute.cycle_time(events, num_instances)