Source code for pm4py.algo.filtering.dfg.dfg_filtering

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import math
from copy import deepcopy

from pm4py.objects.dfg.utils import dfg_utils
from pm4py.util import constants, nx_utils

# Default noise threshold for directly-follows relations.
# NOTE(review): not referenced by the code visible in this module; presumably consumed by callers of
# clean_dfg_based_on_noise_thresh - TODO confirm.
DEFAULT_NOISE_THRESH_DF = 0.16


def generate_nx_graph_from_dfg(dfg, start_activities, end_activities, activities_count):
    """
    Build a directed graph mirroring the DFG, to be used for reachability checks

    Besides one node per activity, the graph contains two artificial nodes: a start node with an
    arc to every start activity, and an end node with an arc from every end activity.

    Parameters
    --------------
    dfg
        DFG (iterated for its edges; each edge is read as edge[0] -> edge[1])
    start_activities
        Start activities
    end_activities
        End activities
    activities_count
        Activities of the DFG along with their count (only the activities are used here)

    Returns
    --------------
    G
        NetworkX digraph
    start_node
        Identifier of the start node (connected to all the start activities)
    end_node
        Identifier of the end node (connected to all the end activities)
    """
    # fixed identifiers (instead of freshly generated ones) keep the result deterministic
    source_id = '4d872045-8664-4e21-bd55-5da5edb096fe'
    sink_id = 'b8136db7-b162-4763-bd68-4d5ccbcdff87'

    digraph = nx_utils.DiGraph()
    for artificial in (source_id, sink_id):
        digraph.add_node(artificial)

    # every activity becomes a node, even if no edge touches it
    for activity in activities_count:
        digraph.add_node(activity)

    for arc in dfg:
        digraph.add_edge(arc[0], arc[1])

    for activity in start_activities:
        digraph.add_edge(source_id, activity)

    for activity in end_activities:
        digraph.add_edge(activity, sink_id)

    return digraph, source_id, sink_id
def filter_dfg_on_activities_percentage(dfg0, start_activities0, end_activities0, activities_count0, percentage):
    """
    Filters a DFG (complete, and so connected) on the specified percentage of activities
    (but ensuring that every node is still reachable from the start and to the end)

    The arguments are never modified: the filtering works on deep copies. When there is at most
    one activity or at most one edge, the copies are returned without any filtering.

    Parameters
    ----------------
    dfg0
        (Complete, and so connected) DFG
    start_activities0
        Start activities
    end_activities0
        End activities
    activities_count0
        Activities of the DFG along with their count
    percentage
        Percentage of activities

    Returns
    ----------------
    dfg
        (Filtered) DFG
    start_activities
        (Filtered) start activities
    end_activities
        (Filtered) end activities
    activities_count
        (Filtered) activities of the DFG along with their count
    """
    # since the dictionaries/sets are modified, a deepcopy is the best option to ensure data integrity
    dfg = deepcopy(dfg0)
    start_activities = deepcopy(start_activities0)
    end_activities = deepcopy(end_activities0)
    activities_count = deepcopy(activities_count0)

    if len(activities_count) > 1 and len(dfg) > 1:
        # activities by decreasing count (ties broken by decreasing activity name)
        activities_count_sorted_list = sorted(activities_count.items(), key=lambda x: (x[1], x[0]), reverse=True)

        # the first activity of the ranking is always kept; the percentage applies to the other ones
        cut = math.ceil((len(activities_count) - 1) * percentage) + 1
        # minimum set of activities to keep in the graph, according to the percentage
        min_set_activities_to_keep = set(x[0] for x in activities_count_sorted_list[:cut])
        # activities that can be possibly discarded, visited from the least frequent one
        activities_to_possibly_discard = [x[0] for x in activities_count_sorted_list[cut:]]
        activities_to_possibly_discard.reverse()

        # build a graph structure that helps in deciding whether the activities can be discarded safely
        graph, start_node, end_node = generate_nx_graph_from_dfg(dfg, start_activities, end_activities,
                                                                 activities_count)

        for act in activities_to_possibly_discard:
            # try to remove the node on a copy, so that a rejected removal leaves "graph" untouched
            new_graph = nx_utils.DiGraph(graph)
            new_graph.remove_node(act)
            # check whether all the activities to keep can be reached from the start and can reach the end
            reachable_from_start = set(nx_utils.descendants(new_graph, start_node))
            reachable_to_end = set(nx_utils.ancestors(new_graph, end_node))
            if min_set_activities_to_keep.issubset(reachable_from_start) and min_set_activities_to_keep.issubset(
                    reachable_to_end):
                # if that is the case, try to elaborate the new DFG (without the activity)
                new_dfg = {x: y for x, y in dfg.items() if x[0] != act and x[1] != act}
                # if that is still not empty ...
                if new_dfg:
                    # ... then the activity can be safely removed
                    dfg = new_dfg
                    del activities_count[act]
                    if act in start_activities:
                        del start_activities[act]
                    if act in end_activities:
                        del end_activities[act]
                    graph = new_graph

        # at the end of the previous step, some nodes may be remaining that are not reachable from the start
        # or cannot reach the end. the previous steps ensured that at least the activities in
        # min_set_activities_to_keep are connected
        reachable_from_start = set(nx_utils.descendants(graph, start_node))
        reachable_to_end = set(nx_utils.ancestors(graph, end_node))
        reachable_start_end = reachable_from_start.intersection(reachable_to_end)
        non_reachable_activities = set(activities_count.keys()).difference(reachable_start_end)

        if non_reachable_activities:
            # drop in a single pass every edge touching a non reachable activity
            # (instead of rebuilding the whole DFG once per removed activity)
            dfg = {x: y for x, y in dfg.items() if
                   x[0] not in non_reachable_activities and x[1] not in non_reachable_activities}
            for act in non_reachable_activities:
                del activities_count[act]
                if act in start_activities:
                    del start_activities[act]
                if act in end_activities:
                    del end_activities[act]

    return dfg, start_activities, end_activities, activities_count
def __filter_specified_paths(dfg, start_activities, end_activities, activities_count, graph, start_node, end_node,
                             discardable_edges, activities_not_to_discard):
    """
    Tries to remove the given edges one by one, accepting a removal only if every activity in
    activities_not_to_discard stays reachable from the start node and can still reach the end node.
    Afterwards, the activities that are no more on a path from start to end are dropped.

    The dfg, start_activities, end_activities and activities_count arguments are modified in place
    (entries are deleted); the callers in this module pass deep copies of the user-provided objects.

    Parameters
    ----------------
    dfg
        DFG
    start_activities
        Start activities
    end_activities
        End activities
    activities_count
        Activities of the DFG along with their count
    graph
        Digraph built out of the DFG (including the artificial start/end nodes)
    start_node
        Identifier of the artificial start node
    end_node
        Identifier of the artificial end node
    discardable_edges
        Edges that may be removed, in the order in which the removal is attempted
    activities_not_to_discard
        Set of activities that must remain connected to both the start and the end node

    Returns
    ----------------
    dfg
        (Filtered) DFG
    start_activities
        (Filtered) start activities
    end_activities
        (Filtered) end activities
    activities_count
        (Filtered) activities of the DFG along with their count
    """
    for edge in discardable_edges:
        # no removal is attempted when a single edge is left in the DFG
        if len(dfg) > 1:
            # try to remove the edge on a copy, so that a rejected removal leaves "graph" untouched
            new_graph = nx_utils.DiGraph(graph)
            new_graph.remove_edge(edge[0], edge[1])

            # check whether all the activities to keep can be reached from the start and can reach the end
            reachable_from_start = set(nx_utils.descendants(new_graph, start_node))
            reachable_to_end = set(nx_utils.ancestors(new_graph, end_node))
            if activities_not_to_discard.issubset(reachable_from_start) and activities_not_to_discard.issubset(
                    reachable_to_end):
                # remove the edge
                graph = new_graph
                if edge in dfg:
                    # if the edge connects two activities simply remove that
                    del dfg[edge]
                elif edge[0] == start_node:
                    del start_activities[edge[1]]
                elif edge[1] == end_node:
                    del end_activities[edge[0]]

    # at the end of the previous step, some nodes may be remaining that are not reachable from the start
    # or cannot reach the end. the previous steps ensured that at least the activities in
    # activities_not_to_discard are connected
    reachable_from_start = set(nx_utils.descendants(graph, start_node))
    reachable_to_end = set(nx_utils.ancestors(graph, end_node))
    reachable_start_end = reachable_from_start.intersection(reachable_to_end)
    non_reachable_activities = set(activities_count.keys()).difference(reachable_start_end)

    # remove these non reachable activities; their edges are dropped by the filter right below,
    # so the DFG does not need to be rebuilt once per removed activity
    for act in non_reachable_activities:
        del activities_count[act]
        if act in start_activities:
            del start_activities[act]
        if act in end_activities:
            del end_activities[act]

    # make sure that the DFG contains only edges between the remaining activities
    dfg = {x: y for x, y in dfg.items() if x[0] in activities_count and x[1] in activities_count}

    return dfg, start_activities, end_activities, activities_count
def filter_dfg_on_paths_percentage(dfg0, start_activities0, end_activities0, activities_count0, percentage,
                                   keep_all_activities=False):
    """
    Filters a DFG (complete, and so connected) on the specified percentage of paths
    (but ensuring that every node is still reachable from the start and to the end)

    The arguments are not modified (deep copies are filtered). With at most one activity or at most
    one edge, the copies are returned as they are.

    Parameters
    ----------------
    dfg0
        (Complete, and so connected) DFG
    start_activities0
        Start activities
    end_activities0
        End activities
    activities_count0
        Activities of the DFG along with their count
    percentage
        Percentage of paths
    keep_all_activities
        Decides if all the activities (also the ones connected by the low occurrences edges) should be kept,
        or only the ones appearing in the edges with more occurrences (default).

    Returns
    ----------------
    dfg
        (Filtered) DFG
    start_activities
        (Filtered) start activities
    end_activities
        (Filtered) end activities
    activities_count
        (Filtered) activities of the DFG along with their count
    """
    # work on copies, as the dictionaries get modified during the filtering
    dfg = deepcopy(dfg0)
    start_activities = deepcopy(start_activities0)
    end_activities = deepcopy(end_activities0)
    activities_count = deepcopy(activities_count0)

    # degenerate graphs are returned unfiltered
    if not (len(activities_count) > 1 and len(dfg) > 1):
        return dfg, start_activities, end_activities, activities_count

    # graph structure used to decide whether a path can be discarded safely
    graph, start_node, end_node = generate_nx_graph_from_dfg(dfg, start_activities, end_activities,
                                                             activities_count)

    # every edge (including the ones from the start node / to the end node) along with its occurrences
    weighted_edges = [(arc, occ) for arc, occ in dfg.items()]
    weighted_edges += [((start_node, act), start_activities[act]) for act in start_activities]
    weighted_edges += [((act, end_node), end_activities[act]) for act in end_activities]
    weighted_edges.sort(key=lambda item: (item[1], item[0]), reverse=True)

    # split the ranking: the head is preserved, the tail may be discarded (least frequent edge first)
    split_at = math.ceil((len(weighted_edges) - 1) * percentage) + 1
    non_discardable_edges = [item[0] for item in weighted_edges[:split_at]]
    discardable_edges = [item[0] for item in reversed(weighted_edges[split_at:])]

    if keep_all_activities:
        # every known activity must stay connected
        activities_not_to_discard = set(arc[0] for arc in dfg)
        activities_not_to_discard.update(arc[1] for arc in dfg)
        activities_not_to_discard.update(start_activities)
        activities_not_to_discard.update(end_activities)
        activities_not_to_discard.update(activities_count)
    else:
        # only the activities appearing in the preserved edges must stay connected
        preserved_sources = set(arc[0] for arc in non_discardable_edges if not arc[0] == start_node)
        preserved_targets = set(arc[1] for arc in non_discardable_edges if not arc[1] == end_node)
        activities_not_to_discard = preserved_sources.union(preserved_targets)

    dfg, start_activities, end_activities, activities_count = __filter_specified_paths(
        dfg, start_activities, end_activities, activities_count, graph, start_node, end_node,
        discardable_edges, activities_not_to_discard)

    return dfg, start_activities, end_activities, activities_count
def filter_dfg_keep_connected(dfg0, start_activities0, end_activities0, activities_count0, threshold,
                              keep_all_activities=False):
    """
    Filters a DFG (complete, and so connected) on the specified dependency threshold (Heuristics Miner
    dependency) (but ensuring that every node is still reachable from the start and to the end)

    The arguments are not modified (deep copies are filtered). With at most one activity or at most
    one edge, the copies are returned as they are.

    Parameters
    ----------------
    dfg0
        (Complete, and so connected) DFG
    start_activities0
        Start activities
    end_activities0
        End activities
    activities_count0
        Activities of the DFG along with their count
    threshold
        Dependency threshold as in the Heuristics Miner
    keep_all_activities
        Decides if all the activities should be kept,
        or only the ones appearing in the edges with higher threshold (default).

    Returns
    ----------------
    dfg
        (Filtered) DFG
    start_activities
        (Filtered) start activities
    end_activities
        (Filtered) end activities
    activities_count
        (Filtered) activities of the DFG along with their count
    """
    # work on copies, as the dictionaries get modified during the filtering
    dfg = deepcopy(dfg0)
    start_activities = deepcopy(start_activities0)
    end_activities = deepcopy(end_activities0)
    activities_count = deepcopy(activities_count0)

    # degenerate graphs are returned unfiltered
    if not (len(activities_count) > 1 and len(dfg) > 1):
        return dfg, start_activities, end_activities, activities_count

    # graph structure used to decide whether a path can be discarded safely
    graph, start_node, end_node = generate_nx_graph_from_dfg(dfg, start_activities, end_activities,
                                                             activities_count)

    # dependency measure of each edge: (|a>b| - |b>a|) / (|a>b| + |b>a| + 1) when the inverse edge
    # exists, |a>b| / (|a>b| + 1) otherwise
    dependency = {}
    for arc in dfg:
        inverse = (arc[1], arc[0])
        dependency[arc] = ((dfg[arc] - dfg[inverse]) / (dfg[arc] + dfg[inverse] + 1)
                           if inverse in dfg else dfg[arc] / (dfg[arc] + 1))

    # the edges from the start node / to the end node get the maximum dependency
    scored_edges = [(arc, dep) for arc, dep in dependency.items()]
    scored_edges += [((start_node, act), 1.0) for act in start_activities]
    scored_edges += [((act, end_node), 1.0) for act in end_activities]
    scored_edges.sort(key=lambda item: (item[1], item[0]), reverse=True)

    # edges reaching the threshold are preserved, the other ones may be discarded (lowest score first)
    non_discardable_edges = [item[0] for item in scored_edges if item[1] >= threshold]
    discardable_edges = [item[0] for item in scored_edges if item[1] < threshold]
    discardable_edges.reverse()

    if keep_all_activities:
        # every known activity must stay connected
        activities_not_to_discard = set(arc[0] for arc in dfg)
        activities_not_to_discard.update(arc[1] for arc in dfg)
        activities_not_to_discard.update(start_activities)
        activities_not_to_discard.update(end_activities)
        activities_not_to_discard.update(activities_count)
    else:
        # only the activities appearing in the preserved edges must stay connected
        preserved_sources = set(arc[0] for arc in non_discardable_edges if not arc[0] == start_node)
        preserved_targets = set(arc[1] for arc in non_discardable_edges if not arc[1] == end_node)
        activities_not_to_discard = preserved_sources.union(preserved_targets)

    dfg, start_activities, end_activities, activities_count = __filter_specified_paths(
        dfg, start_activities, end_activities, activities_count, graph, start_node, end_node,
        discardable_edges, activities_not_to_discard)

    return dfg, start_activities, end_activities, activities_count
def filter_dfg_to_activity(dfg0, start_activities0, end_activities0, activities_count0, target_activity,
                           parameters=None):
    """
    Filters the DFG, making "target_activity" the only possible end activity of the graph

    The outgoing edges of the target activity are removed, then the graph is repeatedly restricted
    to the activities lying between a start activity and the target activity, until nothing changes.

    Parameters
    ---------------
    dfg0
        Directly-follows graph
    start_activities0
        Start activities
    end_activities0
        End activities (not read: the end activities are rebuilt from the target activity)
    activities_count0
        Activities count
    target_activity
        Target activity (only possible end activity after the filtering)
    parameters
        Parameters (currently not read by this function)

    Returns
    ---------------
    dfg
        Filtered DFG
    start_activities
        Filtered start activities
    end_activities
        Filtered end activities
    activities_count
        Filtered activities count
    """
    if parameters is None:
        parameters = {}

    # work on copies, so that the provided objects are left untouched
    dfg = deepcopy(dfg0)
    start_activities = deepcopy(start_activities0)
    activities_count = deepcopy(activities_count0)

    # nothing can follow the target activity anymore
    dfg = {arc: occ for arc, occ in dfg.items() if arc[0] != target_activity}
    end_activities = {target_activity: activities_count[target_activity]}

    # iterate until a fixpoint is reached
    while True:
        predecessors = dfg_utils.get_predecessors(dfg, activities_count)
        successors = dfg_utils.get_successors(dfg, activities_count)

        # the start activities, plus everything that follows them
        after_start = set()
        for sa in start_activities:
            after_start.update(successors[sa])
            after_start.add(sa)

        # ... restricted to what precedes the target (the target itself is always kept)
        reachable_nodes = after_start.intersection(predecessors[target_activity]).union({target_activity})
        if reachable_nodes == set(activities_count.keys()):
            break

        activities_count = {act: cnt for act, cnt in activities_count.items() if act in reachable_nodes}
        start_activities = {act: cnt for act, cnt in start_activities.items() if act in reachable_nodes}
        dfg = {arc: occ for arc, occ in dfg.items() if arc[0] in reachable_nodes and arc[1] in reachable_nodes}

    return dfg, start_activities, end_activities, activities_count
def filter_dfg_from_activity(dfg0, start_activities0, end_activities0, activities_count0, source_activity,
                             parameters=None):
    """
    Filters the DFG, making "source_activity" the only possible source activity of the graph

    The incoming edges of the source activity are removed, then the graph is repeatedly restricted
    to the activities lying between the source activity and an end activity, until nothing changes.

    Parameters
    ---------------
    dfg0
        Directly-follows graph
    start_activities0
        Start activities (not read: the start activities are rebuilt from the source activity)
    end_activities0
        End activities
    activities_count0
        Activities count
    source_activity
        Source activity (only possible start activity after the filtering)
    parameters
        Parameters (currently not read by this function)

    Returns
    ---------------
    dfg
        Filtered DFG
    start_activities
        Filtered start activities
    end_activities
        Filtered end activities
    activities_count
        Filtered activities count
    """
    if parameters is None:
        parameters = {}

    # work on copies, so that the provided objects are left untouched
    dfg = deepcopy(dfg0)
    end_activities = deepcopy(end_activities0)
    activities_count = deepcopy(activities_count0)

    # nothing can precede the source activity anymore
    dfg = {arc: occ for arc, occ in dfg.items() if arc[1] != source_activity}
    start_activities = {source_activity: activities_count[source_activity]}

    # iterate until a fixpoint is reached
    while True:
        predecessors = dfg_utils.get_predecessors(dfg, activities_count)
        successors = dfg_utils.get_successors(dfg, activities_count)

        # the end activities, plus everything that precedes them
        before_end = set()
        for ea in end_activities:
            before_end.update(predecessors[ea])
            before_end.add(ea)

        # ... restricted to what follows the source (the source itself is always kept)
        reachable_nodes = before_end.intersection(successors[source_activity]).union({source_activity})
        if reachable_nodes == set(activities_count.keys()):
            break

        activities_count = {act: cnt for act, cnt in activities_count.items() if act in reachable_nodes}
        end_activities = {act: cnt for act, cnt in end_activities.items() if act in reachable_nodes}
        dfg = {arc: occ for arc, occ in dfg.items() if arc[0] in reachable_nodes and arc[1] in reachable_nodes}

    return dfg, start_activities, end_activities, activities_count
def filter_dfg_contain_activity(dfg0, start_activities0, end_activities0, activities_count0, activity,
                                parameters=None):
    """
    Filters the DFG keeping only nodes that can reach / are reachable from activity

    Start activities are kept only if they precede the activity (or are the activity itself), end
    activities only if they follow it (or are the activity itself). The restriction is repeated
    until nothing changes.

    Parameters
    ---------------
    dfg0
        Directly-follows graph
    start_activities0
        Start activities
    end_activities0
        End activities
    activities_count0
        Activities count
    activity
        Activity that should be reachable / should reach all the nodes of the filtered graph
    parameters
        Parameters (currently not read by this function)

    Returns
    ---------------
    dfg
        Filtered DFG
    start_activities
        Filtered start activities
    end_activities
        Filtered end activities
    activities_count
        Filtered activities count
    """
    if parameters is None:
        parameters = {}

    # work on copies, so that the provided objects are left untouched
    dfg = deepcopy(dfg0)
    start_activities = deepcopy(start_activities0)
    end_activities = deepcopy(end_activities0)
    activities_count = deepcopy(activities_count0)

    # iterate until a fixpoint is reached
    while True:
        predecessors = dfg_utils.get_predecessors(dfg, activities_count)
        successors = dfg_utils.get_successors(dfg, activities_count)

        upstream = predecessors[activity].union({activity})
        downstream = successors[activity].union({activity})

        kept_starts = {act: cnt for act, cnt in start_activities.items() if act in upstream}
        kept_ends = {act: cnt for act, cnt in end_activities.items() if act in downstream}

        # another round is needed whenever a start/end activity has been dropped ...
        changed = start_activities != kept_starts or end_activities != kept_ends
        start_activities, end_activities = kept_starts, kept_ends

        # ... or whenever some activity is neither upstream nor downstream of the given one
        reachable_nodes = upstream.union(downstream)
        if reachable_nodes != set(activities_count.keys()):
            changed = True
            activities_count = {act: cnt for act, cnt in activities_count.items() if act in reachable_nodes}
            dfg = {arc: occ for arc, occ in dfg.items() if arc[0] in reachable_nodes and arc[1] in reachable_nodes}

        if not changed:
            break

    return dfg, start_activities, end_activities, activities_count
def clean_dfg_based_on_noise_thresh(dfg, activities, noise_threshold, parameters=None):
    """
    Clean Directly-Follows graph based on noise threshold

    A path is dropped when its count is lower than the noise threshold multiplied by the maximum
    activity count of both its activities, unless it is listed among the most common paths.

    Two layouts of the DFG are handled, told apart by the first item of each iterated element:
    when it is a string, the DFG is treated as a dictionary associating (act1, act2) to the count;
    otherwise, as a list of ((act1, act2), count) items. The result has the same layout.

    Parameters
    ----------
    dfg
        Directly-Follows graph
    activities
        Activities in the DFG graph
    noise_threshold
        Noise threshold
    parameters
        Possible parameters, including:
            constants.PARAM_MOST_COMMON_PATHS -> paths that are kept regardless of the threshold

    Returns
    ----------
    newDfg
        Cleaned dfg based on noise threshold (the provided object itself when the DFG is empty)
    """
    if parameters is None:
        parameters = {}

    most_common_paths = parameters.get(constants.PARAM_MOST_COMMON_PATHS)
    if most_common_paths is None:
        most_common_paths = []

    activ_max_count = {act: dfg_utils.get_max_activity_count(dfg, act) for act in activities}

    new_dfg = None
    for el in dfg:
        # isinstance (rather than an exact type check) so that str subclasses are also recognised
        # as activities of a dictionary-based DFG, instead of being indexed character by character
        keyed_by_path = isinstance(el[0], str)
        if keyed_by_path:
            act1 = el[0]
            act2 = el[1]
            val = dfg[el]
        else:
            act1 = el[0][0]
            act2 = el[0][1]
            val = el[1]

        # the output container is created lazily, with the layout of the first element
        if new_dfg is None:
            new_dfg = {} if keyed_by_path else []

        # the membership is checked first: the most common paths are kept without looking at the counts
        if el not in most_common_paths and val < min(activ_max_count[act1] * noise_threshold,
                                                     activ_max_count[act2] * noise_threshold):
            continue

        if keyed_by_path:
            new_dfg[el] = dfg[el]
        else:
            new_dfg.append(el)

    if new_dfg is None:
        # empty DFG: nothing to clean
        return dfg

    return new_dfg