import numpy as np
import pandas as pd
from enum import Enum
from typing import Union, Optional, Dict, Any, List, Tuple
from pm4py.objects.log.obj import EventLog
from pm4py.util import exec_utils, constants, xes_constants


class Parameters(Enum):
    SUB_LOG_SIZE = "sub_log_size"
    WINDOW_SIZE = "window_size"
    NUM_PERMUTATIONS = "num_permutations"
    THRESH_P_VALUE = "thresh_p_value"
    MAX_NO_CHANGE_POINTS = "max_no_change_points"
    ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
    TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
    CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY


def apply(log: Union[EventLog, pd.DataFrame], parameters: Optional[Dict[Any, Any]] = None) -> Tuple[
        List[EventLog], List[float], List[float]]:
    """
    Apply concept drift detection to an event log, based on the approach described in:

    Bose, RP Jagadeesh Chandra, et al. "Handling concept drift in process mining." Advanced
    Information Systems Engineering: 23rd International Conference, CAiSE 2011, London, UK,
    June 20-24, 2011. Proceedings 23. Springer Berlin Heidelberg, 2011.

    This method detects sudden changes (concept drifts) in a process by analyzing an event log
    over time. It splits the log into sub-logs, extracts global features (e.g., Relation Type
    Count), and applies statistical tests (permutation tests) over sliding windows to identify
    change points where the process behavior significantly differs.

    **Parameters**
    -------------
    log : Union[EventLog, pd.DataFrame]
        The input event log, either a PM4Py EventLog object or a Pandas DataFrame. The log
        contains traces, where each trace is a sequence of events representing a process instance.
    parameters : Optional[Dict[Any, Any]], default=None
        Configuration parameters for the algorithm. If None, default values are used. Possible
        keys include:

        - `Parameters.SUB_LOG_SIZE` : int, default=50
            Number of traces per sub-log.
        - `Parameters.WINDOW_SIZE` : int, default=8
            Number of sub-logs in each window for statistical comparison.
        - `Parameters.NUM_PERMUTATIONS` : int, default=100
            Number of permutations for the permutation test.
        - `Parameters.THRESH_P_VALUE` : float, default=0.5
            Threshold below which a p-value marks a change point as significant (lower values
            indicate stronger evidence of drift).
        - `Parameters.MAX_NO_CHANGE_POINTS` : int, default=5
            Maximum number of change points to detect.
        - `Parameters.ACTIVITY_KEY` : str, default='concept:name'
            Key identifying the activity attribute in the event log.
        - `Parameters.TIMESTAMP_KEY` : str, default='time:timestamp'
            Key identifying the timestamp attribute in the event log.
        - `Parameters.CASE_ID_KEY` : str, default='case:concept:name'
            Key identifying the case ID attribute in the event log.

    **Returns**
    ------------
    returned_sublogs : List[EventLog]
        A list of sub-logs, each an EventLog object covering one segment of the original log:
        from the start (or the previous detected change point) up to the next detected change
        point, plus a final segment reaching the end of the log. Note that these logs contain
        only the projection of each trace on the activity attribute.
    change_timestamps : List[float]
        A list of timestamps (POSIX floats) at which concept drifts are detected. Each timestamp
        is the start time of the first trace of the sub-log where a change point occurs, based
        on case start timestamps.
    p_values : List[float]
        A list of p-values associated with each detected change point, indicating the statistical
        significance of the drift (lower values suggest stronger evidence of a change).
    **Notes**
    --------
    - The method uses a permutation test to compare feature vectors (e.g., Relation Type Count)
      extracted from sub-logs within sliding windows. Change points are identified where the
      p-value falls below the threshold.
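
    **Example**
    ------------
    A minimal usage sketch; the file name ``example.xes`` is illustrative::

        import pm4py

        log = pm4py.read_xes("example.xes")
        sub_logs, change_timestamps, p_values = apply(
            log, parameters={Parameters.SUB_LOG_SIZE: 50, Parameters.THRESH_P_VALUE: 0.4})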
"""
if parameters is None:
parameters = {}
import pm4py
sub_log_size = exec_utils.get_param_value(Parameters.SUB_LOG_SIZE, parameters, 50)
window_size = exec_utils.get_param_value(Parameters.WINDOW_SIZE, parameters, 8)
num_permutations = exec_utils.get_param_value(Parameters.NUM_PERMUTATIONS, parameters, 100)
thresh_p_value = exec_utils.get_param_value(Parameters.THRESH_P_VALUE, parameters, 0.5)
max_no_change_points = exec_utils.get_param_value(Parameters.MAX_NO_CHANGE_POINTS, parameters, 5)
activity_key = exec_utils.get_param_value(Parameters.ACTIVITY_KEY, parameters, xes_constants.DEFAULT_NAME_KEY)
timestamp_key = exec_utils.get_param_value(Parameters.TIMESTAMP_KEY, parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY)
case_id_key = exec_utils.get_param_value(Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME)
    # Start timestamp (as a POSIX float) of each case, used to map change points to wall-clock times
    if isinstance(log, pd.DataFrame):
        case_start_timestamps = log.groupby(case_id_key).first()[timestamp_key].to_list()
        case_start_timestamps = [x.timestamp() for x in case_start_timestamps]
    else:
        case_start_timestamps = [c[0][timestamp_key].timestamp() for c in log]

    sub_logs, change_points = detect_concept_drift(pm4py.project_on_event_attribute(log, activity_key),
                                                   sub_log_size=sub_log_size, window_size=window_size,
                                                   num_permutations=num_permutations, thresh_p_value=thresh_p_value,
                                                   max_no_change_points=max_no_change_points)

    change_indexes = [x[0] for x in change_points]
    # A change point at sub-log index j maps to the start timestamp of the first trace of that sub-log
    change_timestamps = [case_start_timestamps[x[0] * sub_log_size] for x in change_points]
    p_values = [x[1] for x in change_points]

    # Re-assemble the activity projections into one EventLog per segment between change points
    curr = []
    returned_sublogs = []
    i = 0
    while i < len(sub_logs):
        curr = curr + [",".join(x) for x in sub_logs[i]]
        # Close the current segment just before a change point, or at the end of the log
        if i + 1 in change_indexes or i == len(sub_logs) - 1:
            if curr:
                returned_sublogs.append(pm4py.parse_event_log_string(curr))
            curr = []
        i = i + 1

    return returned_sublogs, change_timestamps, p_values


def split_into_sub_logs(event_log, sub_log_size=50, keep_leftover=True):
    """
    Split the event log (list of traces) into sub-logs of size `sub_log_size`.
    Optionally keep leftover traces as a final smaller sub-log.
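
    Example (seven traces, sub-logs of three, with the leftover trace kept):

    >>> split_into_sub_logs([["a"], ["b"], ["c"], ["d"], ["e"], ["f"], ["g"]], sub_log_size=3)
    [[['a'], ['b'], ['c']], [['d'], ['e'], ['f']], [['g']]]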
"""
s = sub_log_size
k = len(event_log) // s # how many full-size sublogs we can get
sub_logs = [event_log[i * s: (i + 1) * s] for i in range(k)]
if keep_leftover:
remainder = len(event_log) % s
if remainder > 0:
sub_logs.append(event_log[k * s: k * s + remainder])
return sub_logs


def compute_follows_relation(trace, Sigma):
    """
    Compute the 'eventually follows' relation for a single trace.

    GLOBAL FOLLOWS (the version used below):
      - For each activity 'a' encountered so far, mark that 'a' is
        (eventually) followed by the current activity.

    DIRECT FOLLOWS (alternative, left commented out below):
      - For each consecutive pair (a, b), add (a, b).
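
    Example ('c' eventually follows 'a' even though the two are not adjacent):

    >>> sorted(compute_follows_relation(["a", "b", "c"], {"a", "b", "c"}))
    [('a', 'b'), ('a', 'c'), ('b', 'c')]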
"""
follows_in_trace = set()
# --- GLOBAL FOLLOWS version (as in your original code) ---
seen = set()
for activity in trace:
for a in seen:
if activity in Sigma:
follows_in_trace.add((a, activity))
seen.add(activity)
# --- DIRECT FOLLOWS version ---
# for idx in range(len(trace) - 1):
# a = trace[idx]
# b = trace[idx + 1]
# if a in Sigma and b in Sigma:
# follows_in_trace.add((a, b))
return follows_in_trace


def permutation_test(P1, P2, num_permutations=100):
    """
    Perform a permutation test to compare the Euclidean distance
    between the means of two sets of feature vectors P1 and P2.
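
    The returned p-value is the fraction of random relabelings whose between-mean
    distance is at least the observed one. A degenerate but deterministic example:
    identical groups have an observed distance of 0, which every permutation
    matches, so the p-value is 1.0:

    >>> permutation_test([[0.0, 1.0]] * 4, [[0.0, 1.0]] * 4)
    1.0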
"""
P1 = np.array(P1)
P2 = np.array(P2)
mean_P1 = np.mean(P1, axis=0)
mean_P2 = np.mean(P2, axis=0)
d_obs = np.linalg.norm(mean_P1 - mean_P2) # Observed distance
# Stack them for shuffling
all_data = np.vstack((P1, P2))
n1 = len(P1)
count = 0
for _ in range(num_permutations):
# A more explicit way:
perm_indices = np.random.permutation(len(all_data))
perm_data = all_data[perm_indices]
perm_P1 = perm_data[:n1]
perm_P2 = perm_data[n1:]
mean_perm_P1 = np.mean(perm_P1, axis=0)
mean_perm_P2 = np.mean(perm_P2, axis=0)
d_perm = np.linalg.norm(mean_perm_P1 - mean_perm_P2)
if d_perm >= d_obs:
count += 1
p_value = count / num_permutations
return p_value


def detect_concept_drift(event_log,
                         sub_log_size=50,
                         window_size=8,
                         num_permutations=100,
                         thresh_p_value=0.5,
                         max_no_change_points=5):
    """
    Detect concept drift in an event log using a permutation test
    over consecutive windows of sub-logs.

    Parameters:
    - event_log: List of lists, where each inner list is a trace of activities.
    - sub_log_size: Number of traces per sub-log (default: 50).
    - window_size: Number of sub-logs in each window for comparison (default: 8).
    - num_permutations: Number of permutations for the statistical test (default: 100).
    - thresh_p_value: Threshold below which a p-value marks a change point (default: 0.5).
    - max_no_change_points: Maximum number of change points to return (default: 5).

    Returns:
    - sub_logs: list of sub-logs, each with (up to) sub_log_size traces.
    - change_points: list of (sub-log index, p-value) pairs where drift is detected,
      sorted by increasing p-value.
    """
    # 1) Extract unique activities
    Sigma = extract_unique_activities(event_log)

    # 2) Split event log into sub-logs
    sub_logs = split_into_sub_logs(event_log, sub_log_size=sub_log_size, keep_leftover=True)

    # 3) Compute RC feature vectors for each sub-log
    feature_vectors = [extract_global_features(slog, Sigma) for slog in sub_logs]

    # 4) Perform permutation tests in a sliding-window fashion
    k = len(sub_logs)
    p_values = []

    # We need at least 2*window_size sub-logs for a valid test, because we compare
    # P1 = feature_vectors[i : i+window_size] vs P2 = feature_vectors[i+window_size : i+2*window_size].
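    # For instance, with k = 20 sub-logs and window_size = 8, i ranges over 0..4,
    # comparing sub-logs [0..7] vs [8..15], then [1..8] vs [9..16], and so on.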
    for i in range(k - 2 * window_size + 1):
        P1 = feature_vectors[i: i + window_size]
        P2 = feature_vectors[i + window_size: i + 2 * window_size]
        p_value = permutation_test(P1, P2, num_permutations=num_permutations)
        # The candidate change point is the first sub-log of the second window
        p_values.append((i + window_size, p_value))

    # 5) Identify change points where the p-value falls below the threshold,
    #    keeping at most max_no_change_points of the most significant ones
    change_points = sorted(p_values, key=lambda x: (x[1], x[0]))
    change_points = [x for x in change_points if x[1] < thresh_p_value]
    change_points = change_points[:min(len(change_points), max_no_change_points)]

    return sub_logs, change_points
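

# The helpers extract_unique_activities and extract_global_features are referenced
# above but not shown in this section. The sketches below are illustrative only
# (an assumption about their behavior, not the module's actual definitions): they
# build the activity alphabet and, per sub-log, a fixed-length vector with the
# fraction of traces containing each (a, b) eventually-follows pair, which is one
# plausible global feature consistent with how the functions are called.

def extract_unique_activities(event_log):
    # Alphabet Sigma: every activity occurring anywhere in the log, in a fixed order
    return sorted({activity for trace in event_log for activity in trace})


def extract_global_features(sub_log, Sigma):
    # One entry per ordered activity pair (a, b); the fixed pair order keeps
    # vectors from different sub-logs comparable
    pairs = [(a, b) for a in Sigma for b in Sigma]
    counts = {pair: 0 for pair in pairs}
    for trace in sub_log:
        for pair in compute_follows_relation(trace, set(Sigma)):
            if pair in counts:
                counts[pair] += 1
    n = max(len(sub_log), 1)
    return [counts[pair] / n for pair in pairs]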