Source code for pm4py.statistics.traces.cycle_time.polars.get
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Dict, Optional, Any, Union
import polars as pl
from pm4py.statistics.traces.cycle_time.util import compute
from pm4py.util import exec_utils, constants, xes_constants
[docs]
class Parameters(Enum):
    """
    Keys accepted in the ``parameters`` dictionary of :func:`apply`.

    Each member's value is the corresponding parameter constant taken from
    ``pm4py.util.constants``.
    """

    # Attribute acting as the (completion) timestamp of an event.
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    # Attribute acting as the start timestamp of an event.
    START_TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY
    # Attribute acting as the case identifier.
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
[docs]
def _to_epoch_seconds(value: Any) -> Any:
    """
    Return ``value.timestamp()`` when the value exposes a ``timestamp``
    attribute (e.g. ``datetime`` objects); otherwise return the value unchanged.
    """
    return value.timestamp() if hasattr(value, "timestamp") else value


def apply(
    lf: pl.LazyFrame,
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> float:
    """
    Computes the cycle time starting from a Polars LazyFrame

    The definition that has been followed is the one proposed in:
    https://www.presentationeze.com/presentations/lean-manufacturing-just-in-time/lean-manufacturing-just-in-time-full-details/process-cycle-time-analysis/calculate-cycle-time/#:~:text=Cycle%20time%20%3D%20Average%20time%20between,is%2024%20minutes%20on%20average.

    So:
    Cycle time = Average time between completion of units.

    Example taken from the website:
    Consider a manufacturing facility, which is producing 100 units of product per 40 hour week.
    The average throughput rate is 1 unit per 0.4 hours, which is one unit every 24 minutes.
    Therefore the cycle time is 24 minutes on average.

    The LazyFrame is collected (materialized) restricted to the timestamp
    and case identifier columns; the (start, end) pairs of all the events and
    the number of distinct case identifiers are then passed to
    ``compute.cycle_time``.

    Parameters
    ------------------
    lf
        LazyFrame
    parameters
        Parameters of the algorithm, including:
        - Parameters.START_TIMESTAMP_KEY => the attribute acting as start timestamp
          (defaults to the value of Parameters.TIMESTAMP_KEY)
        - Parameters.TIMESTAMP_KEY => the attribute acting as timestamp
          (defaults to xes_constants.DEFAULT_TIMESTAMP_KEY)
        - Parameters.CASE_ID_KEY => the attribute acting as case identifier
          (defaults to constants.CASE_CONCEPT_NAME)

    Returns
    ------------------
    cycle_time
        Cycle time
    """
    if parameters is None:
        parameters = {}

    timestamp_key = exec_utils.get_param_value(
        Parameters.TIMESTAMP_KEY,
        parameters,
        xes_constants.DEFAULT_TIMESTAMP_KEY,
    )
    start_timestamp_key = exec_utils.get_param_value(
        Parameters.START_TIMESTAMP_KEY, parameters, timestamp_key
    )
    case_id_key = exec_utils.get_param_value(
        Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
    )

    if start_timestamp_key == timestamp_key:
        # Start and end are the same column: project it twice under distinct
        # aliases so both can be addressed by name afterwards.
        events_df = lf.select(
            [
                pl.col(timestamp_key).alias("start_ts"),
                pl.col(timestamp_key).alias("end_ts"),
                pl.col(case_id_key),
            ]
        ).collect()
        start_col_name, end_col_name = "start_ts", "end_ts"
    else:
        # Distinct start/end columns: select them as they are.
        events_df = lf.select(
            [start_timestamp_key, timestamp_key, case_id_key]
        ).collect()
        start_col_name, end_col_name = start_timestamp_key, timestamp_key

    # The column positions do not depend on the row, so resolve them once
    # instead of once per iterated row.
    column_names = events_df.columns
    start_idx = column_names.index(start_col_name)
    end_idx = column_names.index(end_col_name)

    # One (start, end) tuple per event; values exposing .timestamp() are
    # converted through it, everything else is forwarded unchanged.
    events = [
        (_to_epoch_seconds(row[start_idx]), _to_epoch_seconds(row[end_idx]))
        for row in events_df.iter_rows()
    ]

    # Number of distinct case identifiers
    num_instances = events_df[case_id_key].n_unique()

    return compute.cycle_time(events, num_instances)