'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from abc import ABC
from collections import Counter
from typing import List, Optional, Collection, Any, Tuple, Generic, Dict
from pm4py.util import nx_utils
from pm4py.algo.discovery.inductive.cuts.abc import Cut, T
from pm4py.algo.discovery.inductive.dtypes.im_dfg import InductiveDFG
from pm4py.algo.discovery.inductive.dtypes.im_ds import (
IMDataStructureUVCL,
IMDataStructureDFG,
)
from pm4py.objects.dfg import util as dfu
from pm4py.objects.dfg.obj import DFG
from pm4py.objects.process_tree.obj import Operator, ProcessTree
from pm4py.util.compression.dtypes import UVCL
class LoopCut(Cut[T], ABC, Generic[T]):
@classmethod
def operator(
cls, parameters: Optional[Dict[str, Any]] = None
) -> ProcessTree:
return ProcessTree(operator=Operator.LOOP)
@classmethod
def holds(
cls, obj: T, parameters: Optional[Dict[str, Any]] = None
) -> Optional[List[Collection[Any]]]:
"""
Finds a loop cut in the DFG and returns exactly two groups:
- groups[0]: the 'do' group (start ∪ end activities plus merged components as per checks)
- groups[1]: a single 'redo' group obtained by merging all remaining non-empty groups
If no non-empty redo part remains, returns None.
"""
dfg = obj.dfg
start_activities = set(dfg.start_activities.keys())
end_activities = set(dfg.end_activities.keys())
if len(dfg.graph) == 0:
return None
# Initial groups: do-part is start ∪ end; other parts are connected components after removing boundaries
groups = [start_activities.union(end_activities)]
for c in cls._compute_connected_components(dfg, start_activities, end_activities):
groups.append(set(c.nodes))
# Apply the original reachability/completeness checks
groups = cls._exclude_sets_non_reachable_from_start(dfg, start_activities, end_activities, groups)
groups = cls._exclude_sets_no_reachable_from_end(dfg, start_activities, end_activities, groups)
groups = cls._check_start_completeness(dfg, start_activities, end_activities, groups)
groups = cls._check_end_completeness(dfg, start_activities, end_activities, groups)
# Keep only non-empty groups
groups = [set(g) for g in groups if len(g) > 0]
# Require at least a do group and something to redo
if len(groups) <= 1:
return None
# Merge all remaining non-empty groups (from the second to the last) into a single redo group
redo_merged = set()
for g in groups[1:]:
redo_merged.update(g)
return [set(groups[0]), redo_merged]
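    # Illustrative sketch (not part of the algorithm): assuming the UVCL-based data
    # structure below, a log such as Counter({("a", "b", "c", "a", "b"): 1}) yields a
    # DFG with start activity "a", end activity "b" and edges a->b, b->c, c->a.
    # The do-part is then {"a", "b"} and the single merged redo-part is {"c"}, i.e.
    #
    #     LoopCutUVCL.holds(IMDataStructureUVCL(Counter({("a", "b", "c", "a", "b"): 1})))
    #     # -> [{"a", "b"}, {"c"}]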
@classmethod
def _check_start_completeness(
cls,
dfg: DFG,
start_activities: Collection[Any],
end_activities: Collection[Any],
groups: List[Collection[Any]],
parameters: Optional[Dict[str, Any]] = None,
) -> List[Collection[Any]]:
        # Merge a candidate redo group into the do-part whenever one of its
        # activities has an edge to some start activity but not to every start
        # activity (the group could otherwise not restart the do-part cleanly).
        i = 1
while i < len(groups):
merge = False
for a in groups[i]:
if merge:
break
for x, b in dfg.graph:
if x == a and b in start_activities:
for s in start_activities:
                            if (a, s) not in dfg.graph:
merge = True
if merge:
groups[0] = set(groups[0]).union(groups[i])
del groups[i]
continue
i = i + 1
return groups
@classmethod
def _check_end_completeness(
cls,
dfg: DFG,
start_activities: Collection[Any],
end_activities: Collection[Any],
groups: List[Collection[Any]],
parameters: Optional[Dict[str, Any]] = None,
) -> List[Collection[Any]]:
        # Symmetric check: merge a candidate redo group into the do-part whenever
        # one of its activities is reached from some end activity but not from
        # every end activity.
        i = 1
while i < len(groups):
merge = False
for a in groups[i]:
if merge:
break
for b, x in dfg.graph:
if x == a and b in end_activities:
for e in end_activities:
                            if (e, a) not in dfg.graph:
merge = True
if merge:
groups[0] = set(groups[0]).union(groups[i])
del groups[i]
continue
i = i + 1
return groups
@classmethod
def _exclude_sets_non_reachable_from_start(
cls,
dfg: DFG,
start_activities: Collection[Any],
end_activities: Collection[Any],
groups: List[Collection[Any]],
parameters: Optional[Dict[str, Any]] = None,
) -> List[Collection[Any]]:
for a in set(start_activities).difference(set(end_activities)):
for x, b in dfg.graph:
if x == a:
group_a, group_b = None, None
for group in groups:
group_a = group if a in group else group_a
group_b = group if b in group else group_b
groups = [
group
for group in groups
if group != group_a and group != group_b
]
# we are always merging on the do-part
groups.insert(0, group_a.union(group_b))
return groups
@classmethod
def _exclude_sets_no_reachable_from_end(
cls,
dfg: DFG,
start_activities: Collection[Any],
end_activities: Collection[Any],
groups: List[Collection[Any]],
parameters: Optional[Dict[str, Any]] = None,
) -> List[Collection[Any]]:
for b in set(end_activities).difference(start_activities):
for a, x in dfg.graph:
if x == b:
group_a, group_b = None, None
for group in groups:
group_a = group if a in group else group_a
group_b = group if b in group else group_b
groups = [
group
for group in groups
if group != group_a and group != group_b
]
groups.insert(0, group_a.union(group_b))
return groups
@classmethod
def _compute_connected_components(
cls,
dfg: DFG,
start_activities: Collection[Any],
end_activities: Collection[Any],
parameters: Optional[Dict[str, Any]] = None,
):
nxd = dfu.as_nx_graph(dfg)
        # Remove every edge that touches a start or end activity, then the
        # start/end nodes themselves; the remaining nodes form the candidate
        # redo components.
        boundary = set(start_activities).union(end_activities)
        for (a, b) in dfg.graph:
            if a in boundary or b in boundary:
                nxd.remove_edge(a, b)
        for a in boundary:
            if nxd.has_node(a):
                nxd.remove_node(a)
nxu = nxd.to_undirected()
return [
nxd.subgraph(c).copy() for c in nx_utils.connected_components(nxu)
]
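    # Illustrative sketch (not part of the algorithm): for a DFG with start
    # activity "a", end activity "b" and edges a->b, b->c, c->a, b->d, d->a,
    # removing the boundary nodes and edges leaves the two singleton components
    # {"c"} and {"d"}; holds() later merges them into the single redo group
    # {"c", "d"}.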
class LoopCutUVCL(LoopCut[IMDataStructureUVCL]):
@classmethod
def project(
cls,
obj: IMDataStructureUVCL,
groups: List[Collection[Any]],
parameters: Optional[Dict[str, Any]] = None,
) -> List[IMDataStructureUVCL]:
"""
Split the (compressed) log into one 'do' log and N 'redo' logs.
Traces are segmented into do/redo slices; redo slices are routed to the
redo log whose activity set overlaps most with the slice.
"""
do = set(groups[0])
redo_groups = [set(g) for g in groups[1:]]
# For quick membership checks
redo_activities = set().union(*redo_groups) if redo_groups else set()
do_log = Counter()
redo_logs = [Counter() for _ in redo_groups]
for t, card in obj.data_structure.items():
do_trace: Tuple[Any, ...] = tuple()
redo_trace: Tuple[Any, ...] = tuple()
for e in t:
if e in do:
do_trace += (e,)
# flush redo if we were inside a redo slice
if redo_trace:
redo_logs = cls._append_trace_to_redo_log(
redo_trace, redo_logs, redo_groups, card
)
redo_trace = tuple()
elif e in redo_activities:
redo_trace += (e,)
# flush do if we were inside a do slice
if do_trace:
do_log.update({do_trace: card})
do_trace = tuple()
else:
# Safety: if an event is in neither do nor any redo group,
# flush any current slice and ignore the event
if do_trace:
do_log.update({do_trace: card})
do_trace = tuple()
if redo_trace:
redo_logs = cls._append_trace_to_redo_log(
redo_trace, redo_logs, redo_groups, card
)
redo_trace = tuple()
# Flush tail slices
if redo_trace:
redo_logs = cls._append_trace_to_redo_log(
redo_trace, redo_logs, redo_groups, card
)
            do_log.update({do_trace: card})  # always flush the final do slice (it may be the empty trace)
logs = [do_log] + redo_logs
return [IMDataStructureUVCL(l) for l in logs]
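    # Illustrative sketch (not part of the algorithm): with groups
    # [{"a", "b"}, {"c"}] and the compressed log Counter({("a", "b", "c", "a", "b"): 1}),
    # the trace is sliced as (a, b) | (c,) | (a, b), so the do log becomes
    # Counter({("a", "b"): 2}) and the single redo log becomes Counter({("c",): 1}).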
@classmethod
def _append_trace_to_redo_log(
cls,
redo_trace: Tuple[Any, ...],
redo_logs: List[UVCL],
redo_groups: List[Collection[Any]],
cardinality: int,
parameters: Optional[Dict[str, Any]] = None,
) -> List[UVCL]:
"""
Append a redo slice to the best matching redo log (largest activity overlap).
"""
if not redo_logs:
return redo_logs
activities = set(redo_trace)
overlaps = [
(i, len(activities.intersection(set(redo_groups[i]))))
for i in range(len(redo_groups))
]
overlaps.sort(key=lambda x: (x[1], x[0]), reverse=True)
target = overlaps[0][0]
redo_logs[target].update({redo_trace: cardinality})
return redo_logs
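    # Illustrative sketch (not part of the algorithm): a redo slice ("c", "d") with
    # redo_groups [{"c"}, {"d", "e"}] overlaps both groups by one activity; the sort
    # key (overlap, index) breaks the tie towards the higher index, so the slice is
    # appended to the second redo log. Since holds() above returns a single merged
    # redo group, there is only one redo log in practice and the choice is trivial.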
class LoopCutDFG(LoopCut[IMDataStructureDFG]):
@classmethod
def project(
cls,
        obj: IMDataStructureDFG,
groups: List[Collection[Any]],
parameters: Optional[Dict[str, Any]] = None,
) -> List[IMDataStructureDFG]:
        """
        Builds one sub-DFG per group. The do-part (index 0) keeps the original
        start/end activities and is flagged skippable if any original start/end
        activity lies outside it; the redo-part (index 1) treats each of its
        activities as both a start and an end activity and is flagged skippable
        if the DFG contains a direct edge from an end activity to a start activity.
        """
        dfg = obj.dfg
dfgs = []
skippable = [False, False]
for gind, g in enumerate(groups):
dfn = DFG()
for a, b in dfg.graph:
if a in g and b in g:
dfn.graph[(a, b)] = dfg.graph[(a, b)]
if b in dfg.start_activities and a in dfg.end_activities:
skippable[1] = True
if gind == 0:
for a in dfg.start_activities:
if a in g:
dfn.start_activities[a] = dfg.start_activities[a]
else:
skippable[0] = True
for a in dfg.end_activities:
if a in g:
dfn.end_activities[a] = dfg.end_activities[a]
else:
skippable[0] = True
elif gind == 1:
for a in g:
dfn.start_activities[a] = 1
dfn.end_activities[a] = 1
dfgs.append(dfn)
return [
IMDataStructureDFG(InductiveDFG(dfg=dfgs[i], skip=skippable[i]))
for i in range(len(dfgs))
]
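

if __name__ == "__main__":
    # Minimal usage sketch under the assumption that IMDataStructureUVCL derives the
    # DFG (graph, start/end activities) from the compressed log, as the inductive
    # miner pipeline does when it evaluates this cut.
    log = Counter({("a", "b", "c", "a", "b"): 1})
    obj = IMDataStructureUVCL(log)
    cut = LoopCutUVCL.holds(obj)
    print(cut)  # expected: [{'a', 'b'}, {'c'}]
    if cut is not None:
        for part in LoopCutUVCL.project(obj, cut):
            print(part.data_structure)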