Source code for pm4py.util.nx_utils

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import networkx as nx
from enum import Enum
from typing import Optional, Dict, Any
from pm4py.util import exec_utils, constants
import importlib.util
from copy import copy


class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of this module's functions."""

    # Read by neo4j_upload to decide whether upload progress is displayed.
    SHOW_PROGRESS_BAR = "show_progress_bar"
def get_default_nx_environment():
    """
    Returns the graph library object used as the default backend.

    Returns
    ---------------
    nx_environment
        The ``networkx`` module imported at the top of this file
    """
    default_environment = nx
    return default_environment
# Backend object resolved once at import time; the wrapper functions of this
# module look up their target callables on it.
DEFAULT_NX_ENVIRONMENT = get_default_nx_environment()
def Graph(*args, **kwargs):
    """
    Delegates to ``Graph`` of the default NetworkX environment, forwarding
    all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.Graph
    return target(*args, **kwargs)
def DiGraph(*args, **kwargs):
    """
    Delegates to ``DiGraph`` of the default NetworkX environment, forwarding
    all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.DiGraph
    return target(*args, **kwargs)
def MultiGraph(*args, **kwargs):
    """
    Delegates to ``MultiGraph`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.MultiGraph
    return target(*args, **kwargs)
def MultiDiGraph(*args, **kwargs):
    """
    Delegates to ``MultiDiGraph`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.MultiDiGraph
    return target(*args, **kwargs)
def ancestors(*args, **kwargs):
    """
    Delegates to ``ancestors`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.ancestors
    return target(*args, **kwargs)
def descendants(*args, **kwargs):
    """
    Delegates to ``descendants`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.descendants
    return target(*args, **kwargs)
def connected_components(*args, **kwargs):
    """
    Delegates to ``connected_components`` of the default NetworkX
    environment, forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.connected_components
    return target(*args, **kwargs)
def bfs_tree(*args, **kwargs):
    """
    Delegates to ``bfs_tree`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.bfs_tree
    return target(*args, **kwargs)
def contracted_nodes(*args, **kwargs):
    """
    Delegates to ``contracted_nodes`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.contracted_nodes
    return target(*args, **kwargs)
def shortest_path(*args, **kwargs):
    """
    Delegates to ``shortest_path`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.shortest_path
    return target(*args, **kwargs)
def strongly_connected_components(*args, **kwargs):
    """
    Delegates to ``strongly_connected_components`` of the default NetworkX
    environment, forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.strongly_connected_components
    return target(*args, **kwargs)
def has_path(*args, **kwargs):
    """
    Delegates to ``has_path`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.has_path
    return target(*args, **kwargs)
def is_strongly_connected(*args, **kwargs):
    """
    Delegates to ``is_strongly_connected`` of the default NetworkX
    environment, forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.is_strongly_connected
    return target(*args, **kwargs)
def all_pairs_shortest_path(*args, **kwargs):
    """
    Delegates to ``all_pairs_shortest_path`` of the default NetworkX
    environment, forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.all_pairs_shortest_path
    return target(*args, **kwargs)
def all_pairs_dijkstra(*args, **kwargs):
    """
    Delegates to ``all_pairs_dijkstra`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.all_pairs_dijkstra
    return target(*args, **kwargs)
def find_cliques(*args, **kwargs):
    """
    Delegates to ``find_cliques`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.find_cliques
    return target(*args, **kwargs)
def degree_centrality(*args, **kwargs):
    """
    Delegates to ``degree_centrality`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.degree_centrality
    return target(*args, **kwargs)
def greedy_modularity_communities(*args, **kwargs):
    """
    Delegates to ``algorithms.community.greedy_modularity_communities`` of
    the default NetworkX environment, forwarding all positional and keyword
    arguments unchanged.
    """
    community_module = DEFAULT_NX_ENVIRONMENT.algorithms.community
    return community_module.greedy_modularity_communities(*args, **kwargs)
def maximum_flow_value(*args, **kwargs):
    """
    Delegates to ``maximum_flow_value`` of the default NetworkX environment,
    forwarding all positional and keyword arguments unchanged.
    """
    target = DEFAULT_NX_ENVIRONMENT.maximum_flow_value
    return target(*args, **kwargs)
def minimum_weight_full_matching(*args, **kwargs):
    """
    Delegates to ``bipartite.minimum_weight_full_matching`` of the default
    NetworkX environment, forwarding all positional and keyword arguments
    unchanged.
    """
    bipartite_module = DEFAULT_NX_ENVIRONMENT.bipartite
    return bipartite_module.minimum_weight_full_matching(*args, **kwargs)
def Edmonds(*args, **kwargs):
    """
    Delegates to ``algorithms.tree.Edmonds`` of the default NetworkX
    environment, forwarding all positional and keyword arguments unchanged.
    """
    tree_module = DEFAULT_NX_ENVIRONMENT.algorithms.tree
    return tree_module.Edmonds(*args, **kwargs)
def __format_attrs(attributes0: Dict[str, Any]) -> Dict[str, Any]:
    """
    Internal method to format properties.

    The decision is taken on the lower-cased textual name of each value's
    type: names containing "time" are replaced by ``value.isoformat()``;
    names containing "float", "double" or "str" are kept untouched; every
    other value is converted with ``str``.

    Parameters
    ---------------
    attributes0
        Original dictionary of properties (left unmodified)

    Returns
    ---------------
    attributes
        New dictionary with the same keys and the formatted values
    """
    formatted = {}
    for key, value in attributes0.items():
        type_name = str(type(value)).lower()
        if "time" in type_name:
            # Checked first, so it wins over the pass-through markers below.
            formatted[key] = value.isoformat()
        elif "float" in type_name or "double" in type_name or "str" in type_name:
            formatted[key] = value
        else:
            formatted[key] = str(value)
    return formatted
def _cypher_quote_name(name: Any) -> str:
    """Backtick-quotes a label / relationship type for embedding in Cypher text."""
    # Labels and relationship types cannot be passed as query parameters, so
    # they are spliced into the statement. A literal "\u0060" is normalised to
    # a backtick first (Cypher may decode it as one), then every backtick is
    # doubled so the value cannot terminate the quoted name.
    text = str(name).replace("\\u0060", "`")
    return "`" + text.replace("`", "``") + "`"


def _optional_progress_bar(enabled: bool, total: int, desc: str):
    """Returns a tqdm progress bar if requested and tqdm is installed, else None."""
    if enabled and importlib.util.find_spec("tqdm"):
        from tqdm.auto import tqdm

        return tqdm(total=total, desc=desc)
    return None


def neo4j_upload(
    nx_graph: nx.DiGraph,
    session,
    clean_db: bool = True,
    parameters: Optional[Dict[Any, Any]] = None,
):
    """
    Uploads a NetworkX DiGraph obtained from a traditional/object-centric
    event log to a Neo4J session.

    Every node/edge must carry an "attr" dictionary containing a "type"
    entry; the type is used as node label / relationship type, and the
    formatted attributes are stored as properties. Nodes additionally get
    an "id" property equal to their NetworkX identifier, which is then used
    to match the endpoints of each edge.

    Parameters
    ---------------
    nx_graph
        NetworkX graph
    session
        Neo4J session
    clean_db
        Cleans the database before uploading
    parameters
        Other optional parameters of the method, including:
        - Parameters.SHOW_PROGRESS_BAR => shows the percentage of
          nodes/edges uploaded to Neo4J
    """
    if parameters is None:
        parameters = {}

    show_progress_bar = exec_utils.get_param_value(
        Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
    )

    if clean_db:
        session.run("MATCH (n) DETACH DELETE n")

    nodes = list(nx_graph.nodes)
    edges = list(nx_graph.edges)

    nodes_progress = _optional_progress_bar(
        show_progress_bar, len(nodes), "uploading nodes, completed :: "
    )
    try:
        for node_id in nodes:
            node_attrs = __format_attrs(nx_graph.nodes[node_id]["attr"])
            node_label = _cypher_quote_name(node_attrs["type"])
            command = (
                "CREATE (n:" + node_label + " {id: $id})\nSET n += $properties"
            )
            session.run(command, id=node_id, properties=node_attrs)
            if nodes_progress is not None:
                nodes_progress.update()
    finally:
        # Close the bar even when an upload statement fails.
        if nodes_progress is not None:
            nodes_progress.close()

    edges_progress = _optional_progress_bar(
        show_progress_bar, len(edges), "uploading edges, completed :: "
    )
    try:
        for edge_id in edges:
            edge_attr = __format_attrs(nx_graph.edges[edge_id]["attr"])
            edge_type = edge_attr["type"]
            command = (
                "MATCH (a {id: $id1}), (b {id: $id2})\nCREATE (a)-[r:"
                + _cypher_quote_name(edge_type)
                + " $props]->(b)"
            )
            # edge_type is still sent as a query parameter, as before, although
            # the statement text does not reference it.
            session.run(
                command,
                id1=edge_id[0],
                id2=edge_id[1],
                props=edge_attr,
                edge_type=edge_type,
            )
            if edges_progress is not None:
                edges_progress.update()
    finally:
        if edges_progress is not None:
            edges_progress.close()
def neo4j_download(
    session, parameters: Optional[Dict[Any, Any]] = None
) -> nx.DiGraph:
    """
    Downloads a NetworkX DiGraph starting from a Neo4J database.

    Every node of the database becomes a graph node keyed by its "id"
    property; the remaining properties are stored under the "attr" key.
    The "ocel:timestamp" and "time:timestamp" properties, when present,
    are converted through the date parser. Every relationship becomes a
    directed edge whose properties are stored under the "attr" key.

    Parameters
    --------------
    session
        Neo4J session
    parameters
        Optional parameters of the method.

    Returns
    --------------
    nx_graph
        NetworkX DiGraph
    """
    if parameters is None:
        parameters = {}

    from pm4py.util import dt_parsing

    date_parser = dt_parsing.parser.get()
    timestamp_keys = ("ocel:timestamp", "time:timestamp")

    # The node result is fully materialised before the second query is issued.
    node_records = [
        dict(record["n"]) for record in session.run("MATCH (n) RETURN n")
    ]
    edge_records = [
        (record["n"]["id"], record["m"]["id"], dict(record["r"]))
        for record in session.run("MATCH (n)-[r]->(m) RETURN n, r, m")
    ]

    nx_graph = DiGraph()

    for record in node_records:
        properties = dict(record)
        node_id = properties.pop("id")
        for key in timestamp_keys:
            if key in properties:
                properties[key] = date_parser.apply(properties[key])
        nx_graph.add_node(node_id, attr=properties)

    for source, target, properties in edge_records:
        nx_graph.add_edge(source, target, attr=properties)

    return nx_graph
def nx_to_ocel(
    nx_graph: nx.DiGraph, parameters: Optional[Dict[Any, Any]] = None
):
    """
    Transforms a NetworkX DiGraph representing an OCEL to a proper OCEL.

    Nodes are dispatched on the "type" entry of their "attr" dictionary
    ("EVENT", "OBJECT", "CHANGE"); edges of type "E2O" become
    event-to-object relations and edges of type "O2O" become
    object-to-object relations. Other node/edge types are ignored.

    Parameters
    ----------------
    nx_graph
        NetworkX DiGraph
    parameters
        Optional parameters of the method

    Returns
    ----------------
    ocel
        Object-centric event log
    """
    if parameters is None:
        parameters = {}

    from pm4py.util import pandas_utils

    event_rows, object_rows, change_rows = [], [], []
    relation_rows, o2o_rows = [], []

    # Lookups needed to enrich the E2O relations with event/object details.
    activity_of = {}
    timestamp_of = {}
    object_type_of = {}

    for node_id in nx_graph.nodes:
        attrs = nx_graph.nodes[node_id]["attr"]
        kind = attrs["type"]
        if kind == "EVENT":
            activity_of[node_id] = attrs["ocel:activity"]
            timestamp_of[node_id] = attrs["ocel:timestamp"]
            event_rows.append(attrs)
        elif kind == "OBJECT":
            object_type_of[node_id] = attrs["ocel:type"]
            object_rows.append(attrs)
        elif kind == "CHANGE":
            change_rows.append(attrs)

    for edge_id in nx_graph.edges:
        source, target = edge_id[0], edge_id[1]
        attrs = nx_graph.edges[edge_id]["attr"]
        kind = attrs["type"]
        qualifier = attrs.get("qualifier", "")

        if kind == "E2O":
            relation_rows.append(
                {
                    "ocel:eid": source,
                    "ocel:oid": target,
                    "ocel:activity": activity_of[source],
                    "ocel:timestamp": timestamp_of[source],
                    "ocel:type": object_type_of[target],
                    "ocel:qualifier": qualifier,
                }
            )
        elif kind == "O2O":
            o2o_rows.append(
                {
                    "ocel:oid": source,
                    "ocel:oid_2": target,
                    "ocel:qualifier": qualifier,
                }
            )

    events = pandas_utils.instantiate_dataframe(event_rows)
    objects = pandas_utils.instantiate_dataframe(object_rows)
    relations = pandas_utils.instantiate_dataframe(relation_rows)
    # The two optional tables are only built when they have at least one row.
    o2o = None
    if o2o_rows:
        o2o = pandas_utils.instantiate_dataframe(o2o_rows)
    object_changes = None
    if change_rows:
        object_changes = pandas_utils.instantiate_dataframe(change_rows)

    # Temporary index column used as secondary sort key next to the timestamp.
    internal_index = "@@index"
    sort_columns = ["ocel:timestamp", internal_index]

    events = pandas_utils.insert_index(
        events, internal_index, reset_index=False, copy_dataframe=False
    )
    relations = pandas_utils.insert_index(
        relations, internal_index, reset_index=False, copy_dataframe=False
    )

    events = events.sort_values(sort_columns)
    relations = relations.sort_values(sort_columns)

    del events[internal_index]
    del relations[internal_index]

    # The "type" column only served the node dispatch above.
    del events["type"]
    del objects["type"]
    if object_changes is not None:
        del object_changes["type"]

    from pm4py.objects.ocel.obj import OCEL

    return OCEL(
        events, objects, relations, o2o=o2o, object_changes=object_changes
    )
def nx_to_event_log(
    nx_graph: nx.DiGraph, parameters: Optional[Dict[Any, Any]] = None
):
    """
    Transforms a NetworkX DiGraph representing a traditional event log to
    a proper event log.

    Nodes whose "attr" dictionary has type "CASE" become traces and nodes
    of type "EVENT" become events (the "type" entry is dropped from the
    copied attributes). Every edge of type "BELONGS_TO" appends its source
    event to its target case. The resulting log is sorted on
    "time:timestamp".

    Parameters
    ----------------
    nx_graph
        NetworkX DiGraph
    parameters
        Optional parameters of the method

    Returns
    ----------------
    event_log
        Traditional event log.
    """
    if parameters is None:
        parameters = {}

    from pm4py.objects.log.obj import EventLog, Trace, Event
    from pm4py.objects.log.util import sorting

    def strip_type(attrs):
        # Shallow copy so the attributes stored in the graph stay untouched.
        stripped = copy(attrs)
        del stripped["type"]
        return stripped

    log = EventLog()
    node_attrs = [
        (node_id, data["attr"]) for node_id, data in nx_graph.nodes.items()
    ]

    cases = {}
    for node_id, attrs in node_attrs:
        if attrs["type"] == "CASE":
            trace = Trace(attributes=strip_type(attrs))
            cases[node_id] = trace
            log.append(trace)

    events = {
        node_id: Event(strip_type(attrs))
        for node_id, attrs in node_attrs
        if attrs["type"] == "EVENT"
    }

    for edge_id, edge_data in nx_graph.edges.items():
        if edge_data["attr"]["type"] == "BELONGS_TO":
            # Edge direction: event (source) -> case (target).
            cases[edge_id[1]].append(events[edge_id[0]])

    return sorting.sort_timestamp(log, "time:timestamp")