Source code for pm4py.algo.discovery.inductive.fall_through.activity_concurrent

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from collections import Counter
from typing import Optional, Tuple, List, Any, Dict

from pm4py.algo.discovery.inductive.cuts.factory import CutFactory
from pm4py.algo.discovery.inductive.dtypes.im_ds import IMDataStructureUVCL
from pm4py.algo.discovery.inductive.fall_through.abc import FallThrough
from pm4py.algo.discovery.inductive.variants.instances import IMInstance
from pm4py.objects.process_tree.obj import ProcessTree, Operator
from pm4py.util.compression import util as comut
from pm4py.util.compression.dtypes import UVCL
from enum import Enum
from pm4py.util import exec_utils, constants


[docs] class Parameters(Enum): MULTIPROCESSING = "multiprocessing"
[docs] class ActivityConcurrentUVCL(FallThrough[IMDataStructureUVCL]): MULTI_PROCESSING_LOWER_BOUND = 20 @classmethod def _process_candidate( cls, c: Any, log: UVCL, queue=None, ev=None, parameters: Optional[Dict[str, Any]] = None, ): l_alt = Counter() for t in log: l_alt[tuple(filter(lambda e: e != c, t))] = log[t] cut = cls._find_cut( IMDataStructureUVCL(l_alt), ev, parameters=parameters ) if queue is not None: queue.put((c, cut)) return cut if cut is not None else None @classmethod def _get_candidate( cls, obj: IMDataStructureUVCL, pool, manager, parameters: Optional[Dict[str, Any]] = None, ) -> Optional[Any]: if parameters is None: parameters = {} enable_multiprocessing = exec_utils.get_param_value( Parameters.MULTIPROCESSING, parameters, constants.ENABLE_MULTIPROCESSING_DEFAULT, ) log = obj.data_structure candidates = sorted(list(comut.get_alphabet(log))) if ( pool is None or manager is None or not enable_multiprocessing or len(candidates) <= ActivityConcurrentUVCL.MULTI_PROCESSING_LOWER_BOUND ): for a in candidates: cut = cls._process_candidate(a, log, parameters=parameters) if cut is not None: return a else: q = manager.Queue() ev = manager.Event() # avoid dangerous freealloc from Python's garbage collector manager.support_list.append(q) manager.support_list.append(ev) for a in candidates: pool.apply_async( cls._process_candidate, (a, log, q, ev, parameters) ) potentials = set(candidates) while len(potentials) > 0: (c, cut) = q.get(block=True) if cut is None: potentials.remove(c) else: ev.set() return c return None @classmethod def _find_cut( cls, obj: IMDataStructureUVCL, ev, parameters: Optional[Dict[str, Any]] = None, ) -> Optional[Tuple[ProcessTree, List[IMDataStructureUVCL]]]: for c in CutFactory.get_cuts( obj, IMInstance.IM, parameters=parameters ): if ev is not None and ev.is_set(): return None r = c.apply(obj, parameters) if r is not None: return r return None
[docs] @classmethod def holds( cls, obj: IMDataStructureUVCL, parameters: Optional[Dict[str, Any]] = None, ) -> bool: return cls._get_candidate(obj, None, None, parameters) is not None
[docs] @classmethod def apply( cls, obj: IMDataStructureUVCL, pool=None, manager=None, parameters: Optional[Dict[str, Any]] = None, ) -> Optional[Tuple[ProcessTree, List[IMDataStructureUVCL]]]: candidate = cls._get_candidate(obj, pool, manager, parameters) if candidate is None: return None log = obj.data_structure l_a = Counter() l_other = Counter() for t in log: l_a.update({tuple(filter(lambda e: e == candidate, t)): log[t]}) l_other.update( {tuple(filter(lambda e: e != candidate, t)): log[t]} ) return ProcessTree(operator=Operator.PARALLEL), [ IMDataStructureUVCL(l_a), IMDataStructureUVCL(l_other), ]