Source code for pm4py.algo.anonymization.pripel.util.AttributeAnonymizer

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or 
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import warnings
from datetime import timedelta

import diffprivlib.mechanisms as privacyMechanisms
from tqdm.auto import tqdm


class AttributeAnonymizer:
    """Anonymize event attributes and timestamps of an event log.

    Per-attribute randomisation mechanisms from ``diffprivlib`` are chosen from
    the Python type of the first value observed for each attribute:

    * ``bool``          -> ``Binary``
    * ``int``/``float`` -> ``LaplaceBoundedDomain`` bounded by the observed
      minimum and maximum
    * ``str``           -> ``ExponentialCategorical`` with a uniform utility list

    Timestamps are handled separately (see ``anonymize``).
    """

    def __init__(self):
        self.__timestamp = "time:timestamp"
        self.__ignorelist = self.__getIgnorelistOfAttributes()
        # Keys used inside the per-attribute numeric domain dictionaries.
        self.__sensitivity = "sensitivity"
        self.__max = "max"
        self.__min = "min"
        # Anonymized "InfectionSuspected" values of the first event of each trace.
        self.__infectionSuspected = list()

    def __getIgnorelistOfAttributes(self):
        """Return the set of attribute names that never get a generic mechanism."""
        return {"concept:name", self.__timestamp}

    def __retrieveAttributeDomains(self, distributionOfAttributes, dataTypesOfAttributes):
        """Compute min, max and sensitivity (max - min) of every numeric attribute.

        :param distributionOfAttributes: mapping attribute -> observed values
        :param dataTypesOfAttributes: mapping attribute -> Python type
        :return: mapping attribute -> ``{"max": ..., "min": ..., "sensitivity": ...}``
        """
        domains = dict()
        for attribute, dataType in dataTypesOfAttributes.items():
            if dataType in (int, float):
                values = distributionOfAttributes[attribute]
                upper = max(values)
                lower = min(values)
                domains[attribute] = {
                    self.__max: upper,
                    self.__min: lower,
                    self.__sensitivity: abs(upper - lower),
                }
        return domains

    def __determineDataType(self, distributionOfAttributes):
        """Map every non-ignored attribute to the type of its first observed value.

        Attributes whose distribution is empty are skipped (there is no value to
        inspect), so they do not receive a mechanism.
        """
        dataTypesOfAttributes = dict()
        for attribute, values in distributionOfAttributes.items():
            if attribute in self.__ignorelist:
                continue
            if len(values) == 0:
                continue
            dataTypesOfAttributes[attribute] = type(values[0])
        return dataTypesOfAttributes

    def __getPotentialValues(self, distributionOfAttributes, dataTypesOfAttributes):
        """Return, for every ``str`` attribute, the set of its distinct values."""
        return {
            attribute: set(distributionOfAttributes[attribute])
            for attribute, dataType in dataTypesOfAttributes.items()
            if dataType is str
        }

    def __setupBooleanMechanism(self, epsilon):
        """Create the binary mechanism operating on the strings "True"/"False"."""
        return privacyMechanisms.Binary(epsilon=epsilon, value0=str(True), value1=str(False))

    def __anonymizeAttribute(self, value, mechanism):
        """Randomise a single attribute value with the given mechanism.

        :param value: the original attribute value
        :param mechanism: object exposing ``randomise(value)``, or ``None``
        :return: ``value`` unchanged if ``mechanism`` is ``None``; otherwise the
            randomised value. Booleans are passed to the mechanism as strings
            and mapped back afterwards; integers are rounded back to ``int``.
        """
        if mechanism is None:
            return value

        # Exact type checks on purpose: bool is a subclass of int.
        isBoolean = type(value) is bool
        isInt = type(value) is int
        if isBoolean:
            value = str(value)

        value = mechanism.randomise(value)

        if isBoolean:
            # Map the textual result back without eval(): the mechanism output
            # may originate from log data and must never be executed as code.
            # Anything other than "True"/"False" is returned as-is.
            if value == str(True):
                value = True
            elif value == str(False):
                value = False
        if isInt:
            value = int(round(value))
        return value

    def __addBooleanMechanisms(self, epsilon, mechanisms, dataTypesOfAttributes):
        """Register one shared binary mechanism for every ``bool`` attribute."""
        binaryMechanism = self.__setupBooleanMechanism(epsilon)
        for attribute, dataType in dataTypesOfAttributes.items():
            if dataType is bool:
                mechanisms[attribute] = binaryMechanism
        return mechanisms

    def __addNumericMechanisms(self, epsilon, mechanisms, domains):
        """Register a bounded Laplace mechanism for every numeric attribute."""
        for attribute, domain in domains.items():
            mechanisms[attribute] = privacyMechanisms.LaplaceBoundedDomain(
                epsilon=epsilon,
                sensitivity=domain[self.__sensitivity],
                lower=domain[self.__min],
                upper=domain[self.__max],
            )
        return mechanisms

    def __setupUniformUtilityList(self, potentialValues, attribute):
        """Build the utility list ``[x, y, 1]`` for every ordered pair of values.

        The list has ``len(potentialValues) ** 2`` entries; a ``RuntimeWarning``
        is issued when the attribute has 2000 or more distinct values.
        """
        if len(potentialValues) >= 2000:
            warnings.warn(
                '\nThe attribute ' + attribute + ' has ' + str(
                    len(potentialValues)) + ' different values in the log.\nTo anonymize this attribute the exponential mechanism for achieving differential privacy on categorical data must work with a list that is ' + str(
                    len(potentialValues) * len(potentialValues)) + ' elements long.', RuntimeWarning, 2)
        return [[x, y, 1] for x in potentialValues for y in potentialValues]

    def __addCategoricalMechanisms(self, epsilon, mechanisms, dataTypesOfAttributes, potentialValues):
        """Register an exponential mechanism for every ``str`` attribute.

        The attribute ``"variant"`` is excluded, as are attributes whose utility
        list is empty.
        """
        for attribute, dataType in dataTypesOfAttributes.items():
            if dataType is str and attribute != "variant":
                utilityList = self.__setupUniformUtilityList(potentialValues[attribute], attribute)
                if len(utilityList) > 0:
                    mechanisms[attribute] = privacyMechanisms.ExponentialCategorical(
                        epsilon=epsilon, utility_list=utilityList)
        return mechanisms

    def __getTimestamp(self, trace, eventNr, allTimestamps):
        """Return the timestamp of event ``eventNr``, clamped at the trace borders.

        Positions at or before the first event yield the minimum of
        ``allTimestamps``; positions past the last event yield the maximum.
        """
        if eventNr <= 0:
            return min(allTimestamps)
        if eventNr >= len(trace):
            return max(allTimestamps)
        return trace[eventNr][self.__timestamp]

    def __anonymizeTimeStamps(self, timestamp, previousTimestamp, nextTimestamp, sensitivity,
                              minTimestampDifference, mechanism):
        """Configure the timestamp mechanism for one event and return its timestamp.

        The mechanism's sensitivity and bounds are set from the surrounding
        timestamps.

        NOTE(review): ``mechanism.randomise`` is not called here, so the value
        returned equals ``previousTimestamp + (timestamp - previousTimestamp)``,
        i.e. the input timestamp. Confirm whether this is intentional.
        """
        upperPotentialDifference = (nextTimestamp - previousTimestamp).total_seconds()
        currentDifference = (timestamp - previousTimestamp).total_seconds()
        if upperPotentialDifference < 0:
            upperPotentialDifference = currentDifference
        mechanism.sensitivity = sensitivity
        mechanism.lower = minTimestampDifference
        mechanism.upper = upperPotentialDifference
        return previousTimestamp + timedelta(seconds=currentDifference)

    def __setupMechanisms(self, epsilon, distributionOfAttributes, lower, upper, sensitivity):
        """Create the mechanisms for all attributes plus the timestamp mechanism.

        :return: mapping attribute name -> mechanism
        """
        mechanisms = dict()
        dataTypesOfAttributes = self.__determineDataType(distributionOfAttributes)
        mechanisms = self.__addBooleanMechanisms(epsilon, mechanisms, dataTypesOfAttributes)
        domains = self.__retrieveAttributeDomains(distributionOfAttributes, dataTypesOfAttributes)
        mechanisms = self.__addNumericMechanisms(epsilon, mechanisms, domains)
        potentialValues = self.__getPotentialValues(distributionOfAttributes, dataTypesOfAttributes)
        mechanisms = self.__addCategoricalMechanisms(epsilon, mechanisms, dataTypesOfAttributes,
                                                     potentialValues)
        mechanisms[self.__timestamp] = privacyMechanisms.LaplaceBoundedDomain(
            epsilon=epsilon, lower=lower, upper=upper, sensitivity=sensitivity)
        return mechanisms

    def __getTimestampDomain(self, trace, eventNr, distributionOfTimestamps, allTimestampDifferences):
        """Return ``(sensitivity, minimum difference in seconds)`` for an event.

        Results are cached in ``self.__domainTimestampData`` keyed by the
        activity names of the previous and the current event. When no
        distribution exists for that activity pair, the log-wide minimum and
        maximum differences are used. The sensitivity is at least 1.0 second.
        """
        previousActivity = trace[eventNr - 1]["concept:name"]
        currentActivity = trace[eventNr]["concept:name"]

        timestampDomain = self.__domainTimestampData.get(previousActivity, None)
        if timestampDomain is not None:
            timestampDomain = timestampDomain.get(currentActivity, None)

        if timestampDomain is None:
            timestampDistribution = None
            if eventNr != 0:
                dictTimestampDifference = distributionOfTimestamps.get(previousActivity, None)
                if dictTimestampDifference is not None:
                    timestampDistribution = dictTimestampDifference.get(currentActivity, None)

            if timestampDistribution is None:
                maxTimestampDifference = self.__maxAllTimestampDifferences
                minTimestampDifference = self.__minAllTimestampDifferences
            else:
                maxTimestampDifference = max(timestampDistribution)
                minTimestampDifference = min(timestampDistribution)

            sensitivity = abs(maxTimestampDifference - minTimestampDifference).total_seconds()
            sensitivity = max(sensitivity, 1.0)

            timestampDomain = {
                "sensitivity": sensitivity,
                "minTimeStampInLog": min(allTimestampDifferences).total_seconds(),
            }
            if self.__domainTimestampData.get(previousActivity, None) is None:
                self.__domainTimestampData[previousActivity] = dict()
            self.__domainTimestampData[previousActivity][currentActivity] = timestampDomain

        return timestampDomain["sensitivity"], timestampDomain["minTimeStampInLog"]

    def __performTimestampShift(self, trace, mechanism):
        """Shift every event of ``trace`` by one randomised offset (in place).

        The mechanism bounds are set relative to the start of the trace: the
        lower bound is the offset to the earliest log timestamp, the upper
        bound the offset that would place the trace end at the latest log
        timestamp.

        NOTE(review): the bounds are reassigned on a shared mechanism for every
        trace; confirm that the mechanism picks up changed bounds on each call.
        """
        beginOfTrace = trace[0][self.__timestamp]
        endOfTrace = trace[-1][self.__timestamp]
        deltaBeginOfLogToTrace = (self.__minAllTimestamp - beginOfTrace).total_seconds()
        deltaEndOfLogToTrace = (self.__maxAllTimestamp - beginOfTrace).total_seconds()
        traceDuration = (endOfTrace - beginOfTrace).total_seconds()

        upperBound = deltaEndOfLogToTrace - traceDuration
        if deltaBeginOfLogToTrace >= upperBound:
            # Degenerate interval: fall back to the distance to the end of the log.
            upperBound = abs(deltaEndOfLogToTrace)

        mechanism.lower = deltaBeginOfLogToTrace
        mechanism.upper = upperBound
        timestampShift = timedelta(seconds=mechanism.randomise(0.0))
        for event in trace:
            event[self.__timestamp] = event[self.__timestamp] + timestampShift

    def anonymize(self, log, distributionOfAttributes, epsilon, allTimestampDifferences, allTimestamps):
        """Anonymize all event attributes and timestamps of ``log`` in place.

        :param log: iterable of traces; each trace is an indexable sequence of
            dict-like events
        :param distributionOfAttributes: mapping attribute -> observed values;
            the entry for ``"time:timestamp"`` maps previous activity ->
            current activity -> collection of time differences
        :param epsilon: privacy parameter handed to every mechanism
        :param allTimestampDifferences: all time differences found in the log
        :param allTimestamps: all timestamps found in the log
        :return: the same ``log`` object, modified in place
        """
        self.__maxAllTimestampDifferences = max(allTimestampDifferences)
        self.__minAllTimestampDifferences = min(allTimestampDifferences)
        self.__maxAllTimestamp = max(allTimestamps)
        self.__minAllTimestamp = min(allTimestamps)
        sensitivity = (self.__maxAllTimestamp - self.__minAllTimestamp).total_seconds()

        # lower and upper values are just for initialisation, they get later
        # overwritten in __anonymizeTimeStamps and __performTimestampShift
        lower = 0
        upper = 1
        timeShiftMechanism = privacyMechanisms.LaplaceBoundedDomain(
            epsilon=epsilon, sensitivity=sensitivity, lower=lower, upper=upper)
        mechanisms = self.__setupMechanisms(epsilon, distributionOfAttributes, lower, upper, sensitivity)
        self.__domainTimestampData = dict()

        progress = tqdm(total=len(log), desc="attribute anonymization, anonymized traces :: ")
        try:
            for trace in log:
                # Only event attributes are anonymized; trace-level attributes
                # are left untouched.
                for eventNr, event in enumerate(trace):
                    for attribute in event.keys():
                        if attribute != self.__timestamp:
                            event[attribute] = self.__anonymizeAttribute(
                                event[attribute], mechanisms.get(attribute, None))
                            if attribute == "InfectionSuspected" and eventNr == 0:
                                self.__infectionSuspected.append(event[attribute])
                        elif eventNr > 0:
                            previousTimestamp = self.__getTimestamp(trace, eventNr - 1, allTimestamps)
                            nextTimestamp = self.__getTimestamp(trace, eventNr + 1, allTimestamps)
                            sensitivity, minTimestampDifference = self.__getTimestampDomain(
                                trace, eventNr, distributionOfAttributes[self.__timestamp],
                                allTimestampDifferences)
                            event[attribute] = self.__anonymizeTimeStamps(
                                event[attribute], previousTimestamp, nextTimestamp, sensitivity,
                                minTimestampDifference, mechanisms[self.__timestamp])
                        elif eventNr == 0:
                            # The first event's timestamp triggers the shift of
                            # the whole trace.
                            self.__performTimestampShift(trace, timeShiftMechanism)
                progress.update()
        finally:
            # Close the progress bar even if anonymization fails part-way.
            progress.close()
        return log