# Source code for pm4py.algo.discovery.batches.utils.detection

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from typing import Tuple, List, Any, Set, Optional, Dict, Union

from pm4py.util import exec_utils
import heapq
from copy import copy


class Parameters(Enum):
    """Keys of the parameters accepted by the batch-detection algorithm."""
    # maximum time distance between two intervals for them to be merged
    MERGE_DISTANCE = "merge_distance"
    # minimum number of events an interval must contain to count as a batch
    MIN_BATCH_SIZE = "min_batch_size"
class BatchType(Enum):
    """Human-readable labels of the batch types the algorithm can detect."""
    # all events share both start and end timestamps
    SIMULTANEOUS = "Simultaneous"
    # all events share the start timestamp
    BATCHING_START = "Batching on Start"
    # all events share the end timestamp
    BATCHING_END = "Batching on End"
    # each event starts exactly when the previous one ends
    SEQ_BATCHING = "Sequential batching"
    # consecutive events overlap without being sequentially chained
    CONC_BATCHING = "Concurrent batching"
def __merge_overlapping_intervals(
    intervals: List[Tuple[float, float, Set[Any]]]
) -> List[Tuple[float, float, Set[Any]]]:
    """
    Merges the overlapping time intervals (an interval [a, b] is overlapping
    with [c, d] if a <= c <= b or c <= a <= d).

    Parameters
    ----------------
    intervals
        List of intervals; each interval is (start, end, set-of-events).

    Returns
    ----------------
    intervals
        List of merged intervals, sorted by (start, end).

    NOTE (fix): the previous implementation iterated over a *heapified* list
    assuming adjacent positions were adjacent in time; a heap's list order is
    not sorted, so the adjacency checks could compare the wrong intervals.
    We now sort explicitly and merge in a single linear pass.
    """
    # sort once up-front: the merging pass below requires that intervals are
    # visited in increasing order of their start timestamp
    ordered = sorted(intervals, key=lambda iv: (iv[0], iv[1]))
    merged: List[Tuple[float, float, Set[Any]]] = []
    for current in ordered:
        # strict ">" reproduces the original overlap condition: intervals
        # that merely touch (end == next start) are NOT merged here; they are
        # picked up by __merge_near_intervals whenever merge_distance >= 0
        if merged and merged[-1][1] > current[0]:
            last = merged[-1]
            merged[-1] = (
                min(last[0], current[0]),
                max(last[1], current[1]),
                last[2].union(current[2]),
            )
        else:
            merged.append(current)
    return merged


def __merge_near_intervals(
    intervals: List[Tuple[float, float, Set[Any]]],
    max_allowed_distance: float,
) -> List[Tuple[float, float, Set[Any]]]:
    """
    Merges the non-overlapping time intervals that are nearer than
    max_allowed_distance (an interval [a, b] that is non-overlapping with
    [c, d], having b < c, is merged with it if c - b <= max_allowed_distance).

    Parameters
    ----------------
    intervals
        List of intervals; each interval is (start, end, set-of-events).
    max_allowed_distance
        Maximum distance (end of the first, start of the second) between two
        intervals for them to be merged.

    Returns
    ----------------
    intervals
        List of merged intervals, sorted by (start, end).

    NOTE (fix): the previous implementation re-inserted merged intervals with
    heapq.heappush into a list whose positional order it then treated as
    sorted; heappush only maintains the heap invariant, so the list could end
    up unsorted and the adjacency-based merge could pair the wrong intervals.
    A sort followed by one linear pass is both correct and O(n log n).
    """
    ordered = sorted(intervals, key=lambda iv: (iv[0], iv[1]))
    merged: List[Tuple[float, float, Set[Any]]] = []
    for current in ordered:
        if merged and current[0] - merged[-1][1] <= max_allowed_distance:
            last = merged[-1]
            merged[-1] = (
                min(last[0], current[0]),
                max(last[1], current[1]),
                last[2].union(current[2]),
            )
        else:
            merged.append(current)
    return merged


def __check_batch_type(batch: Tuple[float, float, Set[Any]]) -> str:
    """
    Checks the batch type between:
    - Simultaneous (all the events in the batch have identical start and end
      timestamps)
    - Batching at start (all the events in the batch have identical start
      timestamp)
    - Batching at end (all the events in the batch have identical end
      timestamp)
    - Sequential batching (for all the consecutive events, the end of the
      first is equal to the start of the second)
    - Concurrent batching (for all the consecutive events that are not
      sequentially matched)

    Parameters
    ----------------
    batch
        Batch interval: (start, end, set-of-events); each event is a
        (start, end, ...) tuple.

    Returns
    ----------------
    batch_type
        One of the BatchType enum values (a string).
    """
    events_batch = sorted(batch[2])
    min_left_events = min(ev[0] for ev in events_batch)
    max_left_events = max(ev[0] for ev in events_batch)
    min_right_events = min(ev[1] for ev in events_batch)
    max_right_events = max(ev[1] for ev in events_batch)
    # all the events have identical start AND end timestamps
    if min_left_events == max_left_events and min_right_events == max_right_events:
        return BatchType.SIMULTANEOUS.value
    # all the events have identical start timestamp
    if min_left_events == max_left_events:
        return BatchType.BATCHING_START.value
    # all the events have identical end timestamp
    if min_right_events == max_right_events:
        return BatchType.BATCHING_END.value
    # sequential batching requires that, for every pair of consecutive
    # events, the end of the first equals the start of the second; otherwise
    # the batch is concurrent
    is_sequential = all(
        events_batch[i][1] == events_batch[i + 1][0]
        for i in range(len(events_batch) - 1)
    )
    if is_sequential:
        return BatchType.SEQ_BATCHING.value
    return BatchType.CONC_BATCHING.value


def __detect_single(
    events: List[Tuple[float, float, str]],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> Dict[str, List[Any]]:
    """
    Detect if there are batches in the execution of events having a given
    activity-resource combination.

    Parameters
    ----------------
    events
        List of events; each event is a (start-timestamp, end-timestamp,
        case-identifier) tuple.
    parameters
        Parameters of the algorithm:
        - Parameters.MERGE_DISTANCE: maximum distance between two intervals
          for them to be merged (default: 15 * 60 seconds)
        - Parameters.MIN_BATCH_SIZE: minimum number of events in a batch
          (default: 2)

    Returns
    ----------------
    ret
        Dictionary associating each detected batch type (string) to the
        non-empty list of its batches; batch types without batches are
        removed from the dictionary.
    """
    if parameters is None:
        parameters = {}

    ret = {
        BatchType.SIMULTANEOUS.value: [],
        BatchType.BATCHING_START.value: [],
        BatchType.BATCHING_END.value: [],
        BatchType.CONC_BATCHING.value: [],
        BatchType.SEQ_BATCHING.value: [],
    }

    merge_distance = exec_utils.get_param_value(
        Parameters.MERGE_DISTANCE, parameters, 15 * 60
    )
    min_batch_size = exec_utils.get_param_value(
        Parameters.MIN_BATCH_SIZE, parameters, 2
    )

    # one singleton interval per event; the merge helpers sort internally,
    # so the previous heapq.heapify step is no longer needed (a heapified
    # list is not sorted, which broke the adjacency-based merging)
    intervals = [(e[0], e[1], {copy(e)}) for e in events]
    intervals = __merge_overlapping_intervals(intervals)
    intervals = __merge_near_intervals(intervals, merge_distance)

    # keep only the intervals containing enough events to form a batch
    batches = [x for x in intervals if len(x[2]) >= min_batch_size]
    for batch in batches:
        ret[__check_batch_type(batch)].append(batch)

    return {x: y for x, y in ret.items() if y}
def detect(
    actres_grouping: Dict[Tuple[str, str], List[Tuple[float, float, str]]],
    parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> List[Tuple[Tuple[str, str], int, Dict[str, Any]]]:
    """
    Provided an activity-resource grouping of the events of the event log,
    returns a list having as elements the activity-resources with the batches
    that are detected, divided in:
    - Simultaneous (all the events in the batch have identical start and end
      timestamps)
    - Batching at start (all the events in the batch have identical start
      timestamp)
    - Batching at end (all the events in the batch have identical end
      timestamp)
    - Sequential batching (for all the consecutive events, the end of the
      first is equal to the start of the second)
    - Concurrent batching (for all the consecutive events that are not
      sequentially matched)

    The approach has been described in the following paper:
    Martin, N., Swennen, M., Depaire, B., Jans, M., Caris, A., & Vanhoof, K.
    (2015, December). Batch Processing: Definition and Event Log
    Identification. In SIMPDA (pp. 137-140).

    Parameters
    -------------------
    actres_grouping
        Activity-resource grouping of events
    parameters
        Parameters of the algorithm

    Returns
    ------------------
    list_batches
        A (sorted) list containing tuples. Each tuple contain:
        - Index 0: the activity-resource for which at least one batch has
          been detected
        - Index 1: the number of batches for the given activity-resource
        - Index 2: a list containing all the batches. Each batch is described
          by:
          # The start timestamp of the batch
          # The complete timestamp of the batch
          # The list of events that are executed in the batch
    """
    if parameters is None:
        parameters = {}

    list_batches = []
    for actres, events in actres_grouping.items():
        detected = __detect_single(events, parameters=parameters)
        # keep only the activity-resource combinations with at least a batch
        if detected:
            num_batches = sum(len(v) for v in detected.values())
            list_batches.append((actres, num_batches, detected))

    # sort by decreasing number of batches, ties broken by activity-resource
    list_batches.sort(key=lambda entry: (entry[1], entry[0]), reverse=True)
    return list_batches