# Source code for pm4py.algo.discovery.footprints.powl.variants.bottomup
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.objects.powl.obj import (
POWL,
Transition,
SilentTransition,
OperatorPOWL,
StrictPartialOrder,
FrequentTransition,
)
from pm4py.objects.process_tree.obj import Operator
from enum import Enum
from typing import Optional, Dict, Any, Set, Tuple
###############################################################################
# Enums and constants
###############################################################################
[docs]
class Outputs(Enum):
    # Names of the entries of a footprints dictionary. Each member's value is
    # the string used as the dictionary key.

    # Not referenced by the visible code of this module.
    DFG = "dfg"
    # Set of (a, b) activity pairs in a sequence relation.
    SEQUENCE = "sequence"
    # Set of (a, b) activity pairs in a parallel relation; the partial-order
    # handler adds both (a, b) and (b, a).
    PARALLEL = "parallel"
    # Set of activity labels with which the (sub)model can start.
    START_ACTIVITIES = "start_activities"
    # Set of activity labels with which the (sub)model can end.
    END_ACTIVITIES = "end_activities"
    # Set of all activity labels of the (sub)model.
    ACTIVITIES = "activities"
    # Truthy flag: the (sub)model can be executed without any visible activity.
    SKIPPABLE = "skippable"
    # Set of activity labels reported as occurring in every execution.
    ACTIVITIES_ALWAYS_HAPPENING = "activities_always_happening"
    # Lower bound on the number of visible activities in an execution.
    MIN_TRACE_LENGTH = "min_trace_length"
    # Not referenced by the visible code of this module.
    TRACE = "trace"
# Module-level shortcuts for the string keys of the footprints dictionaries,
# so the functions below can index with e.g. fp[SKIPPABLE] instead of
# fp[Outputs.SKIPPABLE.value]. MIN_TRACE_LENGTH has no shortcut and is always
# spelled out as Outputs.MIN_TRACE_LENGTH.value.
START_ACTIVITIES = Outputs.START_ACTIVITIES.value
END_ACTIVITIES = Outputs.END_ACTIVITIES.value
ACTIVITIES = Outputs.ACTIVITIES.value
SKIPPABLE = Outputs.SKIPPABLE.value
SEQUENCE = Outputs.SEQUENCE.value
PARALLEL = Outputs.PARALLEL.value
ACTIVITIES_ALWAYS_HAPPENING = Outputs.ACTIVITIES_ALWAYS_HAPPENING.value
###############################################################################
# Utility functions to fix parallel vs sequence footprints
###############################################################################
[docs]
def fix_fp(sequence: Set[Tuple[str, str]], parallel: Set[Tuple[str, str]]):
    """
    Make the sequence and parallel relations of a footprint mutually consistent.

    - Every pair that is already parallel is removed from the sequence relation.
    - If both (a, b) and (b, a) remain in the sequence relation, the two
      directions contradict each other: both pairs are moved to the parallel
      relation. A reflexive pair (a, a) is its own reverse and is moved as well.

    Neither argument is modified; two new sets are returned.

    :param sequence: pairs (a, b) meaning "a is directly followed by b"
    :param parallel: pairs (a, b) meaning "a and b are concurrent"
    :return: tuple (fixed_sequence, fixed_parallel)
    """
    # Work on copies so that the caller's sets are never mutated.
    concurrent = set(parallel)
    ordered = set(sequence).difference(concurrent)

    # Pairs whose reverse is also present cannot be a strict ordering.
    conflicting = {(a, b) for (a, b) in ordered if (b, a) in ordered}

    return ordered - conflicting, concurrent | conflicting
###############################################################################
# Combine footprints
###############################################################################
[docs]
def merge_footprints(list_of_footprints):
    """
    Merge a list of footprints dictionaries in a 'parallel/AND' sense.

    Every child is treated the same way, whatever its position in the list:
    - 'activities'                  => union
    - 'skippable'                   => AND (True for an empty list)
    - 'start_activities'            => union (the partial-order handler refines it)
    - 'end_activities'              => union (the partial-order handler refines it)
    - 'activities_always_happening' => union over the non-skippable children only
    - 'sequence', 'parallel'        => union (the caller is expected to run
                                       fix_fp on the result)

    The input dictionaries are not modified; all sets in the result are new.
    The result does not contain a 'min_trace_length' entry.

    :param list_of_footprints: list of footprints dictionaries
    :return: the merged footprints dictionary
    """
    merged = {
        START_ACTIVITIES: set(),
        END_ACTIVITIES: set(),
        ACTIVITIES: set(),
        SKIPPABLE: True,
        SEQUENCE: set(),
        PARALLEL: set(),
        ACTIVITIES_ALWAYS_HAPPENING: set(),
    }
    for fp in list_of_footprints:
        for key in (START_ACTIVITIES, END_ACTIVITIES, ACTIVITIES, SEQUENCE, PARALLEL):
            merged[key].update(fp[key])
        merged[SKIPPABLE] = merged[SKIPPABLE] and fp[SKIPPABLE]
        # A skippable child may not run at all, so nothing inside it is
        # guaranteed to happen - this also holds for the first child.
        if not fp[SKIPPABLE]:
            merged[ACTIVITIES_ALWAYS_HAPPENING].update(fp[ACTIVITIES_ALWAYS_HAPPENING])
    return merged
###############################################################################
# Combining minimum trace length from child footprints
###############################################################################
[docs]
def combine_min_trace_length_par(children_fps):
    """
    Minimum trace length of a partial order / parallel composition.

    Every non-skippable child has to be executed, so the result is the sum of
    the minimum lengths of the non-skippable children. A child without a
    'min_trace_length' entry contributes 0.
    """
    return sum(
        child.get(Outputs.MIN_TRACE_LENGTH.value, 0)
        for child in children_fps
        if not child[SKIPPABLE]
    )
[docs]
def combine_min_trace_length_xor(children_fps):
    """
    Minimum trace length of an XOR/choice: the smallest minimum length among
    the children (0 when there are no children). A child without a
    'min_trace_length' entry counts as 0.
    """
    if children_fps:
        lengths = [fp.get(Outputs.MIN_TRACE_LENGTH.value, 0) for fp in children_fps]
        return min(lengths)
    return 0
[docs]
def combine_min_trace_length_loop(do_fp, redo_fp):
    """
    Minimum trace length of a LOOP (do, redo).

    Only the 'do' part is taken into account: the result is its
    'min_trace_length' entry (0 if missing). `redo_fp` is accepted for
    symmetry with the other combiners but is not read.
    """
    min_length_key = Outputs.MIN_TRACE_LENGTH.value
    return do_fp.get(min_length_key, 0)
###############################################################################
# Handling transitions
###############################################################################
[docs]
def get_footprints_of_transition(node: Transition) -> Dict[str, Any]:
    """
    Footprints of a leaf transition.

    A transition whose label is None is silent: it contributes no activity,
    is skippable and has minimum trace length 0. Any other transition
    contributes exactly its label as start, end, overall and always-happening
    activity, is not skippable and has minimum trace length 1.
    """
    label = node.label
    silent = label is None

    def labels():
        # A fresh set per entry, so the entries never share one set object.
        return set() if silent else {label}

    return {
        START_ACTIVITIES: labels(),
        END_ACTIVITIES: labels(),
        ACTIVITIES: labels(),
        SKIPPABLE: silent,
        SEQUENCE: set(),
        PARALLEL: set(),
        ACTIVITIES_ALWAYS_HAPPENING: labels(),
        Outputs.MIN_TRACE_LENGTH.value: 0 if silent else 1,
    }
###############################################################################
# Handling XOR
###############################################################################
[docs]
def get_footprints_of_xor(node: OperatorPOWL, footprints_cache: Dict[POWL, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Footprints for an XOR/choice node:
    - start_activities = union(children.start_activities)
    - end_activities = union(children.end_activities)
    - activities = union
    - skippable = OR of children.skippable
    - activities_always_happening = intersection of the children's sets, or
      the empty set as soon as one child is skippable (the choice can then be
      resolved without executing any activity, so nothing always happens)
    - sequence = union of children
    - parallel = union of children
    - min_trace_length = min of children

    :param node: the XOR operator node
    :param footprints_cache: memoization dictionary shared by the recursion
    :return: the footprints dictionary of the node
    """
    child_fps = [get_footprints_powl(ch, footprints_cache) for ch in node.children]

    start_activities = set()
    end_activities = set()
    activities = set()
    skippable = False
    sequence = set()
    parallel = set()
    for fp in child_fps:
        start_activities |= fp[START_ACTIVITIES]
        end_activities |= fp[END_ACTIVITIES]
        activities |= fp[ACTIVITIES]
        skippable = skippable or fp[SKIPPABLE]
        sequence |= fp[SEQUENCE]
        parallel |= fp[PARALLEL]

    if skippable or not child_fps:
        # Taking a skippable branch may produce no activity at all, so no
        # activity is guaranteed; the same holds for a choice without branches.
        aah = set()
    else:
        # Exactly one branch runs: only what every branch guarantees is certain.
        aah = set.intersection(*(set(fp[ACTIVITIES_ALWAYS_HAPPENING]) for fp in child_fps))

    sequence, parallel = fix_fp(sequence, parallel)
    min_trace_length = combine_min_trace_length_xor(child_fps)

    return {
        START_ACTIVITIES: start_activities,
        END_ACTIVITIES: end_activities,
        ACTIVITIES: activities,
        SKIPPABLE: skippable,
        SEQUENCE: sequence,
        PARALLEL: parallel,
        ACTIVITIES_ALWAYS_HAPPENING: aah,
        Outputs.MIN_TRACE_LENGTH.value: min_trace_length,
    }
###############################################################################
# Handling LOOP
###############################################################################
[docs]
def get_footprints_of_loop(node: OperatorPOWL, footprints_cache: Dict[POWL, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Footprints for a LOOP node whose children are [do, redo].

    - start/end activities come from 'do'; when 'do' is skippable those of
      'redo' are added
    - activities, sequence and parallel are the unions of both children
    - the node is skippable exactly when 'do' is skippable
    - activities_always_happening is that of 'do' unless 'do' is skippable
    - sequence additionally gets do.end -> redo.start and redo.end -> do.start,
      plus redo.end -> redo.start if 'do' is skippable and
      do.end -> do.start if 'redo' is skippable
    - min_trace_length is that of 'do'
    """
    do_child = node.children[0]
    redo_child = node.children[1]
    do_fp = get_footprints_powl(do_child, footprints_cache)
    redo_fp = get_footprints_powl(redo_child, footprints_cache)

    do_skippable = do_fp[SKIPPABLE]
    do_starts, do_ends = do_fp[START_ACTIVITIES], do_fp[END_ACTIVITIES]
    redo_starts, redo_ends = redo_fp[START_ACTIVITIES], redo_fp[END_ACTIVITIES]

    start_activities = set(do_starts)
    end_activities = set(do_ends)
    if do_skippable:
        # 'do' may produce nothing, so a trace can begin/finish inside 'redo'.
        start_activities |= redo_starts
        end_activities |= redo_ends

    # (sources, targets) pairs: every end of `sources` may be directly
    # followed by every start of `targets`.
    links = [(do_ends, redo_starts), (redo_ends, do_starts)]
    if do_skippable:
        # 'do' can be skipped between two 'redo' executions.
        links.append((redo_ends, redo_starts))
    if redo_fp[SKIPPABLE]:
        # 'redo' can be skipped between two 'do' executions.
        links.append((do_ends, do_starts))

    sequence = set(do_fp[SEQUENCE]) | set(redo_fp[SEQUENCE])
    parallel = set(do_fp[PARALLEL]) | set(redo_fp[PARALLEL])
    for sources, targets in links:
        sequence.update((a1, a2) for a1 in sources for a2 in targets)
    sequence, parallel = fix_fp(sequence, parallel)

    return {
        START_ACTIVITIES: start_activities,
        END_ACTIVITIES: end_activities,
        ACTIVITIES: do_fp[ACTIVITIES] | redo_fp[ACTIVITIES],
        SKIPPABLE: do_skippable,
        SEQUENCE: sequence,
        PARALLEL: parallel,
        ACTIVITIES_ALWAYS_HAPPENING: set() if do_skippable else set(do_fp[ACTIVITIES_ALWAYS_HAPPENING]),
        Outputs.MIN_TRACE_LENGTH.value: combine_min_trace_length_loop(do_fp, redo_fp),
    }
###############################################################################
# Handling StrictPartialOrder + transitive reduction
###############################################################################
[docs]
def transitive_reduction(node: StrictPartialOrder):
    """
    Remove, in place, every edge of `node.partial_order` that is implied by a
    longer path.

    An edge A->C is dropped when C can still be reached from A without using
    that direct edge (e.g. A->B and B->C exist). All edges are judged against
    the original relation first and removed afterwards.
    """
    from collections import deque

    order = node.partial_order
    members = node.children

    # Successor lists and the flat edge list of the current relation.
    successors = {member: [] for member in members}
    direct_edges = []
    for src in members:
        for dst in members:
            if order.is_edge(src, dst):
                successors[src].append(dst)
                direct_edges.append((src, dst))

    def has_detour(source, target):
        # Breadth-first search from `source` that is not allowed to take the
        # direct edge source->target; True if `target` is reached anyway.
        seen = set()
        pending = deque([source])
        while pending:
            current = pending.popleft()
            if current == target and current != source:
                return True
            for nxt in successors[current]:
                takes_direct_edge = current == source and nxt == target
                if not takes_direct_edge and nxt not in seen:
                    seen.add(nxt)
                    pending.append(nxt)
        return False

    redundant = [edge for edge in direct_edges if has_detour(*edge)]
    for src, dst in redundant:
        order.remove_edge(src, dst)
[docs]
def get_footprints_of_partial_order(
    node: StrictPartialOrder, footprints_cache: Dict[POWL, Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Footprints for a StrictPartialOrder node.

    Steps:
    1) Record the successor lists of the order as given.
    2) From them, compute closure[c]: every child reachable from c, c included.
    3) Apply transitive_reduction(node) (this modifies the node's order).
    4) Compute the children's footprints and merge them (AND/parallel).
    5) A child contributes start activities unless a different, non-skippable
       child precedes it in the closure.
    6) A child contributes end activities unless a different, non-skippable
       child follows it in the closure.
    7) Sequence pairs end(c) -> start(d) are added when c->d is an edge of the
       reduced order, or when d follows c in the closure and no non-skippable
       child lies between them.
    8) Children unrelated in the closure are concurrent: all their activity
       pairs become parallel, in both directions.
    9) fix_fp is applied and min_trace_length is set to the sum over the
       non-skippable children.
    """
    from collections import deque

    children = node.children

    # ----------------------------------------------------------------------
    # 1) Successors in the order as given (before reduction)
    # ----------------------------------------------------------------------
    successors = {
        c: [d for d in children if node.partial_order.is_edge(c, d)]
        for c in children
    }

    # ----------------------------------------------------------------------
    # 2) Reachability closure (each child reaches itself)
    # ----------------------------------------------------------------------
    def reachable_from(origin):
        seen = {origin}
        frontier = deque([origin])
        while frontier:
            for nxt in successors[frontier.popleft()]:
                if nxt not in seen:
                    seen.add(nxt)
                    frontier.append(nxt)
        return seen

    closure = {c: reachable_from(c) for c in children}

    # ----------------------------------------------------------------------
    # 3) Reduce the order stored in the node
    # ----------------------------------------------------------------------
    transitive_reduction(node)

    # ----------------------------------------------------------------------
    # 4) Children footprints and their AND-merge
    # ----------------------------------------------------------------------
    child_fps = [get_footprints_powl(c, footprints_cache) for c in children]
    merged_fp = merge_footprints(child_fps)
    sequence = set(merged_fp[SEQUENCE])
    parallel = set(merged_fp[PARALLEL])

    fp_of = dict(zip(children, child_fps))
    # Children that cannot be skipped; only these can block a start, an end
    # or a skip edge.
    mandatory = [c for c in children if not fp_of[c][SKIPPABLE]]

    # ----------------------------------------------------------------------
    # 5) + 6) Start and end activities
    # ----------------------------------------------------------------------
    start_activities = set()
    end_activities = set()
    for c in children:
        preceded = any(p != c and c in closure[p] for p in mandatory)
        if not preceded:
            start_activities |= fp_of[c][START_ACTIVITIES]
        followed = any(q != c and q in closure[c] for q in mandatory)
        if not followed:
            end_activities |= fp_of[c][END_ACTIVITIES]

    # ----------------------------------------------------------------------
    # 7) Sequence pairs: reduced edges and skip edges
    # ----------------------------------------------------------------------
    def only_skippable_between(c, d):
        # True if d follows c in the closure (c != d) and there is no
        # non-skippable n, different from both, with c->...->n->...->d.
        if c == d or d not in closure[c]:
            return False
        return not any(
            n != c and n != d and n in closure[c] and d in closure[n]
            for n in mandatory
        )

    for c in children:
        ends_of_c = fp_of[c][END_ACTIVITIES]
        for d in children:
            is_reduced_edge = node.partial_order.is_edge(c, d)
            if is_reduced_edge or (c != d and only_skippable_between(c, d)):
                sequence.update(
                    (a1, a2) for a1 in ends_of_c for a2 in fp_of[d][START_ACTIVITIES]
                )

    # ----------------------------------------------------------------------
    # 8) Concurrency between children that are unrelated in the closure
    # ----------------------------------------------------------------------
    child_count = len(children)
    for i, c in enumerate(children):
        for j in range(i + 1, child_count):
            d = children[j]
            if d in closure[c] or c in closure[d]:
                continue
            for a1 in child_fps[i][ACTIVITIES]:
                for a2 in child_fps[j][ACTIVITIES]:
                    parallel.add((a1, a2))
                    parallel.add((a2, a1))

    # ----------------------------------------------------------------------
    # 9) Consistency fix and final assembly
    # ----------------------------------------------------------------------
    sequence, parallel = fix_fp(sequence, parallel)
    merged_fp.update(
        {
            START_ACTIVITIES: start_activities,
            END_ACTIVITIES: end_activities,
            SEQUENCE: sequence,
            PARALLEL: parallel,
            Outputs.MIN_TRACE_LENGTH.value: combine_min_trace_length_par(child_fps),
        }
    )
    return merged_fp
###############################################################################
# Recursive footprints function with memoization
###############################################################################
[docs]
def get_footprints_powl(node: POWL, footprints_cache: Dict[POWL, Dict[str, Any]]) -> Dict[str, Any]:
    """
    Return the footprints of a POWL node, computing them at most once.

    Results are stored in `footprints_cache` keyed by node. Dispatch is by
    node type: Transition, OperatorPOWL (XOR or LOOP) or StrictPartialOrder.

    :raises NotImplementedError: for any other operator or node type
    """
    if node not in footprints_cache:
        if isinstance(node, Transition):
            computed = get_footprints_of_transition(node)
        elif isinstance(node, OperatorPOWL):
            operator = node.operator
            if operator == Operator.XOR:
                computed = get_footprints_of_xor(node, footprints_cache)
            elif operator == Operator.LOOP:
                computed = get_footprints_of_loop(node, footprints_cache)
            else:
                raise NotImplementedError(f"Unsupported Operator: {operator}")
        elif isinstance(node, StrictPartialOrder):
            computed = get_footprints_of_partial_order(node, footprints_cache)
        else:
            raise NotImplementedError(f"Unknown POWL node type: {type(node)}")
        footprints_cache[node] = computed
    return footprints_cache[node]
###############################################################################
# Main entry point
###############################################################################
[docs]
def apply(model: POWL, parameters: Optional[Dict[Any, Any]] = None) -> Dict[str, Any]:
    """
    Compute the footprints of a POWL model.

    The computation runs bottom-up with a fresh cache. Partial orders met on
    the way are transitively reduced in place. `parameters` is accepted but
    not read.

    The returned dictionary includes:
    "start_activities",
    "end_activities",
    "activities",
    "skippable",
    "sequence",
    "parallel",
    "activities_always_happening",
    "min_trace_length"
    """
    return get_footprints_powl(model, {})