"""
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
"""
from collections import Counter, defaultdict
import itertools
from typing import Any, Generic, Set, TypeVar, Union
from copy import deepcopy
from pm4py.objects.oc_causal_net.obj import OCCausalNet
# Type variable for an object-centric causal net (OCCausalNet or a subclass).
N = TypeVar("N", bound=OCCausalNet)
# Type variable for a marker of an object-centric causal net.
M = TypeVar("M", bound=OCCausalNet.Marker)
# Type variable for a marker group of an object-centric causal net.
MG = TypeVar("MG", bound=OCCausalNet.MarkerGroup)
[docs]
class OCCausalNetState(Generic[N], defaultdict):
    """
    State of an object-centric causal net.

    The state is a mapping from activities act to multisets of outstanding
    obligations (act2, object_id, object_type) from act2 to act.
    ```
    state = OCCausalNetState({act: Counter([(act2, object_id, object_type)])})
    ```
    Activities without an entry default to an empty multiset when indexed.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes the OCCausalNetState, querying unspecified activities defaults to an empty multiset."""
        super().__init__(Counter)
        # A leading ``Counter`` class is a default factory, not data (this is
        # the call form used when a defaultdict is re-created, e.g. for
        # copying or pickling), so it is skipped.
        data_args = args[1:] if args and args[0] is Counter else args
        for act, obligations in dict(*data_args, **kwargs).items():
            self[act] = Counter(obligations)

    def __hash__(self):
        # Zero-count obligations and activities without obligations are
        # ignored so that states comparing equal also hash equal.
        entries = []
        for act, counter in self.items():
            nonzero = frozenset(
                (obligation, count)
                for obligation, count in counter.items()
                if count != 0
            )
            if nonzero:
                entries.append((act, nonzero))
        return hash(frozenset(entries))

    def __eq__(self, other):
        if not isinstance(other, OCCausalNetState):
            return False
        # Missing activities are treated like activities with an empty multiset.
        return all(
            self.get(a, Counter()) == other.get(a, Counter())
            for a in set(self.keys()) | set(other.keys())
        )

    def __le__(self, other):
        for a, self_counter in self.items():
            other_counter = other.get(a, Counter())
            # Every obligation count in self must be less than or equal to the count in other.
            if any(
                other_counter.get(pred, 0) < count
                for pred, count in self_counter.items()
            ):
                return False
        return True

    def __add__(self, other):
        result = OCCausalNetState()
        for a, self_counter in self.items():
            result[a] += self_counter
        for a, other_counter in other.items():
            result[a] += other_counter
        return result

    def __sub__(self, other):
        result = OCCausalNetState()
        for a, self_counter in self.items():
            # Counter subtraction keeps only positive counts.
            diff = self_counter - other.get(a, Counter())
            if diff:
                result[a] = diff
        return result

    def __repr__(self):
        # e.g. [(a, o1[order], a'), ...]
        sorted_entries = sorted(self.items(), key=lambda item: item[0])
        obligations = [
            f"({pred}, {obj_id}[{ot}], {act}):{count}"
            for (act, obl) in sorted_entries
            for ((pred, obj_id, ot), count) in obl.items()
        ]
        return f'[{", ".join(obligations) if obligations else ""}]'

    def __str__(self):
        return self.__repr__()

    def __deepcopy__(self, memodict=None):
        # A fresh memo per top-level call; a mutable default argument would be
        # shared (and polluted) across calls.
        if memodict is None:
            memodict = {}
        new_state = OCCausalNetState()
        memodict[id(self)] = new_state
        for act, obligations in self.items():
            # deepcopy consults the memo itself before copying.
            new_state[deepcopy(act, memodict)] = deepcopy(obligations, memodict)
        return new_state

    @property
    def activities(self) -> Set:
        """
        Set of activities with outstanding obligations.
        """
        return {act for act, obligations in self.items() if obligations}
[docs]
class OCCausalNetSemantics(Generic[N]):
    """
    Class for the semantics of object-centric causal nets.

    Offers class methods to determine enabled activities and bindings in a
    state, to execute a binding on a state, and to replay a sequence of
    bindings on a net.
    """
[docs]
@classmethod
def enabled_activities(
    cls,
    occn: N,
    state: OCCausalNetState,
    include_start_activities=False,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
) -> Set:
    """
    Collects the activities of the net that are enabled in the given state.

    Parameters
    ----------
    occn
        Object-centric causal net
    state
        State of the OCCN
    include_start_activities
        True if start activities (names starting with "START_") should be
        part of the result. Start activities are always enabled.
    act_to_idx
        If activities are denoted in the state by an id instead of their name,
        a dictionary mapping activities to their index has to be provided here.
    ot_to_idx
        If object types are denoted in the state by an id instead of their name,
        a dictionary mapping object types to their index has to be provided here.

    Returns
    -------
    set
        Set of all activities enabled.
    """
    enabled = set()
    for candidate in occn.activities:
        # start activities are only reported on request
        if not include_start_activities and candidate.startswith("START_"):
            continue
        if cls.is_enabled(occn, candidate, state, act_to_idx, ot_to_idx):
            enabled.add(candidate)
    return enabled
[docs]
@classmethod
def is_enabled(
    cls,
    occn: N,
    act: str,
    state: OCCausalNetState,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
) -> bool:
    """
    Checks whether a given activity is enabled in a given object-centric
    causal net and state.

    An activity is enabled if there exists an input marker group that can be
    bound. Start activities are always enabled. Only the marker cardinalities
    are checked here; key constraints of the marker groups are not evaluated.
    The state is only read, never modified.

    Parameters
    ----------
    occn
        Object-centric causal net
    act
        Activity to check
    state
        State of the OCCN
    act_to_idx
        If activities are denoted in the state by an id instead of their name,
        a dictionary mapping activities to their index has to be provided here.
    ot_to_idx
        If object types are denoted in the state by an id instead of their name,
        a dictionary mapping object types to their index has to be provided here.

    Returns
    -------
    bool
        true if enabled, false otherwise
    """
    # Start activities are always enabled
    if act.startswith("START_"):
        return True
    act_id = act_to_idx[act] if act_to_idx else act
    # ``get`` instead of indexing: the state is a defaultdict, so indexing a
    # missing activity would insert an empty entry into the caller's state.
    outstanding = state.get(act_id)
    imgs = occn.input_marker_groups[act]
    # if there are no outstanding obligations, the activity cannot be enabled
    if not outstanding:
        return False
    # preprocess the obligations to a lookup table where values
    # are sets of objects with outstanding obligations to this act
    objects = defaultdict(set)
    for rel_act, obj_id, ot_id in outstanding:
        objects[(rel_act, ot_id)].add(obj_id)
    # check each img
    for img in imgs:
        # markers that allow for consumption of 0 obligations do not need to be enabled
        # at least one marker of the img needs to be enabled
        one_marker_enabled = False
        for marker in img.markers:
            min_count = marker.min_count
            rel_act_id = (
                act_to_idx[marker.related_activity]
                if act_to_idx
                else marker.related_activity
            )
            ot_id = ot_to_idx[marker.object_type] if ot_to_idx else marker.object_type
            num_objects = len(objects.get((rel_act_id, ot_id), ()))
            if num_objects >= max(min_count, 1):
                one_marker_enabled = True
            elif min_count != 0:
                # a mandatory marker is not enabled, move on to next img
                one_marker_enabled = False
                break
        if one_marker_enabled:
            return True
    return False
@classmethod
def _find_matching_marker_group(
    cls, obligations: dict, marker_groups: list
) -> Union[MG, None]:
    """
    Finds a matching marker group that is able to consume/produce the given obligations.

    A group matches if it has a marker for every (related activity, object
    type) pair in the obligations, all object counts lie within the
    cardinality bounds of its markers, and none of its key constraints is
    violated.

    Parameters
    ----------
    obligations : dict
        Obligations to consume, mapping related activities to a dict mapping
        object types to a set of object ids
    marker_groups : list
        List of marker groups to check

    Returns
    -------
    Union[MG, None]
        The first matching marker group if found, None otherwise.
    """
    if not obligations:
        return None
    # Calculate object counts from the obligations
    obj_counts = {
        related_act: {ot: len(objs) for ot, objs in objs_per_ot.items()}
        for related_act, objs_per_ot in obligations.items()
    }
    # check each group
    for mg in marker_groups:
        mg_dict = mg.dict_representation
        # Check that markers for all required related activities exist
        # and all required ots are present. The membership test on the
        # related activity short-circuits, so a group lacking the activity
        # is skipped instead of being indexed with a missing key.
        covers_obligations = all(
            related_act in mg_dict
            and all(ot in mg_dict[related_act] for ot in counts_per_ot)
            for related_act, counts_per_ot in obj_counts.items()
        )
        if not covers_obligations:
            continue
        # 1: check that count matches (= is within cardinality bounds)
        counts_match = all(
            mg_dict[related_act][ot][1]
            >= obj_counts.get(related_act, {}).get(ot, 0)
            >= mg_dict[related_act][ot][0]
            for related_act in mg_dict
            for ot in mg_dict[related_act]
        )
        if not counts_match:
            continue
        # 2: check key constraints: the two related activities of a
        # constraint must not share an object of the given object type
        constraints_violated = any(
            obligations.get(rel_act_1, {})
            .get(ot, set())
            .intersection(obligations.get(rel_act_2, {}).get(ot, set()))
            for (rel_act_1, ot, rel_act_2) in mg.key_constraints
        )
        if constraints_violated:
            continue
        # found matching group
        return mg
    return None
[docs]
@classmethod
def is_binding_enabled(
    cls,
    net: N,
    act: str,
    cons: dict[str, dict[str, Set]],
    prod: dict[str, dict[str, Set]],
    state: OCCausalNetState,
) -> Union[tuple[MG, MG], None]:
    """
    Checks whether the given binding is enabled in the object-centric causal net.

    A binding is enabled if the activity has input and output marker groups that
    match the given objects and the state contains all necessary obligations.
    The state is only read, never modified.

    Parameters
    ----------
    net : N
        The object-centric causal net
    act : str
        The activity to bind
    cons : dict[str, dict[str, Set]]
        The obligations to consume, mapping predecessor activities to a dict mapping
        object types to a set of object ids
    prod : dict[str, dict[str, Set]]
        The obligations to produce, mapping successor activities to a dict mapping
        object types to a set of object ids
    state : OCCausalNetState
        The current state of the OCCN

    Returns
    -------
    Union[tuple[MG, MG], None]:
        The input and output marker groups enabling the binding if it is enabled, None otherwise.
        For "START_" activities the input group entry is None, for "END_"
        activities the output group entry is None.
    """
    # 1: check that all consumed obligations are present in the state
    if cons:
        # ``get`` instead of indexing: the state is a defaultdict, so indexing
        # a missing activity would insert an empty entry into the caller's state.
        outstanding = state.get(act) or {}
        for pred, objects_per_ot in cons.items():
            for ot, obj_ids in objects_per_ot.items():
                for obj_id in obj_ids:
                    if outstanding.get((pred, obj_id, ot), 0) <= 0:
                        return None
    elif not prod:
        # we need to either consume or produce obligations
        return None
    # 2: Find a matching input marker group
    if act.startswith("START_"):
        # For START activities, we do not consume obligations, cons has to be empty
        if cons:
            return None
        matched_img = None
    else:
        matched_img = cls._find_matching_marker_group(
            cons, net.input_marker_groups[act]
        )
        if matched_img is None:
            return None
    # 3: Find a matching output marker group
    if act.startswith("END_"):
        # For END activities, we do not produce obligations, prod has to be empty
        if prod:
            return None
        matched_omg = None
    else:
        matched_omg = cls._find_matching_marker_group(
            prod, net.output_marker_groups[act]
        )
        if matched_omg is None:
            return None
    return (matched_img, matched_omg)
[docs]
@classmethod
def bind_activity(
    cls,
    net: N,
    act: str,
    cons: dict[str, dict[str, Set]],
    prod: dict[str, dict[str, Set]],
    state: OCCausalNetState,
) -> OCCausalNetState:
    """
    Executes a binding of an activity and returns the resulting state.

    For performance reasons, this method does not check whether the binding
    is valid given the current state. If necessary, the caller should
    ensure this, e.g., using the `is_binding_enabled` method.

    Parameters
    ----------
    net : N
        The object-centric causal net
    act : str
        The activity to bind
    cons : dict[str, dict[str, Set]]
        The obligations to consume, mapping predecessor activities to a dict mapping
        object types to a set of object ids
    prod : dict[str, dict[str, Set]]
        The obligations to produce, mapping successor activities to a dict mapping
        object types to a set of object ids
    state : OCCausalNetState
        The current state of the OCCN

    Returns
    -------
    OCCausalNetState
        The new state after binding the activity
    """
    # remove the consumed obligations; they are all addressed to ``act``
    if cons:
        consumed_obligations = Counter()
        for pred, objects_per_ot in cons.items():
            for ot, obj_ids in objects_per_ot.items():
                for obj_id in obj_ids:
                    consumed_obligations[(pred, obj_id, ot)] += 1
        state -= OCCausalNetState({act: consumed_obligations})
    # add the produced obligations; they originate from ``act`` and are
    # addressed to the respective successor
    if prod:
        produced_per_successor = {}
        for succ, objects_per_ot in prod.items():
            produced_obligations = Counter()
            for ot, obj_ids in objects_per_ot.items():
                for obj_id in obj_ids:
                    produced_obligations[(act, obj_id, ot)] += 1
            produced_per_successor[succ] = produced_obligations
        state += OCCausalNetState(produced_per_successor)
    return state
[docs]
@classmethod
def replay(cls, occn: N, sequence: tuple) -> bool:
    """
    Replays a sequence of bindings on the object-centric causal net.

    The replay starts in the empty state and succeeds only if every binding
    is enabled when it is reached and no obligations remain at the end.

    Parameters
    ----------
    occn : N
        The object-centric causal net to replay on.
    sequence : tuple
        A sequence of bindings, where each binding is a tuple of (activity_name, consumed_obligations, produced_obligations).
        consumed_obligations and produced_obligations are dictionaries mapping
        related activities to a dict mapping object types to a set of object ids.

    Returns
    -------
    bool
        True if the sequence can be replayed on the net, False otherwise.
    """
    current_state = OCCausalNetState()
    for act, cons, prod in sequence:
        enabling_groups = cls.is_binding_enabled(occn, act, cons, prod, current_state)
        if not enabling_groups:
            return False
        current_state = cls.bind_activity(occn, act, cons, prod, current_state)
    # the replay is only successful if all obligations have been consumed
    return not current_state.activities
[docs]
@classmethod
def enabled_bindings_start_activity(
    cls,
    occn: N,
    act: str,
    object_type: str,
    objects: set,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
) -> tuple:
    """
    Computes all enabled bindings for a start activity with a given set of objects.

    Every non-empty subset of the objects is considered; the bindings produce
    obligations for the objects of that subset. Start activities consume
    nothing, so the consumed entry of every binding is None.

    Parameters
    ----------
    occn : N
        The object-centric causal net
    act : str
        The start activity to bind
    object_type : str
        The object type of the start activity
    objects : set
        A set of objects to bind to the activity. All bindings will bind at least one.
    act_to_idx : dict, optional
        If activities are denoted in the state by an id instead of their name,
        a dictionary mapping activities to their index has to be provided here.
    ot_to_idx : dict, optional
        If object types are denoted in the state by an id instead of their name,
        a dictionary mapping object types to their index has to be provided here.

    Returns
    -------
    tuple
        A tuple where entries are tuples of activity id, consumed objects, and produced objects.
        Consumed / produced are tuples of (predecessor/successor activity id, objects_per_ot)
        where objects_per_ot is a tuple of entries (object_type, objects)
        where objects is a tuple (obj_id_1, obj_id_2, ...).
        If act_to_idx and ot_to_idx are provided, indices instead of names are
        used for activities and object types.
    """
    assert act.startswith("START_"), "This method is only for start activities."
    act_id = act_to_idx[act] if act_to_idx else act
    ot_id = ot_to_idx[object_type] if ot_to_idx else object_type
    # caches shared by all subsets of objects
    memo_produced = {}
    memo_ot_assignments = {}
    enabled_bindings = []
    # enumerate the non-empty subsets of objects by increasing size
    for subset_size in range(1, len(objects) + 1):
        for combo in itertools.combinations(objects, subset_size):
            # placeholder "consumed" tuple with the dummy predecessor -1; it
            # only tells the producer which objects have to be passed on
            fake_consumed = ((-1, ((ot_id, combo),)),)
            possible_produced = cls.__generate_produced_for_consumed(
                occn,
                act,
                fake_consumed,
                memo_produced,
                memo_ot_assignments,
                act_to_idx,
                ot_to_idx,
            )
            for produced in possible_produced:
                enabled_bindings.append((act_id, None, produced))
    return tuple(enabled_bindings)
[docs]
@classmethod
def enabled_bindings(
    cls,
    occn: N,
    act: str,
    state: OCCausalNetState,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
) -> tuple:
    """
    Computes all enabled bindings for a given activity in a given state.

    The state is only read, never modified.

    Parameters
    ----------
    occn : N
        The object-centric causal net
    act : str
        The activity to bind
    state : OCCausalNetState
        The current state of the OCCN
    act_to_idx : dict, optional
        If activities are denoted in the state by an id instead of their name,
        a dictionary mapping activities to their index has to be provided here.
    ot_to_idx : dict, optional
        If object types are denoted in the state by an id instead of their name,
        a dictionary mapping object types to their index has to be provided here.

    Returns
    -------
    tuple
        A tuple where entries are tuples of activity id, consumed objects, and produced objects.
        Consumed / produced are tuples of (predecessor/successor activity id, objects_per_ot)
        where objects_per_ot is a tuple of entries (object_type, objects)
        where objects is a tuple (obj_id_1, obj_id_2, ...).
        For "END_" activities the produced entry is None.
        If act_to_idx and ot_to_idx are provided, indices instead of names are
        used for activities and object types.
    """
    act_id = act_to_idx[act] if act_to_idx else act
    # outstanding obligations for the activity; ``get`` instead of indexing
    # because the state is a defaultdict and indexing a missing activity
    # would insert an empty entry into the caller's state
    obligations = state.get(act_id) or ()
    # pre-process obligations to a dict where keys are (related activity, object_type)
    # and values are sets of object ids (neglecting the count)
    obligations_dict = defaultdict(set)
    for related_act, obj_id, ot_id in obligations:
        obligations_dict[(related_act, ot_id)].add(obj_id)
    # ----------- Create all possibilities for consumed -----------
    possible_consumed = cls.__generate_consumed(
        occn, act, obligations_dict, act_to_idx, ot_to_idx
    )
    if not possible_consumed:
        return ()
    # ----------- Create all possible produced tuples per consumed tuple -----------
    final_bindings = []
    # Memoization for produced tuples, keyed by consumed object sets by object type
    memo_produced = {}
    # Memoization for sub-problem of assigning objects of one ot for one omg
    memo_ot_assignments = {}
    is_end_activity = act.startswith("END_")
    for consumed in possible_consumed:
        if is_end_activity:
            # End activities do not produce obligations
            possible_produced_for_consumed = {None}
        else:
            possible_produced_for_consumed = cls.__generate_produced_for_consumed(
                occn,
                act,
                consumed,
                memo_produced,
                memo_ot_assignments,
                act_to_idx,
                ot_to_idx,
            )
        # Create final bindings
        for produced in possible_produced_for_consumed:
            final_bindings.append((act_id, consumed, produced))
    return tuple(final_bindings)
@classmethod
def __generate_consumed(
    cls,
    occn: N,
    act: str,
    obligations_dict: dict,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
) -> Set[tuple]:
    """
    Generates all possible consumed tuples for a given activity and obligations.

    Parameters
    ----------
    occn : N
        The object-centric causal net
    act : str
        The activity whose input marker groups are evaluated
    obligations_dict : dict
        Maps (related activity id, object type id) to the set of object ids
        with outstanding obligations
    act_to_idx : dict, optional
        Mapping from activity names to the ids used in obligations_dict
    ot_to_idx : dict, optional
        Mapping from object type names to the ids used in obligations_dict

    Returns
    -------
    Set[tuple]
        Duplicate-free set of consumed tuples over all input marker groups.
    """
    possible_consumed = set()
    for img in occn.input_marker_groups[act]:
        img_dict = img.dict_representation
        # Translate the key constraints into the id space of obligations_dict.
        # Activities and object types are mapped independently, since either
        # mapping may be provided without the other.
        if act_to_idx or ot_to_idx:
            key_constraints_by_id = [
                (
                    act_to_idx[rel_act_1] if act_to_idx else rel_act_1,
                    ot_to_idx[ot] if ot_to_idx else ot,
                    act_to_idx[rel_act_2] if act_to_idx else rel_act_2,
                )
                for rel_act_1, ot, rel_act_2 in img.key_constraints
            ]
        else:
            key_constraints_by_id = img.key_constraints
        # get all combinations on which objects we can consume given the img
        keys, combinations_iter_list = (
            cls.__generate_predecessor_object_combinations(
                img_dict, obligations_dict, act_to_idx, ot_to_idx
            )
        )
        if not keys:
            # img not enabled
            continue
        # from the per-predecessor options, create all combinations of
        # consumed obligations; these are added to possible_consumed in place
        cls.__consumed_from_predecessor_combinations(
            possible_consumed, keys, combinations_iter_list, key_constraints_by_id
        )
    return possible_consumed
@classmethod
def __generate_predecessor_object_combinations(
    cls,
    img_dict: dict,
    obligations_dict: dict,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
):
    """
    Enumerates, per marker of an input marker group, the object selections
    from obligations_dict that the marker may consume.

    Returns
    -------
    tuple
        (keys, combinations_iter_list): keys holds (predecessor_id, ot_id)
        pairs and combinations_iter_list holds, at the same position, an
        iterator over all object tuples that pair may consume. Two empty
        lists are returned as soon as one marker cannot reach its minimum
        count.
    """
    keys = []
    combinations_iter_list = []
    for predecessor, bounds_by_ot in img_dict.items():
        for ot in bounds_by_ot:
            min_count, max_count = bounds_by_ot[ot]
            pred_id = act_to_idx[predecessor] if act_to_idx else predecessor
            ot_id = ot_to_idx[ot] if ot_to_idx else ot
            # objects with obligations from this predecessor for this object type
            available = sorted(obligations_dict[(pred_id, ot_id)])
            if len(available) < min_count:
                # the marker group cannot be bound at all
                return [], []
            if not available:
                # optional marker without any objects, nothing to choose from
                continue
            keys.append((pred_id, ot_id))
            # the pair may consume between min_count and max_count objects,
            # limited by the number of available objects
            largest_selection = min(max_count, len(available))
            selections_per_size = [
                itertools.combinations(available, size)
                for size in range(min_count, largest_selection + 1)
            ]
            combinations_iter_list.append(
                itertools.chain.from_iterable(selections_per_size)
            )
    return keys, combinations_iter_list
@staticmethod
def __consumed_from_predecessor_combinations(
    possible_consumed: set,
    keys: list,
    combinations_iter_list: list,
    key_constraints_by_id: list,
):
    """
    Builds consumed tuples from the per-(predecessor, object type) object
    selections and adds them to possible_consumed in place.

    Selections that consume nothing at all or that violate a key constraint
    are dropped. Nothing is returned.
    """
    # every element of the cross product picks one selection per key
    for selection in itertools.product(*combinations_iter_list):
        # predecessor id -> {ot_id: objects}, leaving out empty selections
        by_predecessor = defaultdict(dict)
        for (pred_id, ot_id), chosen in zip(keys, selection):
            if chosen:
                by_predecessor[pred_id][ot_id] = chosen
        if not by_predecessor:
            # only optional markers and none of them used: not a binding
            continue
        for first_act, ot_id, second_act in key_constraints_by_id:
            first_objects = by_predecessor.get(first_act, {}).get(ot_id)
            second_objects = by_predecessor.get(second_act, {}).get(ot_id)
            if (
                first_objects
                and second_objects
                and not set(first_objects).isdisjoint(second_objects)
            ):
                # both activities consume a common object: constraint violated
                break
        else:
            # no constraint violated; store in the compact tuple form, sorted
            # so that equal bindings collapse to the same set element
            possible_consumed.add(
                tuple(
                    (pred_id, tuple(sorted(objects_by_ot.items())))
                    for pred_id, objects_by_ot in sorted(by_predecessor.items())
                )
            )
@classmethod
def __generate_produced_for_consumed(
    cls,
    occn: N,
    act: str,
    consumed: tuple,
    memo_produced: dict,
    memo_ot_assignments: dict,
    act_to_idx: dict = None,
    ot_to_idx: dict = None,
) -> Set[tuple]:
    """
    Generates all possible produced tuples for a given consumed tuple.

    Results are cached in memo_produced, keyed by the consumed objects per
    object type, so consumed tuples with the same objects share one result.
    """
    # collect the consumed objects per object type, ignoring the predecessor
    objects_by_ot = defaultdict(set)
    for _, ot_obj_pairs in consumed:
        for ot_id, objects in ot_obj_pairs:
            objects_by_ot[ot_id].update(objects)
    cache_key = cls.__make_hashable(objects_by_ot)
    cached = memo_produced.get(cache_key)
    if cached is not None:
        return cached
    # not cached yet: merge the produced tuples of every output marker group
    produced_tuples = set()  # set to avoid duplicates
    for omg in occn.output_marker_groups[act]:
        produced_tuples.update(
            cls.__generate_produced_for_omg(
                omg,
                objects_by_ot,
                memo_ot_assignments,
                act_to_idx,
                ot_to_idx,
            )
        )
    memo_produced[cache_key] = produced_tuples
    return produced_tuples
@classmethod
def __generate_produced_for_omg(
    cls, omg, consumed_objects_by_ot, memo, act_to_idx, ot_to_idx
):
    """
    Generates all possible produced tuples for a given output marker group and
    consumed objects by object type (these need to be produced).

    Returns an empty list if the group cannot handle the consumed object
    types, otherwise the set of produced tuples.
    """

    def activity_id(name):
        return act_to_idx[name] if act_to_idx else name

    def object_type_id(name):
        return ot_to_idx[name] if ot_to_idx else name

    # requirements per object type: (successor id, min count, max count)
    reqs_by_ot = defaultdict(list)
    for succ_name, bounds_by_ot in omg.dict_representation.items():
        succ_id = activity_id(succ_name)
        for ot_name, (min_c, max_c) in bounds_by_ot.items():
            reqs_by_ot[object_type_id(ot_name)].append((succ_id, min_c, max_c))
    # key constraints per object type as unordered successor pairs
    key_constraints_by_ot = defaultdict(set)
    for first_name, ot_name, second_name in omg.key_constraints:
        first_id = activity_id(first_name)
        ot_id = object_type_id(ot_name)
        second_id = activity_id(second_name)
        key_constraints_by_ot[ot_id].add(frozenset([first_id, second_id]))
    consumed_ots = set(consumed_objects_by_ot.keys())
    required_ots = set(reqs_by_ot.keys())
    if consumed_ots - required_ots:
        # omg has no marker for some consumed object type
        return []
    for unused_ot in required_ots - consumed_ots:
        # an object type of the omg was not consumed; this is only
        # acceptable if all of its markers are optional
        for _, min_c, _ in reqs_by_ot[unused_ot]:
            if min_c > 0:
                return []
    # possible assignments of consumed objects to successors, solved
    # individually per object type
    ot_ids_in_order, assignments_in_order = cls.__generate_successor_assignments(
        omg, memo, consumed_objects_by_ot, reqs_by_ot, key_constraints_by_ot
    )
    # combine the per-object-type assignments into produced tuples
    return cls.__produced_from_successor_assignments(
        ot_ids_in_order, assignments_in_order
    )
@staticmethod
def __generate_successor_assignments(
    omg: MG,
    memo: dict,
    consumed_objects_by_ot: dict,
    reqs_by_ot: dict,
    key_constraints_by_ot: dict,
):
    """
    Generates all possible assignments of consumed objects to successor activities.

    Parameters
    ----------
    omg : MG
        The output marker group; only its identity is used, as part of the memo key
    memo : dict
        Cache of valid assignments, keyed by (id(omg), ot_id, frozenset(objects))
    consumed_objects_by_ot : dict
        Maps object type ids to the set of consumed objects of that type
    reqs_by_ot : dict
        Maps object type ids to a list of (successor id, min count, max count)
    key_constraints_by_ot : dict
        Maps object type ids to a set of frozensets, each holding two
        successors that must not receive the same object

    Returns
    -------
    tuple
        (ot_ids_in_order, assignments_in_order): for each object type id the
        list of valid assignments, each a dict mapping a successor id to a
        sorted tuple of objects. Two empty lists are returned if some object
        type has no valid assignment.
    """
    # we generate all possible assignments of consumed objects to successor activities
    # and then prune the ones that violate key constraints and
    # the ones that violate the min/max cardinality requirements
    ot_ids_in_order = []
    assignments_in_order = []
    # Solve assignment problem for each object type separately
    # this is cached since different consumed_objects_by_ot may have
    # the same object sets for the same ot_id
    for ot_id, objects in consumed_objects_by_ot.items():
        memo_key = (id(omg), ot_id, frozenset(objects))
        if memo_key in memo:
            valid_ot_assignments = memo[memo_key]
        else:
            ot_reqs = reqs_by_ot.get(ot_id, [])
            successors_for_ot = [req[0] for req in ot_reqs]
            ot_key_constraints = key_constraints_by_ot.get(ot_id, set())
            # An object may be assigned to 1 or more successors, as long as no
            # two of them are linked by a key constraint. The valid successor
            # sets do not depend on the object, so they are computed once.
            successor_choices = [
                succ_set
                for size in range(1, len(successors_for_ot) + 1)
                for succ_set in itertools.combinations(successors_for_ot, size)
                if all(
                    frozenset(pair) not in ot_key_constraints
                    for pair in itertools.combinations(succ_set, 2)
                )
            ]
            # fix one iteration order to pair objects with their choices
            objects_in_order = list(objects)
            if objects_in_order and not successor_choices:
                return [], []  # An object has no valid assignment
            # Create all combinations of assignments with cross-product over all objects
            valid_ot_assignments = []
            for assignment_choice in itertools.product(
                successor_choices, repeat=len(objects_in_order)
            ):
                final_assignment = defaultdict(list)
                for obj, succs in zip(objects_in_order, assignment_choice):
                    for succ in succs:
                        final_assignment[succ].append(obj)
                # Check cardinality constraints
                counts_ok = all(
                    min_c <= len(final_assignment.get(succ_id, ())) <= max_c
                    for succ_id, min_c, max_c in ot_reqs
                )
                if counts_ok:
                    # canonical form: sorted object tuples per successor
                    valid_ot_assignments.append(
                        {s: tuple(sorted(o)) for s, o in final_assignment.items()}
                    )
            # Store in memoization cache
            memo[memo_key] = valid_ot_assignments
        # Only proceed if we have valid assignments for this ot_id
        if not valid_ot_assignments:
            return [], []
        # Store ot_id and valid assignments
        ot_ids_in_order.append(ot_id)
        assignments_in_order.append(valid_ot_assignments)
    return ot_ids_in_order, assignments_in_order
@staticmethod
def __produced_from_successor_assignments(
    ot_ids_in_order: list, assignments_in_order: list
) -> Set[tuple]:
    """
    Combines the per-object-type assignments into produced tuples.

    One assignment is picked per object type (cross product); each pick is
    regrouped by successor activity and converted into the sorted tuple
    representation. Picks that produce nothing are dropped.
    """
    produced_tuples = set()  # avoid duplicates
    for chosen_assignments in itertools.product(*assignments_in_order):
        # successor id -> {ot_id: objects}, leaving out empty object tuples
        by_successor = defaultdict(dict)
        for ot_id, assignment in zip(ot_ids_in_order, chosen_assignments):
            for succ_id, assigned_objects in assignment.items():
                if assigned_objects:
                    by_successor[succ_id][ot_id] = assigned_objects
        # a produced tuple has to contain at least one object
        if by_successor:
            produced_tuples.add(
                tuple(
                    (succ_id, tuple(sorted(objects_by_ot.items())))
                    for succ_id, objects_by_ot in sorted(by_successor.items())
                )
            )
    return produced_tuples
@staticmethod
def __make_hashable(d):
    """Build a hashable memoization key from a dict of {ot_id: set(obj_ids)}."""
    # every value becomes a frozenset and the (key, value) pairs are
    # collected in a frozenset, so the result does not depend on dict order
    frozen_items = set()
    for key, values in d.items():
        frozen_items.add((key, frozenset(values)))
    return frozenset(frozen_items)