'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.util import xes_constants as xes_util
from pm4py.objects.petri_net import semantics
from pm4py.objects.petri_net.utils.petri_utils import (
get_places_shortest_path_by_hidden,
get_s_components_from_petri,
)
from pm4py.objects.log import obj as log_implementation
from pm4py.objects.petri_net.utils import align_utils
from copy import copy
from enum import Enum
from pm4py.util import exec_utils, constants
from pm4py.util import variants_util, pandas_utils
import importlib.util
from typing import Optional, Dict, Any, Union
from pm4py.objects.log.obj import EventLog
import pandas as pd
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.util import typing
from collections import deque
from pm4py.objects.conversion.log import converter as log_converter
[docs]
class Parameters(Enum):
CASE_ID_KEY = constants.PARAMETER_CONSTANT_CASEID_KEY
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
EXHAUSTIVE_INVISIBLE_EXPLORATION = "exhaustive_invisible_exploration"
PARAMETER_VARIANT_DELIMITER = "variant_delimiter"
VARIANTS = "variants"
PLACES_SHORTEST_PATH_BY_HIDDEN = "places_shortest_path_by_hidden"
THREAD_MAX_EX_TIME = "thread_maximum_ex_time"
DISABLE_VARIANTS = "disable_variants"
CLEANING_TOKEN_FLOOD = "cleaning_token_flood"
IS_REDUCTION = "is_reduction"
WALK_THROUGH_HIDDEN_TRANS = "walk_through_hidden_trans"
RETURN_NAMES = "return_names"
STOP_IMMEDIATELY_UNFIT = "stop_immediately_unfit"
TRY_TO_REACH_FINAL_MARKING_THROUGH_HIDDEN = (
"try_to_reach_final_marking_through_hidden"
)
CONSIDER_REMAINING_IN_FITNESS = "consider_remaining_in_fitness"
CONSIDER_ACTIVITIES_NOT_IN_MODEL_IN_FITNESS = (
"consider_activities_not_in_model_in_fitness"
)
ENABLE_PLTR_FITNESS = "enable_pltr_fitness"
SHOW_PROGRESS_BAR = "show_progress_bar"
[docs]
class TechnicalParameters(Enum):
MAX_REC_DEPTH = 50
MAX_IT_FINAL1 = 5
MAX_IT_FINAL2 = 5
MAX_REC_DEPTH_HIDTRANSENABL = 2
MAX_POSTFIX_SUFFIX_LENGTH = 20
MAX_NO_THREADS = 1024
MAX_DEF_THR_EX_TIME = 10
ENABLE_POSTFIX_CACHE = False
ENABLE_MARKTOACT_CACHE = False
[docs]
class DebugConst:
REACH_MRH = -1
REACH_ITF1 = -1
REACH_ITF2 = -1
[docs]
class NoConceptNameException(Exception):
def __init__(self, message):
self.message = message
[docs]
def add_missing_tokens(t, marking):
"""
Adds missing tokens needed to activate a transition
Parameters
----------
t
Transition that should be enabled
marking
Current marking
"""
missing = 0
tokens_added = {}
for a in t.in_arcs:
if marking[a.source] < a.weight:
missing = missing + (a.weight - marking[a.source])
tokens_added[a.source] = a.weight - marking[a.source]
marking[a.source] = marking[a.source] + a.weight
return [missing, tokens_added]
[docs]
def get_consumed_tokens(t):
"""
Get tokens consumed firing a transition
Parameters
----------
t
Transition that should be enabled
"""
consumed = 0
consumed_map = {}
for a in t.in_arcs:
consumed = consumed + a.weight
consumed_map[a.source] = a.weight
return consumed, consumed_map
[docs]
def get_produced_tokens(t):
"""
Get tokens produced firing a transition
Parameters
----------
t
Transition that should be enabled
"""
produced = 0
produced_map = {}
for a in t.out_arcs:
produced = produced + a.weight
produced_map[a.target] = a.weight
return produced, produced_map
[docs]
def merge_dicts(x, y):
"""
Merge two dictionaries keeping the least value
Parameters
----------
x
First map (string, integer)
y
Second map (string, integer)
"""
for key in y:
if key not in x:
x[key] = y[key]
else:
if y[key] < x[key]:
x[key] = y[key]
[docs]
def get_places_with_missing_tokens(t, marking):
"""
Get places with missing tokens
Parameters
----------
t
Transition to enable
marking
Current marking
"""
places_with_missing = set()
for a in t.in_arcs:
if marking[a.source] < a.weight:
places_with_missing.add(a.source)
return places_with_missing
[docs]
def get_hidden_transitions_to_enable(
marking, places_with_missing, places_shortest_path_by_hidden
):
"""
Calculate an ordered list of transitions to visit in order to enable a given transition
Parameters
----------
marking
Current marking
places_with_missing
List of places with missing tokens
places_shortest_path_by_hidden
Minimal connection between places by hidden transitions
"""
hidden_transitions_to_enable = []
marking_places = [x for x in marking]
marking_places = sorted(marking_places, key=lambda x: x.name)
places_with_missing_keys = [x for x in places_with_missing]
places_with_missing_keys = sorted(
places_with_missing_keys, key=lambda x: x.name
)
for p1 in marking_places:
for p2 in places_with_missing_keys:
if (
p1 in places_shortest_path_by_hidden
and p2 in places_shortest_path_by_hidden[p1]
):
hidden_transitions_to_enable.append(
places_shortest_path_by_hidden[p1][p2]
)
hidden_transitions_to_enable = sorted(
hidden_transitions_to_enable, key=lambda x: len(x)
)
return hidden_transitions_to_enable
[docs]
def get_req_transitions_for_final_marking(
marking, final_marking, places_shortest_path_by_hidden
):
"""
Gets required transitions for final marking
Parameters
----------
marking
Current marking
final_marking
Final marking assigned to the Petri net
places_shortest_path_by_hidden
Minimal connection between places by hidden transitions
"""
hidden_transitions_to_enable = []
marking_places = [x for x in marking]
marking_places = sorted(marking_places, key=lambda x: x.name)
final_marking_places = [x for x in final_marking]
final_marking_places = sorted(final_marking_places, key=lambda x: x.name)
for p1 in marking_places:
for p2 in final_marking_places:
if (
p1 in places_shortest_path_by_hidden
and p2 in places_shortest_path_by_hidden[p1]
):
hidden_transitions_to_enable.append(
places_shortest_path_by_hidden[p1][p2]
)
hidden_transitions_to_enable = sorted(
hidden_transitions_to_enable, key=lambda x: len(x)
)
return hidden_transitions_to_enable
[docs]
def enable_hidden_transitions(
net,
marking,
activated_transitions,
visited_transitions,
all_visited_markings,
hidden_transitions_to_enable,
t,
exhaustive_invisible_exploration
):
"""
Actually enable hidden transitions on the Petri net
Parameters
-----------
net
Petri net
marking
Current marking
activated_transitions
All activated transitions during the replay
visited_transitions
All visited transitions by the recursion
all_visited_markings
All visited markings
hidden_transitions_to_enable
List of hidden transition to enable
t
Transition against we should check if they are enabled
"""
if exhaustive_invisible_exploration:
queue = deque([(marking, activated_transitions, visited_transitions, all_visited_markings)])
seen_markings = set()
while queue:
curr_marking, curr_act_trans, curr_visit_trans, curr_vis_mark = queue.popleft()
if semantics.is_enabled(t, net, curr_marking):
return [curr_marking, curr_act_trans, curr_visit_trans, curr_vis_mark]
marking_hash = hash(curr_marking)
if marking_hash in seen_markings:
continue
seen_markings.add(marking_hash)
for group in hidden_transitions_to_enable:
for t3 in group:
if t3 != t and t3 not in curr_visit_trans and semantics.is_enabled(t3, net, curr_marking):
new_marking = semantics.execute(t3, net, curr_marking)
new_act_trans = curr_act_trans + [t3]
new_visit_trans = curr_visit_trans | {t3}
new_vis_mark = curr_vis_mark + [new_marking]
queue.append((new_marking, new_act_trans, new_visit_trans, new_vis_mark))
return [marking, activated_transitions, visited_transitions, all_visited_markings]
else:
j_indexes = [0] * len(hidden_transitions_to_enable)
for z in range(10000000):
something_changed = False
for k in range(
j_indexes[z % len(hidden_transitions_to_enable)],
len(
hidden_transitions_to_enable[
z % len(hidden_transitions_to_enable)
]
),
):
t3 = hidden_transitions_to_enable[
z % len(hidden_transitions_to_enable)
][j_indexes[z % len(hidden_transitions_to_enable)]]
if not t3 == t:
if semantics.is_enabled(t3, net, marking):
if t3 not in visited_transitions:
marking = semantics.execute(t3, net, marking)
activated_transitions.append(t3)
visited_transitions.add(t3)
all_visited_markings.append(marking)
something_changed = True
j_indexes[z % len(hidden_transitions_to_enable)] = (
j_indexes[z % len(hidden_transitions_to_enable)] + 1
)
if semantics.is_enabled(t, net, marking):
break
if semantics.is_enabled(t, net, marking):
break
if not something_changed:
break
return [
marking,
activated_transitions,
visited_transitions,
all_visited_markings,
]
[docs]
def apply_hidden_trans(
t,
net,
marking,
places_shortest_paths_by_hidden,
act_tr,
rec_depth,
visit_trans,
vis_mark,
exhaustive_invisible_exploration
):
"""
Apply hidden transitions in order to enable a given transition
Parameters
----------
t
Transition to eventually enable
net
Petri net
marking
Marking
places_shortest_paths_by_hidden
Shortest paths between places connected by hidden transitions
act_tr
All activated transitions
rec_depth
Current recursion depth
visit_trans
All visited transitions by hiddenTrans method
vis_mark
All visited markings
"""
if (
rec_depth >= TechnicalParameters.MAX_REC_DEPTH_HIDTRANSENABL.value
or t in visit_trans
):
return [net, marking, act_tr, vis_mark]
# if rec_depth > DebugConst.REACH_MRH:
# DebugConst.REACH_MRH = rec_depth
visit_trans.add(t)
marking_at_start = copy(marking)
places_with_missing = get_places_with_missing_tokens(t, marking)
hidden_transitions_to_enable = get_hidden_transitions_to_enable(
marking, places_with_missing, places_shortest_paths_by_hidden
)
if hidden_transitions_to_enable:
[marking, act_tr, visit_trans, vis_mark] = enable_hidden_transitions(
net,
marking,
act_tr,
visit_trans,
vis_mark,
hidden_transitions_to_enable,
t,
exhaustive_invisible_exploration
)
if not semantics.is_enabled(t, net, marking):
hidden_transitions_to_enable = get_hidden_transitions_to_enable(
marking, places_with_missing, places_shortest_paths_by_hidden
)
for z in range(len(hidden_transitions_to_enable)):
for k in range(len(hidden_transitions_to_enable[z])):
t4 = hidden_transitions_to_enable[z][k]
if not t4 == t:
if t4 not in visit_trans:
if not semantics.is_enabled(t4, net, marking):
[net, marking, act_tr, vis_mark] = (
apply_hidden_trans(
t4,
net,
marking,
places_shortest_paths_by_hidden,
act_tr,
rec_depth + 1,
visit_trans,
vis_mark,
exhaustive_invisible_exploration
)
)
if semantics.is_enabled(t4, net, marking):
marking = semantics.execute(t4, net, marking)
act_tr.append(t4)
visit_trans.add(t4)
vis_mark.append(marking)
if not semantics.is_enabled(t, net, marking):
if not (marking_at_start == marking):
[net, marking, act_tr, vis_mark] = apply_hidden_trans(
t,
net,
marking,
places_shortest_paths_by_hidden,
act_tr,
rec_depth + 1,
visit_trans,
vis_mark,
exhaustive_invisible_exploration
)
return [net, marking, act_tr, vis_mark]
[docs]
def break_condition_final_marking(marking, final_marking):
"""
Verify break condition for final marking
Parameters
-----------
marking
Current marking
final_marking
Target final marking
"""
final_marking_dict = dict(final_marking)
marking_dict = dict(marking)
final_marking_dict_keys = set(final_marking_dict.keys())
marking_dict_keys = set(marking_dict.keys())
return final_marking_dict_keys.issubset(marking_dict_keys)
[docs]
def apply_trace(
trace,
net,
initial_marking,
final_marking,
trans_map,
enable_pltr_fitness,
place_fitness,
transition_fitness,
notexisting_activities_in_model,
places_shortest_path_by_hidden,
consider_remaining_in_fitness,
activity_key="concept:name",
try_to_reach_final_marking_through_hidden=True,
stop_immediately_unfit=False,
walk_through_hidden_trans=True,
post_fix_caching=None,
marking_to_activity_caching=None,
is_reduction=False,
thread_maximum_ex_time=TechnicalParameters.MAX_DEF_THR_EX_TIME.value,
enable_postfix_cache=False,
enable_marktoact_cache=False,
cleaning_token_flood=False,
s_components=None,
trace_occurrences=1,
consider_activities_not_in_model_in_fitness=False,
exhaustive_invisible_exploration=False
):
"""
Apply the token replaying algorithm to a trace
Parameters
----------
trace
Trace in the event log
net
Petri net
initial_marking
Initial marking
final_marking
Final marking
trans_map
Map between transitions labels and transitions
enable_pltr_fitness
Enable fitness retrieval at place/transition level
place_fitness
Current dictionary of places associated with unfit traces
transition_fitness
Current dictionary of transitions associated with unfit traces
notexisting_activities_in_model
Map that stores the notexisting activities in the model
places_shortest_path_by_hidden
Shortest paths between places by hidden transitions
consider_remaining_in_fitness
Boolean value telling if the remaining tokens should be considered in fitness evaluation
activity_key
Name of the attribute that contains the activity
try_to_reach_final_marking_through_hidden
Boolean value that decides if we shall try to reach the final marking through hidden transitions
stop_immediately_unfit
Boolean value that decides if we shall stop immediately when a non-conformance is detected
walk_through_hidden_trans
Boolean value that decides if we shall walk through hidden transitions in order to enable visible transitions
post_fix_caching
Stores the post fix caching object
marking_to_activity_caching
Stores the marking-to-activity cache
is_reduction
Expresses if the token-based replay is called in a reduction attempt
thread_maximum_ex_time
Alignment threads maximum allowed execution time
enable_postfix_cache
Enables postfix cache
enable_marktoact_cache
Enables marking to activity cache
cleaning_token_flood
Decides if a cleaning of the token flood shall be operated
s_components
S-components of the Petri net (if workflow net)
"""
trace_activities = [event[activity_key] for event in trace]
act_trans = []
transitions_with_problems = []
vis_mark = []
activating_transition_index = {}
activating_transition_interval = []
used_postfix_cache = False
marking = copy(initial_marking)
vis_mark.append(marking)
missing = 0
consumed = 0
sum_tokens_im = 0
for place in initial_marking:
sum_tokens_im = sum_tokens_im + initial_marking[place]
sum_tokens_fm = 0
for place in final_marking:
sum_tokens_fm = sum_tokens_fm + final_marking[place]
produced = sum_tokens_im
current_event_map = {}
current_remaining_map = {}
for i in range(len(trace)):
if enable_postfix_cache and (
str(trace_activities) in post_fix_caching.cache
and hash(marking) in post_fix_caching.cache[str(trace_activities)]
):
trans_to_act = post_fix_caching.cache[str(trace_activities)][
hash(marking)
]["trans_to_activate"]
for z in range(len(trans_to_act)):
t = trans_to_act[z]
act_trans.append(t)
used_postfix_cache = True
marking = post_fix_caching.cache[str(trace_activities)][
hash(marking)
]["final_marking"]
break
else:
prev_len_activated_transitions = len(act_trans)
if enable_marktoact_cache and (
hash(marking) in marking_to_activity_caching.cache
and trace[i][activity_key]
in marking_to_activity_caching.cache[hash(marking)]
and trace[i - 1][activity_key]
== marking_to_activity_caching.cache[hash(marking)][
trace[i][activity_key]
]["previousActivity"]
):
this_end_marking = marking_to_activity_caching.cache[
hash(marking)
][trace[i][activity_key]]["end_marking"]
this_act_trans = marking_to_activity_caching.cache[
hash(marking)
][trace[i][activity_key]]["this_activated_transitions"]
this_vis_markings = marking_to_activity_caching.cache[
hash(marking)
][trace[i][activity_key]]["this_visited_markings"]
act_trans = act_trans + this_act_trans
vis_mark = vis_mark + this_vis_markings
marking = copy(this_end_marking)
else:
if trace[i][activity_key] in trans_map:
current_event_map.update(trace[i])
# change 14/10/2020: to better support duplicate transitions with this approach, we check
# whether in the current marking there is at least one transition corresponding to the activity
# key without looking at the transition map (that contains
# one entry per label)
corr_en_t = [
x
for x in semantics.enabled_transitions(net, marking)
if x.label == trace[i][activity_key]
]
if corr_en_t:
t = corr_en_t[0]
else:
t = trans_map[trace[i][activity_key]]
if walk_through_hidden_trans and not semantics.is_enabled(
t, net, marking
):
visited_transitions = set()
prev_len_activated_transitions = len(act_trans)
[net, new_marking, new_act_trans, new_vis_mark] = (
apply_hidden_trans(
t,
net,
copy(marking),
places_shortest_path_by_hidden,
copy(act_trans),
0,
copy(visited_transitions),
copy(vis_mark),
exhaustive_invisible_exploration
)
)
for jj5 in range(len(act_trans), len(new_act_trans)):
tt5 = new_act_trans[jj5]
c, cmap = get_consumed_tokens(tt5)
p, pmap = get_produced_tokens(tt5)
if enable_pltr_fitness:
for pl2 in cmap:
if pl2 in place_fitness:
place_fitness[pl2]["c"] += (
cmap[pl2] * trace_occurrences
)
for pl2 in pmap:
if pl2 in place_fitness:
place_fitness[pl2]["p"] += (
pmap[pl2] * trace_occurrences
)
consumed = consumed + c
produced = produced + p
marking, act_trans, vis_mark = (
new_marking,
new_act_trans,
new_vis_mark,
)
is_initially_enabled = True
old_marking_names = [x.name for x in list(marking.keys())]
if not semantics.is_enabled(t, net, marking):
is_initially_enabled = False
transitions_with_problems.append(t)
if stop_immediately_unfit:
missing = missing + 1
break
[m, tokens_added] = add_missing_tokens(t, marking)
missing = missing + m
if enable_pltr_fitness:
for place in tokens_added.keys():
if place in place_fitness:
place_fitness[place][
"underfed_traces"
].add(trace)
place_fitness[place]["m"] += tokens_added[
place
]
if (
trace
not in transition_fitness[t]["underfed_traces"]
):
transition_fitness[t]["underfed_traces"][
trace
] = list()
transition_fitness[t]["underfed_traces"][
trace
].append(current_event_map)
elif enable_pltr_fitness:
if trace not in transition_fitness[t]["fit_traces"]:
transition_fitness[t]["fit_traces"][trace] = list()
transition_fitness[t]["fit_traces"][trace].append(
current_event_map
)
c, cmap = get_consumed_tokens(t)
p, pmap = get_produced_tokens(t)
consumed = consumed + c
produced = produced + p
if enable_pltr_fitness:
for pl2 in cmap:
if pl2 in place_fitness:
place_fitness[pl2]["c"] += (
cmap[pl2] * trace_occurrences
)
for pl2 in pmap:
if pl2 in place_fitness:
place_fitness[pl2]["p"] += (
pmap[pl2] * trace_occurrences
)
if semantics.is_enabled(t, net, marking):
marking = semantics.execute(t, net, marking)
act_trans.append(t)
vis_mark.append(marking)
if not is_initially_enabled and cleaning_token_flood:
# here, a routine for cleaning token flood shall go
new_marking_names = [
x.name for x in list(marking.keys())
]
new_marking_names_diff = [
x
for x in new_marking_names
if x not in old_marking_names
]
new_marking_names_inte = [
x
for x in new_marking_names
if x in old_marking_names
]
for p1 in new_marking_names_inte:
for p2 in new_marking_names_diff:
for comp in s_components:
if p1 in comp and p2 in comp:
place_to_delete = [
place
for place in list(marking.keys())
if place.name == p1
]
if len(place_to_delete) == 1:
del marking[place_to_delete[0]]
if (
not place_to_delete[0]
in current_remaining_map
):
current_remaining_map[
place_to_delete[0]
] = 0
current_remaining_map[
place_to_delete[0]
] = (
current_remaining_map[
place_to_delete[0]
]
+ 1
)
pass
else:
if (
not trace[i][activity_key]
in notexisting_activities_in_model
):
notexisting_activities_in_model[
trace[i][activity_key]
] = {}
notexisting_activities_in_model[trace[i][activity_key]][
trace
] = current_event_map
del trace_activities[0]
if (
len(trace_activities)
< TechnicalParameters.MAX_POSTFIX_SUFFIX_LENGTH.value
):
activating_transition_index[str(trace_activities)] = {
"index": len(act_trans),
"marking": hash(marking),
}
if i > 0:
activating_transition_interval.append(
[
trace[i][activity_key],
prev_len_activated_transitions,
len(act_trans),
trace[i - 1][activity_key],
]
)
else:
activating_transition_interval.append(
[
trace[i][activity_key],
prev_len_activated_transitions,
len(act_trans),
"",
]
)
if try_to_reach_final_marking_through_hidden and not used_postfix_cache:
for i in range(TechnicalParameters.MAX_IT_FINAL1.value):
if not break_condition_final_marking(marking, final_marking):
hidden_transitions_to_enable = (
get_req_transitions_for_final_marking(
marking, final_marking, places_shortest_path_by_hidden
)
)
for group in hidden_transitions_to_enable:
for t in group:
if semantics.is_enabled(t, net, marking):
marking = semantics.execute(t, net, marking)
act_trans.append(t)
vis_mark.append(marking)
c, cmap = get_consumed_tokens(t)
p, pmap = get_produced_tokens(t)
if enable_pltr_fitness:
for pl2 in cmap:
if pl2 in place_fitness:
place_fitness[pl2]["c"] += (
cmap[pl2] * trace_occurrences
)
for pl2 in pmap:
if pl2 in place_fitness:
place_fitness[pl2]["p"] += (
pmap[pl2] * trace_occurrences
)
consumed = consumed + c
produced = produced + p
if break_condition_final_marking(marking, final_marking):
break
else:
break
# try to reach the final marking in a different fashion, if not already
# reached
if not break_condition_final_marking(marking, final_marking):
if len(final_marking) == 1:
sink_place = list(final_marking)[0]
connections_to_sink = []
for place in marking:
if (
place in places_shortest_path_by_hidden
and sink_place in places_shortest_path_by_hidden[place]
):
connections_to_sink.append(
[
place,
places_shortest_path_by_hidden[place][
sink_place
],
]
)
connections_to_sink = sorted(
connections_to_sink, key=lambda x: len(x[1])
)
for i in range(TechnicalParameters.MAX_IT_FINAL2.value):
for j in range(len(connections_to_sink)):
for z in range(len(connections_to_sink[j][1])):
t = connections_to_sink[j][1][z]
if semantics.is_enabled(t, net, marking):
marking = semantics.execute(t, net, marking)
act_trans.append(t)
c, cmap = get_consumed_tokens(t)
p, pmap = get_produced_tokens(t)
if enable_pltr_fitness:
for pl2 in cmap:
if pl2 in place_fitness:
place_fitness[pl2]["c"] += (
cmap[pl2] * trace_occurrences
)
for pl2 in pmap:
if pl2 in place_fitness:
place_fitness[pl2]["p"] += (
pmap[pl2] * trace_occurrences
)
consumed = consumed + c
produced = produced + p
vis_mark.append(marking)
continue
else:
break
marking_before_cleaning = copy(marking)
# 25/02/2020: fix to the missing tokens mark (if the final marking is not reached)
# (not inficiating the previous stat)
diff_fin_mark_mark = Marking()
for p in final_marking:
diff = final_marking[p] - marking[p]
if diff > 0:
diff_fin_mark_mark[p] = diff
# set up the remaining tokens count
remaining = 0
for p in marking:
if p in final_marking:
marking[p] = max(0, marking[p] - final_marking[p])
if enable_pltr_fitness:
if marking[p] > 0:
if p in place_fitness:
if trace not in place_fitness[p]["underfed_traces"]:
place_fitness[p]["overfed_traces"].add(trace)
place_fitness[p]["r"] += marking[p] * trace_occurrences
# 25/02/2020: added missing part to statistics
elif enable_pltr_fitness:
if p in place_fitness:
if trace not in place_fitness[p]["underfed_traces"]:
place_fitness[p]["overfed_traces"].add(trace)
place_fitness[p]["r"] += marking[p] * trace_occurrences
remaining = remaining + marking[p]
for p in current_remaining_map:
if enable_pltr_fitness:
if p in place_fitness:
if (
trace not in place_fitness[p]["underfed_traces"]
and trace not in place_fitness[p]["overfed_traces"]
):
place_fitness[p]["overfed_traces"].add(trace)
place_fitness[p]["r"] += (
current_remaining_map[p] * trace_occurrences
)
remaining = remaining + current_remaining_map[p]
if consider_remaining_in_fitness:
is_fit = (missing == 0) and (remaining == 0)
else:
is_fit = missing == 0
if (
consider_activities_not_in_model_in_fitness
and notexisting_activities_in_model
):
is_fit = False
# separate global counts from local statistics in the case these are not
# enabled by the options
for pl in final_marking:
consumed += final_marking[pl]
# 25/02/2020: update the missing tokens count here
for pl in diff_fin_mark_mark:
missing += diff_fin_mark_mark[pl]
if enable_pltr_fitness:
for pl in initial_marking:
place_fitness[pl]["p"] += initial_marking[pl] * trace_occurrences
for pl in final_marking:
place_fitness[pl]["c"] += final_marking[pl] * trace_occurrences
for pl in diff_fin_mark_mark:
place_fitness[pl]["m"] += (
diff_fin_mark_mark[pl] * trace_occurrences
)
if consumed > 0 and produced > 0:
trace_fitness = 0.5 * (
1.0 - float(missing) / float(consumed)
) + 0.5 * (1.0 - float(remaining) / float(produced))
else:
trace_fitness = 1.0
if is_fit:
for suffix in activating_transition_index:
if suffix not in post_fix_caching.cache:
post_fix_caching.cache[suffix] = {}
if (
activating_transition_index[suffix]["marking"]
not in post_fix_caching.cache[suffix]
):
post_fix_caching.cache[suffix][
activating_transition_index[suffix]["marking"]
] = {
"trans_to_activate": act_trans[
activating_transition_index[suffix]["index"]:
],
"final_marking": marking,
}
for trans in activating_transition_interval:
activity = trans[0]
start_marking_index = trans[1]
end_marking_index = trans[2]
previous_activity = trans[3]
if end_marking_index < len(vis_mark):
start_marking_object = vis_mark[start_marking_index]
start_marking_hash = hash(start_marking_object)
end_marking_object = vis_mark[end_marking_index]
if activity in trans_map:
this_activated_trans = act_trans[
start_marking_index:end_marking_index
]
this_visited_markings = vis_mark[
start_marking_index + 1: end_marking_index + 1
]
if (
start_marking_hash
not in marking_to_activity_caching.cache
):
marking_to_activity_caching.cache[
start_marking_hash
] = {}
if (
activity
not in marking_to_activity_caching.cache[
start_marking_hash
]
):
marking_to_activity_caching.cache[start_marking_hash][
activity
] = {
"start_marking": start_marking_object,
"end_marking": end_marking_object,
"this_activated_transitions": this_activated_trans,
"this_visited_markings": this_visited_markings,
"previousActivity": previous_activity,
}
return [
is_fit,
trace_fitness,
act_trans,
transitions_with_problems,
marking_before_cleaning,
align_utils.get_visible_transitions_eventually_enabled_by_marking(
net, marking_before_cleaning
),
missing,
consumed,
remaining,
produced,
]
[docs]
class ApplyTraceTokenReplay:
def __init__(
self,
trace,
net,
initial_marking,
final_marking,
trans_map,
enable_pltr_fitness,
place_fitness,
transition_fitness,
notexisting_activities_in_model,
places_shortest_path_by_hidden,
consider_remaining_in_fitness,
activity_key="concept:name",
reach_mark_through_hidden=True,
stop_immediately_when_unfit=False,
walk_through_hidden_trans=True,
post_fix_caching=None,
marking_to_activity_caching=None,
is_reduction=False,
thread_maximum_ex_time=TechnicalParameters.MAX_DEF_THR_EX_TIME.value,
cleaning_token_flood=False,
s_components=None,
trace_occurrences=1,
consider_activities_not_in_model_in_fitness=False,
exhaustive_invisible_exploration=False
):
"""
Constructor
net
Petri net
initial_marking
Initial marking
final_marking
Final marking
trans_map
Map between transitions labels and transitions
enable_pltr_fitness
Enable fitness retrieval at place/transition level
place_fitness
Current dictionary of places associated with unfit traces
transition_fitness
Current dictionary of transitions associated with unfit traces
notexisting_activities_in_model
Map that stores the notexisting activities in the model
triggered in the log
places_shortest_path_by_hidden
Shortest paths between places by hidden transitions
consider_remaining_in_fitness
Boolean value telling if the remaining tokens should be considered in fitness evaluation
activity_key
Name of the attribute that contains the activity
try_to_reach_final_marking_through_hidden
Boolean value that decides if we shall try to reach the final marking through hidden transitions
stop_immediately_unfit
Boolean value that decides if we shall stop immediately when a non-conformance is detected
walk_through_hidden_trans
Boolean value that decides if we shall walk through hidden transitions in order to enable visible transitions
post_fix_caching
Stores the post fix caching object
marking_to_activity_caching
Stores the marking-to-activity cache
is_reduction
Expresses if the token-based replay is called in a reduction attempt
thread_maximum_ex_time
Alignment threads maximum allowed execution time
cleaning_token_flood
Decides if a cleaning of the token flood shall be operated
s_components
S-components of the Petri net
trace_occurrences
Trace weight (number of occurrences)
"""
self.thread_is_alive = True
self.trace = trace
self.net = net
self.initial_marking = initial_marking
self.final_marking = final_marking
self.trans_map = trans_map
self.enable_pltr_fitness = enable_pltr_fitness
self.place_fitness = place_fitness
self.transition_fitness = transition_fitness
self.notexisting_activities_in_model = notexisting_activities_in_model
self.places_shortest_path_by_hidden = places_shortest_path_by_hidden
self.consider_remaining_in_fitness = consider_remaining_in_fitness
self.consider_activities_not_in_model_in_fitness = (
consider_activities_not_in_model_in_fitness
)
self.activity_key = activity_key
self.try_to_reach_final_marking_through_hidden = (
reach_mark_through_hidden
)
self.stop_immediately_when_unfit = stop_immediately_when_unfit
self.walk_through_hidden_trans = walk_through_hidden_trans
self.post_fix_caching = post_fix_caching
self.marking_to_activity_caching = marking_to_activity_caching
self.is_reduction = is_reduction
self.thread_maximum_ex_time = thread_maximum_ex_time
self.cleaning_token_flood = cleaning_token_flood
self.enable_postfix_cache = (
TechnicalParameters.ENABLE_POSTFIX_CACHE.value
)
self.enable_marktoact_cache = (
TechnicalParameters.ENABLE_MARKTOACT_CACHE.value
)
if self.is_reduction:
self.enable_postfix_cache = True
self.enable_marktoact_cache = True
self.t_fit = None
self.t_value = None
self.act_trans = None
self.trans_probl = None
self.reached_marking = None
self.enabled_trans_in_mark = None
self.missing = None
self.consumed = None
self.remaining = None
self.produced = None
self.s_components = s_components
self.trace_occurrences = trace_occurrences
self.exhaustive_invisible_exploration = exhaustive_invisible_exploration
[docs]
def run(self):
"""
Runs the thread and stores the results
"""
(self.t_fit,
self.t_value,
self.act_trans,
self.trans_probl,
self.reached_marking,
self.enabled_trans_in_mark,
self.missing,
self.consumed,
self.remaining,
self.produced,
) = apply_trace(self.trace,
self.net,
self.initial_marking,
self.final_marking,
self.trans_map,
self.enable_pltr_fitness,
self.place_fitness,
self.transition_fitness,
self.notexisting_activities_in_model,
self.places_shortest_path_by_hidden,
self.consider_remaining_in_fitness,
activity_key=self.activity_key,
try_to_reach_final_marking_through_hidden=self.try_to_reach_final_marking_through_hidden,
stop_immediately_unfit=self.stop_immediately_when_unfit,
walk_through_hidden_trans=self.walk_through_hidden_trans,
post_fix_caching=self.post_fix_caching,
marking_to_activity_caching=self.marking_to_activity_caching,
is_reduction=self.is_reduction,
thread_maximum_ex_time=self.thread_maximum_ex_time,
enable_postfix_cache=self.enable_postfix_cache,
enable_marktoact_cache=self.enable_marktoact_cache,
cleaning_token_flood=self.cleaning_token_flood,
s_components=self.s_components,
trace_occurrences=self.trace_occurrences,
consider_activities_not_in_model_in_fitness=self.consider_activities_not_in_model_in_fitness,
exhaustive_invisible_exploration=self.exhaustive_invisible_exploration
)
self.thread_is_alive = False
[docs]
class PostFixCaching:
"""
Post fix caching object
"""
def __init__(self):
self.cache = 0
self.cache = {}
[docs]
class MarkingToActivityCaching:
"""
Marking to activity caching
"""
def __init__(self):
self.cache = 0
self.cache = {}
[docs]
def get_variant_from_trace(trace, activity_key, disable_variants=False):
"""
Gets the variant from the trace (allow disabling)
Parameters
------------
trace
Trace
activity_key
Attribute that is the activity
disable_variants
Boolean value that disable variants
Returns
-------------
variant
Variant describing the trace
"""
if disable_variants:
return str(hash(trace))
parameters = {}
parameters[variants_util.Parameters.ACTIVITY_KEY] = activity_key
return variants_util.get_variant_from_trace(trace, parameters=parameters)
[docs]
def transcribe_result(t, return_object_names=True):
corr_value = {
"trace_is_fit": copy(t.t_fit),
"trace_fitness": float(copy(t.t_value)),
"activated_transitions": copy(t.act_trans),
"reached_marking": copy(t.reached_marking),
"enabled_transitions_in_marking": copy(t.enabled_trans_in_mark),
"transitions_with_problems": copy(t.trans_probl),
"missing_tokens": int(t.missing),
"consumed_tokens": int(t.consumed),
"remaining_tokens": int(t.remaining),
"produced_tokens": int(t.produced),
}
if return_object_names:
corr_value["activated_transitions_labels"] = [
x.label for x in corr_value["activated_transitions"]
]
corr_value["activated_transitions"] = [
x.name for x in corr_value["activated_transitions"]
]
corr_value["enabled_transitions_in_marking_labels"] = [
x.label for x in corr_value["enabled_transitions_in_marking"]
]
corr_value["enabled_transitions_in_marking"] = [
x.name for x in corr_value["enabled_transitions_in_marking"]
]
corr_value["transitions_with_problems"] = [
x.name for x in corr_value["transitions_with_problems"]
]
corr_value["reached_marking"] = {
x.name: y for x, y in corr_value["reached_marking"].items()
}
return corr_value
[docs]
def apply_log(
log,
net,
initial_marking,
final_marking,
enable_pltr_fitness=False,
consider_remaining_in_fitness=False,
activity_key="concept:name",
reach_mark_through_hidden=True,
stop_immediately_unfit=False,
walk_through_hidden_trans=True,
places_shortest_path_by_hidden=None,
is_reduction=False,
thread_maximum_ex_time=TechnicalParameters.MAX_DEF_THR_EX_TIME.value,
cleaning_token_flood=False,
disable_variants=False,
return_object_names=False,
show_progress_bar=True,
consider_activities_not_in_model_in_fitness=False,
case_id_key=constants.CASE_CONCEPT_NAME,
exhaustive_invisible_exploration = False
):
"""
Apply token-based replay to a log
Parameters
----------
log
Trace log
net
Petri net
initial_marking
Initial marking
final_marking
Final marking
enable_pltr_fitness
Enable fitness retrieval at place level
consider_remaining_in_fitness
Boolean value telling if the remaining tokens should be considered in fitness evaluation
activity_key
Name of the attribute that contains the activity
reach_mark_through_hidden
Boolean value that decides if we shall try to reach the final marking through hidden transitions
stop_immediately_unfit
Boolean value that decides if we shall stop immediately when a non-conformance is detected
walk_through_hidden_trans
Boolean value that decides if we shall walk through hidden transitions in order to enable visible transitions
places_shortest_path_by_hidden
Shortest paths between places by hidden transitions
is_reduction
Expresses if the token-based replay is called in a reduction attempt
thread_maximum_ex_time
Alignment threads maximum allowed execution time
cleaning_token_flood
Decides if a cleaning of the token flood shall be operated
disable_variants
Disable variants grouping
return_object_names
Decides whether names instead of object pointers shall be returned
"""
post_fix_cache = PostFixCaching()
marking_to_activity_cache = MarkingToActivityCaching()
if places_shortest_path_by_hidden is None:
places_shortest_path_by_hidden = get_places_shortest_path_by_hidden(
net, TechnicalParameters.MAX_REC_DEPTH.value
)
place_fitness_per_trace = {}
transition_fitness_per_trace = {}
aligned_traces = []
if enable_pltr_fitness:
for place in net.places:
place_fitness_per_trace[place] = {
"underfed_traces": set(),
"overfed_traces": set(),
"m": 0,
"r": 0,
"c": 0,
"p": 0,
}
for transition in net.transitions:
if transition.label:
transition_fitness_per_trace[transition] = {
"underfed_traces": {},
"fit_traces": {},
}
s_components = []
if cleaning_token_flood:
s_components = get_s_components_from_petri(
net, initial_marking, final_marking
)
notexisting_activities_in_model = {}
# update 23/04/2024 (removing undeterminism with duplicate transitions)
# transitions are now fired as follows:
# - (from a previous update on 14/10/2020) it is checked if in the current market any transition having as label the current activity is enabled. If that's true, then the given transition is fired
# - otherwise, if there are no corresponding transitions enabled in the current marking, the TBR tries to enable with invisibles always the same transition (removing undeterminism)
trans_map = {}
for t in sorted(list(net.transitions), key=lambda x: x.name):
trans_map[t.label] = t
if pandas_utils.check_is_pandas_dataframe(log):
traces = [
(tuple(x), y)
for y, x in log.groupby(case_id_key)[activity_key]
.agg(list)
.to_dict()
.items()
]
traces = [(traces[i][0], i) for i in range(len(traces))]
else:
traces = [
(tuple(x[activity_key] for x in log[i]), i)
for i in range(len(log))
]
variants = dict()
for t in traces:
if t[0] not in variants:
variants[t[0]] = list()
variants[t[0]].append(t[1])
traces = [t[0] for t in traces]
vc = [(k, v) for k, v in variants.items()]
vc = list(sorted(vc, key=lambda x: (len(x[1]), x[0]), reverse=True))
threads_results = {}
progress = None
if (
importlib.util.find_spec("tqdm")
and show_progress_bar
and len(variants) > 1
):
from tqdm.auto import tqdm
if disable_variants and not pandas_utils.check_is_pandas_dataframe(
log
):
progress = tqdm(
total=len(traces),
desc="replaying log with TBR, completed traces :: ",
)
else:
progress = tqdm(
total=len(variants),
desc="replaying log with TBR, completed traces :: ",
)
for i in range(len(vc)):
variant = vc[i][0]
all_cases = vc[i][1]
if disable_variants and not pandas_utils.check_is_pandas_dataframe(
log
):
for j in range(len(all_cases)):
case_position = all_cases[j]
considered_case = log[case_position]
t = ApplyTraceTokenReplay(
considered_case,
net,
initial_marking,
final_marking,
trans_map,
enable_pltr_fitness,
place_fitness_per_trace,
transition_fitness_per_trace,
notexisting_activities_in_model,
places_shortest_path_by_hidden,
consider_remaining_in_fitness,
activity_key=activity_key,
reach_mark_through_hidden=reach_mark_through_hidden,
stop_immediately_when_unfit=stop_immediately_unfit,
walk_through_hidden_trans=walk_through_hidden_trans,
post_fix_caching=post_fix_cache,
marking_to_activity_caching=marking_to_activity_cache,
is_reduction=is_reduction,
thread_maximum_ex_time=thread_maximum_ex_time,
cleaning_token_flood=cleaning_token_flood,
s_components=s_components,
trace_occurrences=1,
consider_activities_not_in_model_in_fitness=consider_activities_not_in_model_in_fitness,
exhaustive_invisible_exploration=exhaustive_invisible_exploration
)
t.run()
threads_results[case_position] = transcribe_result(
t, return_object_names=return_object_names
)
if progress is not None:
progress.update()
else:
considered_case = variants_util.variant_to_trace(
variant,
parameters={
constants.PARAMETER_CONSTANT_ACTIVITY_KEY: activity_key
},
)
t = ApplyTraceTokenReplay(
considered_case,
net,
initial_marking,
final_marking,
trans_map,
enable_pltr_fitness,
place_fitness_per_trace,
transition_fitness_per_trace,
notexisting_activities_in_model,
places_shortest_path_by_hidden,
consider_remaining_in_fitness,
activity_key=activity_key,
reach_mark_through_hidden=reach_mark_through_hidden,
stop_immediately_when_unfit=stop_immediately_unfit,
walk_through_hidden_trans=walk_through_hidden_trans,
post_fix_caching=post_fix_cache,
marking_to_activity_caching=marking_to_activity_cache,
is_reduction=is_reduction,
thread_maximum_ex_time=thread_maximum_ex_time,
cleaning_token_flood=cleaning_token_flood,
s_components=s_components,
trace_occurrences=len(vc[i][1]),
consider_activities_not_in_model_in_fitness=consider_activities_not_in_model_in_fitness,
exhaustive_invisible_exploration=exhaustive_invisible_exploration
)
t.run()
for j in range(len(all_cases)):
case_position = all_cases[j]
threads_results[case_position] = transcribe_result(
t, return_object_names=return_object_names
)
if progress is not None:
progress.update()
for i in range(len(traces)):
aligned_traces.append(threads_results[i])
# gracefully close progress bar
if progress is not None:
progress.close()
del progress
if enable_pltr_fitness:
return (
aligned_traces,
place_fitness_per_trace,
transition_fitness_per_trace,
notexisting_activities_in_model,
)
else:
return aligned_traces
[docs]
def apply(
log: EventLog,
net: PetriNet,
initial_marking: Marking,
final_marking: Marking,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> typing.ListAlignments:
"""
Method to apply token-based replay
Parameters
-----------
log
Log
net
Petri net
initial_marking
Initial marking
final_marking
Final marking
parameters
Parameters of the algorithm
"""
if parameters is None:
parameters = {}
enable_pltr_fitness = exec_utils.get_param_value(
Parameters.ENABLE_PLTR_FITNESS, parameters, False
)
# changed default to uniform behavior with token-based replay fitness
consider_remaining_in_fitness = exec_utils.get_param_value(
Parameters.CONSIDER_REMAINING_IN_FITNESS, parameters, True
)
try_to_reach_final_marking_through_hidden = exec_utils.get_param_value(
Parameters.TRY_TO_REACH_FINAL_MARKING_THROUGH_HIDDEN, parameters, True
)
stop_immediately_unfit = exec_utils.get_param_value(
Parameters.STOP_IMMEDIATELY_UNFIT, parameters, False
)
walk_through_hidden_trans = exec_utils.get_param_value(
Parameters.WALK_THROUGH_HIDDEN_TRANS, parameters, True
)
is_reduction = exec_utils.get_param_value(
Parameters.IS_REDUCTION, parameters, False
)
cleaning_token_flood = exec_utils.get_param_value(
Parameters.CLEANING_TOKEN_FLOOD, parameters, False
)
disable_variants = exec_utils.get_param_value(
Parameters.DISABLE_VARIANTS, parameters, enable_pltr_fitness
)
return_names = exec_utils.get_param_value(
Parameters.RETURN_NAMES, parameters, False
)
thread_maximum_ex_time = exec_utils.get_param_value(
Parameters.THREAD_MAX_EX_TIME,
parameters,
TechnicalParameters.MAX_DEF_THR_EX_TIME.value,
)
places_shortest_path_by_hidden = exec_utils.get_param_value(
Parameters.PLACES_SHORTEST_PATH_BY_HIDDEN, parameters, None
)
activity_key = exec_utils.get_param_value(
Parameters.ACTIVITY_KEY, parameters, xes_util.DEFAULT_NAME_KEY
)
consider_activities_not_in_model_in_fitness = exec_utils.get_param_value(
Parameters.CONSIDER_ACTIVITIES_NOT_IN_MODEL_IN_FITNESS,
parameters,
False,
)
show_progress_bar = exec_utils.get_param_value(
Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
)
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, constants.CASE_CONCEPT_NAME
)
exhaustive_invisible_exploration = exec_utils.get_param_value(Parameters.EXHAUSTIVE_INVISIBLE_EXPLORATION, parameters, False)
if type(log) is not pd.DataFrame:
log = log_converter.apply(
log,
variant=log_converter.Variants.TO_EVENT_LOG,
parameters=parameters,
)
return apply_log(
log,
net,
initial_marking,
final_marking,
enable_pltr_fitness=enable_pltr_fitness,
consider_remaining_in_fitness=consider_remaining_in_fitness,
reach_mark_through_hidden=try_to_reach_final_marking_through_hidden,
stop_immediately_unfit=stop_immediately_unfit,
walk_through_hidden_trans=walk_through_hidden_trans,
places_shortest_path_by_hidden=places_shortest_path_by_hidden,
activity_key=activity_key,
is_reduction=is_reduction,
thread_maximum_ex_time=thread_maximum_ex_time,
cleaning_token_flood=cleaning_token_flood,
disable_variants=disable_variants,
return_object_names=return_names,
show_progress_bar=show_progress_bar,
consider_activities_not_in_model_in_fitness=consider_activities_not_in_model_in_fitness,
case_id_key=case_id_key,
exhaustive_invisible_exploration=exhaustive_invisible_exploration
)
[docs]
def apply_variants_list(
variants_list, net, initial_marking, final_marking, parameters=None
):
if parameters is None:
parameters = {}
parameters[Parameters.RETURN_NAMES] = True
log = log_implementation.EventLog()
for var_item in variants_list:
trace = variants_util.variant_to_trace(
var_item[0], parameters=parameters
)
log.append(trace)
return apply(
log, net, initial_marking, final_marking, parameters=parameters
)
[docs]
def apply_variants_dictionary(
variants, net, initial_marking, final_marking, parameters=None
):
if parameters is None:
parameters = {}
var_list = {x: len(y) for x, y in variants.items()}
return apply_variants_list(
var_list, net, initial_marking, final_marking, parameters=parameters
)
[docs]
def apply_variants_list_petri_string(
variants_list, petri_string, parameters=None
):
if parameters is None:
parameters = {}
from pm4py.objects.petri_net.importer.variants import (
pnml as petri_importer,
)
net, im, fm = petri_importer.import_petri_from_string(
petri_string, parameters=parameters
)
return apply_variants_list(
variants_list, net, im, fm, parameters=parameters
)
[docs]
def apply_variants_list_petri_string_multiprocessing(
output, variants_list, petri_string, parameters=None
):
if parameters is None:
parameters = {}
ret = apply_variants_list_petri_string(
variants_list, petri_string, parameters=parameters
)
output.put(ret)
[docs]
def get_diagnostics_dataframe(
log: EventLog,
tbr_output: typing.ListAlignments,
parameters: Optional[Dict[Union[str, Parameters], Any]] = None,
) -> pd.DataFrame:
"""
Gets the results of token-based replay in a dataframe
Parameters
--------------
log
Event log
tbr_output
Output of the token-based replay technique
Returns
--------------
dataframe
Diagnostics dataframe
"""
if parameters is None:
parameters = {}
case_id_key = exec_utils.get_param_value(
Parameters.CASE_ID_KEY, parameters, xes_util.DEFAULT_TRACEID_KEY
)
import pandas as pd
diagn_stream = []
for index in range(len(log)):
case_id = log[index].attributes[case_id_key]
is_fit = tbr_output[index]["trace_is_fit"]
trace_fitness = tbr_output[index]["trace_fitness"]
missing = tbr_output[index]["missing_tokens"]
remaining = tbr_output[index]["remaining_tokens"]
produced = tbr_output[index]["produced_tokens"]
consumed = tbr_output[index]["consumed_tokens"]
diagn_stream.append(
{
"case_id": case_id,
"is_fit": is_fit,
"trace_fitness": trace_fitness,
"missing": missing,
"remaining": remaining,
"produced": produced,
"consumed": consumed,
}
)
return pandas_utils.instantiate_dataframe(diagn_stream)