# Source code for pm4py.algo.anonymization.pripel.util.TraceMatcher

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import random
import sys
from collections import deque

import numpy as np
from scipy.optimize import linear_sum_assignment
from tqdm.auto import tqdm

from pm4py.algo.anonymization.pripel.util.trace_levenshtein import trace_levenshtein
from pm4py.objects.log import obj


class TraceMatcher:
    """Match the traces of a trace-variant query log to traces of a log.

    Every trace is expected to expose an ``attributes`` mapping containing
    ``"concept:name"`` and to be an indexable, hashable sequence of events;
    every event is a mapping containing ``"concept:name"``.  On construction
    each trace of both logs gets a ``"variant"`` attribute (its activity
    names joined with ``"@"``), and the distribution of the event attribute
    values of ``log`` is collected so that query traces can later be filled
    with attribute values and timestamps.
    """

    def __init__(self, tv_query_log, log):
        """Index both logs by variant and collect attribute statistics.

        :param tv_query_log: log whose traces should receive matched events
        :param log: log whose traces supply the events and the statistics
        """
        self.__timestamp = "time:timestamp"
        # Filled as a side effect of collecting the attribute distribution.
        self.__allTimestamps = list()
        self.__allTimeStampDifferences = list()
        # Cache: variant -> variant -> distance.
        self.__distanceMatrix = dict()
        self.__trace_variants_query = self.__addTraceToAttribute(tv_query_log)
        self.__trace_variants_log = self.__addTraceToAttribute(log)
        attributeIgnorelist = self.__getIgnorelistOfAttributes()
        (self.__distributionOfAttributes,
         self.__eventStructure) = self.__getDistributionOfAttributesAndEventStructure(
            log, attributeIgnorelist)
        self.__query_log = tv_query_log
        self.__log = log

    def __addTraceToAttribute(self, log):
        """Store the variant string on every trace and group traces by it.

        :return: dict mapping each variant string to the set of its traces
        """
        trace_variants = dict()
        for trace in log:
            variant = "".join("@" + event["concept:name"] for event in trace)
            trace.attributes["variant"] = variant
            trace_variants.setdefault(variant, set()).add(trace)
        return trace_variants

    def __getIgnorelistOfAttributes(self):
        """Return the attribute names excluded from the value distribution."""
        return {"concept:name", "variant", self.__timestamp}

    def __handleVariantsWithSameCount(self, variants, traceMatching):
        """Pair query and log traces of variants that occur equally often.

        The variants are removed from both variant indexes afterwards.
        """
        for variant in variants:
            for trace in self.__trace_variants_query[variant]:
                traceMatching[trace.attributes["concept:name"]] = self.__trace_variants_log[variant].pop()
            del self.__trace_variants_log[variant]
            del self.__trace_variants_query[variant]

    def __handleVariantsUnderrepresentedInQuery(self, variants, traceMatching):
        """Pair all query traces of variants that are more frequent in the log.

        Variants that do not occur in the query at all are skipped.  The
        surplus log traces stay in the log index for later matching.
        """
        for variant in variants:
            if variant in self.__trace_variants_query:
                for trace in self.__trace_variants_query[variant]:
                    traceMatching[trace.attributes["concept:name"]] = self.__trace_variants_log[variant].pop()
                del self.__trace_variants_query[variant]

    def __handleVariantsOverrepresentedInQuery(self, variants, traceMatching):
        """Pair all log traces of variants that are more frequent in the query.

        The surplus query traces stay in the query index for later matching.
        """
        for variant in variants:
            for trace in self.__trace_variants_log[variant]:
                traceFromQuery = self.__trace_variants_query[variant].pop()
                traceMatching[traceFromQuery.attributes["concept:name"]] = trace
            del self.__trace_variants_log[variant]

    def __getDistanceVariants(self, variant1, variant2):
        """Return ``trace_levenshtein(variant1, variant2)``, cached per pair."""
        distancesFromVariant1 = self.__distanceMatrix.setdefault(variant1, dict())
        if variant2 not in distancesFromVariant1:
            distancesFromVariant1[variant2] = trace_levenshtein(variant1, variant2)
        return distancesFromVariant1[variant2]

    def __findCLosestVariantInLog(self, variant, log):
        """Return the key of ``log`` with the smallest distance to ``variant``.

        The first key wins on ties; ``None`` is returned for an empty ``log``.
        """
        closestVariant = None
        closestDistance = sys.maxsize
        for comparisonVariant in log.keys():
            distance = self.__getDistanceVariants(variant, comparisonVariant)
            if distance < closestDistance:
                closestVariant = comparisonVariant
                closestDistance = distance
        return closestVariant

    def __findOptimalMatches(self):
        """Assign query traces to log traces with minimal total distance.

        Builds the full query-by-log distance matrix and solves the linear
        sum assignment problem on it.

        :return: dict mapping the ``"concept:name"`` of a query trace to the
            log trace assigned to it
        """
        rows = list()
        progress = tqdm(total=len(self.__query_log), desc="matching query traces, matched traces :: ")
        for traceQuery in self.__query_log:
            queryVariant = traceQuery.attributes["variant"]
            rows.append([self.__getDistanceVariants(queryVariant, traceLog.attributes["variant"])
                         for traceLog in self.__log])
            progress.update()
        progress.close()
        distanceMatrix = np.array(rows)
        traceMatching = dict()
        if distanceMatrix.size == 0:
            # No query traces or no log traces: nothing can be assigned, and
            # an empty query would yield a 1-D array the solver rejects.
            return traceMatching
        row_ind, col_ind = linear_sum_assignment(distanceMatrix)
        for (traceQueryPos, traceLogPos) in zip(row_ind, col_ind):
            traceMatching[self.__query_log[traceQueryPos].attributes["concept:name"]] = self.__log[traceLogPos]
        return traceMatching

    def __matchTraces(self, traceMatching):
        """Greedily pair the remaining query traces with the closest log variant.

        Stops as soon as no unmatched log trace is left; query traces that
        could not be paired are simply absent from ``traceMatching``.
        """
        for variant in self.__trace_variants_query.keys():
            if not self.__trace_variants_log:
                # Without this guard the lookup below would use ``None`` as
                # key and raise a KeyError.
                return
            closestVariant = self.__findCLosestVariantInLog(variant, self.__trace_variants_log)
            for trace in self.__trace_variants_query[variant]:
                traceMatching[trace.attributes["concept:name"]] = self.__trace_variants_log[closestVariant].pop()
                if not self.__trace_variants_log[closestVariant]:
                    # Closest variant is used up: drop it and look for the next one.
                    del self.__trace_variants_log[closestVariant]
                    if self.__trace_variants_log:
                        closestVariant = self.__findCLosestVariantInLog(variant, self.__trace_variants_log)
                    else:
                        return

    def __getTraceMatching(self):
        """Compute a greedy matching of query traces to log traces.

        Traces with identical variants are paired first; what is left over
        in the query is paired with the closest remaining log variant.

        :return: dict mapping the ``"concept:name"`` of a query trace to the
            log trace matched to it
        """
        traceMatching = dict()
        variantsWithSameCount = set()
        variantsUnderepresentedInQuery = set()
        variantsOverepresentedInQuery = set()
        for variant, logTraces in self.__trace_variants_log.items():
            logCount = len(logTraces)
            queryCount = len(self.__trace_variants_query.get(variant, ()))
            if logCount == queryCount:
                variantsWithSameCount.add(variant)
            elif logCount > queryCount:
                # Includes variants missing from the query (count 0); the
                # handler skips those.
                variantsUnderepresentedInQuery.add(variant)
            else:
                variantsOverepresentedInQuery.add(variant)
        self.__handleVariantsWithSameCount(variantsWithSameCount, traceMatching)
        self.__handleVariantsUnderrepresentedInQuery(variantsUnderepresentedInQuery, traceMatching)
        self.__handleVariantsOverrepresentedInQuery(variantsOverepresentedInQuery, traceMatching)
        self.__matchTraces(traceMatching)
        return traceMatching

    def __resolveTrace(self, traceInQuery, correspondingTrace, distributionOfAttributes):
        """Replace the events of a query trace by events of its matched trace.

        For every position the next unused event with the same activity is
        taken from ``correspondingTrace``; if there is none, the query event
        is filled with randomly drawn attribute values.  ``traceInQuery`` is
        modified in place, and events taken from the matched trace may get
        their timestamp overwritten.

        :return: ``traceInQuery``
        """
        eventStacks = self.__transformTraceInEventStack(correspondingTrace)
        previousEvent = None
        # Copying trace attributes from the matched trace is disabled here.
        for eventNr in range(0, len(traceInQuery)):
            currentEvent = traceInQuery[eventNr]
            activity = currentEvent["concept:name"]
            # Must be read before position eventNr is overwritten below.
            latestTimeStamp = self.__getLastTimestampTraceResolving(traceInQuery, eventNr)
            if activity in eventStacks:
                currentEvent = self.__getEventAndUpdateFromEventStacks(activity, eventStacks)
                if currentEvent[self.__timestamp] < latestTimeStamp:
                    # Taken event would precede the reference timestamp: draw a new one.
                    currentEvent[self.__timestamp] = self.__getNewTimeStamp(previousEvent, currentEvent, eventNr,
                                                                            distributionOfAttributes)
            else:
                currentEvent = self.__createRandomNewEvent(currentEvent, activity, distributionOfAttributes,
                                                           previousEvent, eventNr)
            traceInQuery[eventNr] = currentEvent
            previousEvent = currentEvent
        return traceInQuery

    def __getEventAndUpdateFromEventStacks(self, activity, eventStacks):
        """Pop the oldest event of ``activity``; drop the queue once empty."""
        event = eventStacks[activity].popleft()
        if not eventStacks[activity]:
            del eventStacks[activity]
        return event

    def __getLastTimestampTraceResolving(self, trace, eventNr):
        """Return the timestamp of the preceding event (own one at position 0)."""
        referencePosition = eventNr if eventNr == 0 else eventNr - 1
        return trace[referencePosition][self.__timestamp]

    def __transformTraceInEventStack(self, trace):
        """Group the events of ``trace`` by activity, keeping their order.

        :return: dict mapping activity name to a deque of its events
        """
        eventStacks = dict()
        for event in trace:
            eventStacks.setdefault(event["concept:name"], deque()).append(event)
        return eventStacks

    def __createRandomNewEvent(self, event, activity, distributionOfAttributes, previousEvent, eventNr):
        """Fill ``event`` with random values for the attributes of ``activity``.

        Attributes already present on the event are kept; the timestamp is
        always drawn anew if the activity carries one in the log.
        """
        for attribute in self.__eventStructure[activity]:
            if attribute == self.__timestamp:
                event[self.__timestamp] = self.__getNewTimeStamp(previousEvent, event, eventNr,
                                                                 distributionOfAttributes)
            elif attribute in distributionOfAttributes and attribute not in event:
                event[attribute] = random.choice(distributionOfAttributes[attribute])
        return event

    def __getNewTimeStamp(self, previousEvent, currentEvent, eventNr, distributionOfAttributes):
        """Draw a timestamp for ``currentEvent``.

        The first event of a trace gets a random observed timestamp.  Later
        events get the previous timestamp plus a random observed difference,
        preferably one observed between the same pair of activities.
        """
        if eventNr == 0:
            return random.choice(self.__allTimestamps)
        differencesByActivity = distributionOfAttributes[self.__timestamp]
        previousActivity = previousEvent["concept:name"]
        if previousActivity in differencesByActivity:
            candidates = differencesByActivity[previousActivity].get(
                currentEvent["concept:name"], self.__allTimeStampDifferences)
        else:
            candidates = self.__allTimeStampDifferences
        return previousEvent[self.__timestamp] + random.choice(candidates)

    def __resolveTraceMatching(self, traceMatching, distributionOfAttributes, fillUp):
        """Build the result log from the query log and a trace matching.

        Unmatched query traces are resolved against an empty trace when
        ``fillUp`` is true and are left out otherwise.
        """
        log = obj.EventLog()
        for trace in self.__query_log:
            traceID = trace.attributes["concept:name"]
            if fillUp or traceID in traceMatching:
                matchedTrace = self.__resolveTrace(trace, traceMatching.get(traceID, list()),
                                                   distributionOfAttributes)
                log.append(matchedTrace)
        return log

    def __handleAttributesOfDict(self, dictOfAttributes, distributionOfAttributes, attributeIgnorelist,
                                 previousEvent=None):
        """Add the attribute values of one event to the distribution.

        Ignored attributes are skipped, except the timestamp, which is
        recorded as a difference to ``previousEvent`` when there is one.
        """
        for attribute in dictOfAttributes.keys():
            if attribute not in attributeIgnorelist:
                distributionOfAttributes.setdefault(attribute, list()).append(dictOfAttributes[attribute])
            elif attribute == self.__timestamp and previousEvent is not None:
                self.__handleTimeStamp(distributionOfAttributes, previousEvent, dictOfAttributes)

    def __handleTimeStamp(self, distributionOfAttributes, previousEvent, currentEvent):
        """Record the time difference between two consecutive events.

        The difference is stored per (previous activity, current activity)
        pair and in the global list; the current timestamp is stored in the
        global timestamp list.
        """
        timeStampDifference = currentEvent[self.__timestamp] - previousEvent[self.__timestamp]
        timeStampsDicts = distributionOfAttributes.setdefault(self.__timestamp, dict())
        activityDict = timeStampsDicts.setdefault(previousEvent["concept:name"], dict())
        activityDict.setdefault(currentEvent["concept:name"], list()).append(timeStampDifference)
        self.__allTimestamps.append(currentEvent[self.__timestamp])
        self.__allTimeStampDifferences.append(timeStampDifference)

    def __getDistributionOfAttributesAndEventStructure(self, log, attributeIgnorelist):
        """Collect attribute values and per-activity attribute names of ``log``.

        :return: tuple ``(distributionOfAttributes, eventStructure)`` where
            ``eventStructure`` maps an activity to the attribute names (without
            ``"concept:name"``) of the first event seen with that activity
        """
        distributionOfAttributes = dict()
        eventStructure = dict()
        for trace in log:
            previousEvent = None
            for currentEvent in trace:
                self.__handleAttributesOfDict(currentEvent, distributionOfAttributes, attributeIgnorelist,
                                              previousEvent)
                activity = currentEvent["concept:name"]
                if activity not in eventStructure:
                    attributesOfEvent = set(currentEvent.keys())
                    attributesOfEvent.remove("concept:name")
                    eventStructure[activity] = attributesOfEvent
                previousEvent = currentEvent
        return distributionOfAttributes, eventStructure

    def matchQueryToLog(self, fillUp=True, greedy=False):
        """Match the query log to the log and return the resulting event log.

        :param fillUp: also emit query traces without a match, filled with
            randomly drawn attribute values
        :param greedy: use the greedy variant-based matching instead of the
            optimal linear sum assignment
        :return: event log built from the (modified) query traces
        """
        if greedy:
            traceMatching = self.__getTraceMatching()
        else:
            traceMatching = self.__findOptimalMatches()
        matched_log = self.__resolveTraceMatching(traceMatching, self.__distributionOfAttributes, fillUp)
        return matched_log

    def getAttributeDistribution(self):
        """Return the attribute value distribution collected from the log."""
        return self.__distributionOfAttributes

    def getTimeStampData(self):
        """Return ``(all timestamps, all timestamp differences)`` of the log."""
        return self.__allTimestamps, self.__allTimeStampDifferences