Source code for pm4py.algo.evaluation.earth_mover_distance.variants.pyemd

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from typing import Optional, Dict, Any, Union, List
import numpy as np
from pm4py.util.regex import SharedObj, get_new_char
from pm4py.util import string_distance
from pm4py.util import exec_utils
from scipy.optimize import linprog
import importlib.util


[docs] class Parameters: STRING_DISTANCE = "string_distance" USE_FAST_EMD = "use_fast_emd" # New parameter to choose between implementations
[docs] def normalized_levensthein(s1, s2): return float(string_distance.levenshtein(s1, s2)) / float( max(len(s1), len(s2)) )
[docs] def get_act_correspondence(activities, parameters=None): if parameters is None: parameters = {} shared_obj = SharedObj() ret = {} for act in activities: get_new_char(act, shared_obj) ret[act] = shared_obj.mapping_dictio[act] return ret
[docs] def encode_two_languages(lang1, lang2, parameters=None): if parameters is None: parameters = {} all_activities = sorted( list( set(y for x in lang1 for y in x).union( set(y for x in lang2 for y in x) ) ) ) acts_corresp = get_act_correspondence( all_activities, parameters=parameters ) enc1 = {} enc2 = {} for k in lang1: new_key = "".join(acts_corresp[i] for i in k) enc1[new_key] = lang1[k] for k in lang2: new_key = "".join(acts_corresp[i] for i in k) enc2[new_key] = lang2[k] for x in enc1: if x not in enc2: enc2[x] = 0.0 for x in enc2: if x not in enc1: enc1[x] = 0.0 enc1 = [(x, y) for x, y in enc1.items()] enc2 = [(x, y) for x, y in enc2.items()] enc1 = sorted(enc1, reverse=True, key=lambda x: x[0]) enc2 = sorted(enc2, reverse=True, key=lambda x: x[0]) return enc1, enc2
[docs] class EMDCalculator: """ A class that provides an EMD (Earth Mover's Distance) computation similar to what `pyemd` offers. It uses linear programming via `scipy.optimize.linprog` to solve the underlying flow problem. Usage: ------ emd_value = EMDCalculator.emd(first_histogram, second_histogram, distance_matrix) """
[docs] @staticmethod def emd( first_histogram: np.ndarray, second_histogram: np.ndarray, distance_matrix: np.ndarray, ) -> float: """ Compute the Earth Mover's Distance given two histograms and a distance matrix. Parameters ---------- first_histogram : np.ndarray The first distribution (array of nonnegative numbers). second_histogram : np.ndarray The second distribution (array of nonnegative numbers). distance_matrix : np.ndarray Matrix of distances between points of the two distributions. Returns ------- float The computed EMD value. """ # Ensure the histograms sum to the same total sum1 = np.sum(first_histogram) sum2 = np.sum(second_histogram) if not np.isclose(sum1, sum2): raise ValueError( "Histograms must sum to the same total for EMD calculation." ) n = len(first_histogram) m = len(second_histogram) # Flatten the distance matrix c = distance_matrix.flatten() # Constraints: # sum_j F_ij = first_histogram[i] for each i # sum_i F_ij = second_histogram[j] for each j # F_ij >= 0 # We have n "row sum" constraints and m "column sum" constraints. A_eq = [] b_eq = [] # Row constraints for i in range(n): row_constraint = np.zeros(n * m) for j in range(m): row_constraint[i * m + j] = 1 A_eq.append(row_constraint) b_eq.append(first_histogram[i]) # Column constraints for j in range(m): col_constraint = np.zeros(n * m) for i in range(n): col_constraint[i * m + j] = 1 A_eq.append(col_constraint) b_eq.append(second_histogram[j]) A_eq = np.array(A_eq) b_eq = np.array(b_eq) # Bounds: F_ij >= 0 bounds = [(0, None) for _ in range(n * m)] # Solve the LP: # minimize c^T x subject to A_eq x = b_eq and x >= 0 res = linprog(c, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method="highs") if res.status != 0: raise ValueError( f"Linear programming failed. Status: { res.status}, Message: { res.message}") # The optimal value is the EMD return res.fun
[docs] class POTEMDCalculator: """ A faster implementation of EMD using the POT (Python Optimal Transport) library. Falls back to the SciPy implementation if POT is not available. Usage: ------ emd_value = POTEMDCalculator.emd(first_histogram, second_histogram, distance_matrix) """
[docs] @staticmethod def is_pot_available(): """Check if POT (Python Optimal Transport) is available.""" return importlib.util.find_spec("ot") is not None
[docs] @staticmethod def emd( first_histogram: np.ndarray, second_histogram: np.ndarray, distance_matrix: np.ndarray, ) -> float: """ Compute the Earth Mover's Distance using POT if available, otherwise fall back to SciPy. Parameters ---------- first_histogram : np.ndarray The first distribution (array of nonnegative numbers). second_histogram : np.ndarray The second distribution (array of nonnegative numbers). distance_matrix : np.ndarray Matrix of distances between points of the two distributions. Returns ------- float The computed EMD value. """ # Check if POT is available if POTEMDCalculator.is_pot_available(): try: import ot # Normalize histograms to sum to 1.0 if they don't sum to the same value sum1 = np.sum(first_histogram) sum2 = np.sum(second_histogram) if not np.isclose(sum1, sum2): # Normalize both histograms to sum to 1.0 a = first_histogram / sum1 b = second_histogram / sum2 else: a = first_histogram b = second_histogram # Ensure arrays have the right type a = np.asarray(a, dtype=np.float64) b = np.asarray(b, dtype=np.float64) M = np.asarray(distance_matrix, dtype=np.float64) # Use POT's EMD computation # Regularized version can be used with reg=1e-3 for efficiency when needed emd_value = ot.emd2(a, b, M) return emd_value except ImportError: # Fall back to SciPy implementation return EMDCalculator.emd(first_histogram, second_histogram, distance_matrix) except Exception as e: # If POT fails for any reason, fall back to SciPy implementation print(f"POT EMD calculation failed: {str(e)}. Falling back to SciPy implementation.") return EMDCalculator.emd(first_histogram, second_histogram, distance_matrix) else: # Fall back to SciPy implementation return EMDCalculator.emd(first_histogram, second_histogram, distance_matrix)
[docs] def apply( lang1: Dict[List[str], float], lang2: Dict[List[str], float], parameters: Optional[Dict[Union[str, Parameters], Any]] = None, ) -> float: if parameters is None: parameters = {} distance_function = exec_utils.get_param_value( Parameters.STRING_DISTANCE, parameters, normalized_levensthein ) # New parameter to choose between implementations use_fast_emd = exec_utils.get_param_value( Parameters.USE_FAST_EMD, parameters, True ) enc1, enc2 = encode_two_languages(lang1, lang2, parameters=parameters) first_histogram = np.array([x[1] for x in enc1]) second_histogram = np.array([x[1] for x in enc2]) distance_matrix = [] for x in enc1: row = [] for y in enc2: dist = distance_function(x[0], y[0]) row.append(float(dist)) distance_matrix.append(row) distance_matrix = np.array(distance_matrix) # Choose which EMD implementation to use if use_fast_emd and POTEMDCalculator.is_pot_available(): ret = POTEMDCalculator.emd(first_histogram, second_histogram, distance_matrix) else: # Use the original SciPy implementation ret = EMDCalculator.emd(first_histogram, second_histogram, distance_matrix) return ret