'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from enum import Enum
from pm4py.objects.log.obj import EventLog, EventStream
import pandas as pd
from typing import Optional, Dict, Any, Union
from pm4py.util import exec_utils, nx_utils
from pm4py.objects.conversion.log.variants import to_event_log
[docs]
class Parameters(Enum):
INCLUDE_DF = "include_df"
CASE_ID_ATTRIBUTE = "case_id_attribute"
OTHER_CASE_ATTRIBUTES_AS_NODES = "other_case_attributes_as_nodes"
EVENT_ATTRIBUTES_AS_NODES = "event_attributes_as_nodes"
[docs]
def apply(
log_obj: Union[EventLog, EventStream, pd.DataFrame],
parameters: Optional[Dict[Any, Any]] = None,
):
"""
Converts an event log object to a NetworkX DiGraph object.
The nodes of the graph are the events, the cases (and possibly the attributes of the log).
The edges are:
- Connecting each event to the corresponding case (BELONGS_TO type)
- Connecting every event to the directly-following one (DF type, if enabled)
- Connecting every case/event to the given attribute values (ATTRIBUTE_EDGE type)
Parameters
----------------
log_obj
Log object (EventLog, EventStream, Pandas dataframe)
parameters
Parameters of the conversion, including:
- Parameters.INCLUDE_DF => include the directly-follows graph relation in the graph
- Parameters.CASE_ID_ATTRIBUTE => specify which attribute at the case level should be considered the case ID
- Parameters.OTHER_CASE_ATTRIBUTES_AS_NODES => specify which attributes at the case level should be inserted in the graph as nodes (other than the caseID) (list, default empty)
- Parameters.EVENT_ATTRIBUTES_AS_NODES => specify which attributes at the event level should be inserted in the graph as nodes (list, default empty)
Returns
----------------
nx_digraph
NetworkX DiGraph object
"""
if parameters is None:
parameters = {}
include_df = exec_utils.get_param_value(
Parameters.INCLUDE_DF, parameters, True
)
case_id_attribute = exec_utils.get_param_value(
Parameters.CASE_ID_ATTRIBUTE, parameters, "concept:name"
)
other_case_attributes_as_nodes = exec_utils.get_param_value(
Parameters.OTHER_CASE_ATTRIBUTES_AS_NODES, parameters, None
)
event_attributes_as_nodes = exec_utils.get_param_value(
Parameters.EVENT_ATTRIBUTES_AS_NODES, parameters, None
)
parameters["stream_postprocessing"] = True
log_obj = to_event_log.apply(log_obj, parameters=parameters)
if event_attributes_as_nodes is None:
event_attributes_as_nodes = []
if other_case_attributes_as_nodes is None:
other_case_attributes_as_nodes = []
nx_digraph = nx_utils.DiGraph()
for case in log_obj:
case_id = "CASE=" + str(case.attributes[case_id_attribute])
dct_case = {"type": "CASE"}
for att in case.attributes:
dct_case[att] = case.attributes[att]
nx_digraph.add_node(case_id, attr=dct_case)
for index, event in enumerate(case):
dct_ev = {"type": "EVENT"}
for att in event:
dct_ev[att] = event[att]
ev_id = (
"EVENT="
+ str(case.attributes[case_id_attribute])
+ "_"
+ str(index)
)
nx_digraph.add_node(ev_id, attr=dct_ev)
nx_digraph.add_edge(ev_id, case_id, attr={"type": "BELONGS_TO"})
for ev_att in event_attributes_as_nodes:
if ev_att in event:
node_id = event[ev_att]
if node_id not in nx_digraph.nodes:
nx_digraph.add_node(
node_id, attr={"type": "ATTRIBUTE_NODE"}
)
nx_digraph.add_edge(
ev_id,
node_id,
attr={"type": "ATTRIBUTE_EDGE", "name": ev_att},
)
if include_df:
for index in range(len(case) - 1):
curr_ev = (
"EVENT="
+ str(case.attributes[case_id_attribute])
+ "_"
+ str(index)
)
next_ev = (
"EVENT="
+ str(case.attributes[case_id_attribute])
+ "_"
+ str(index + 1)
)
nx_digraph.add_edge(curr_ev, next_ev, attr={"type": "DF"})
for case_att in other_case_attributes_as_nodes:
if case_att in case.attributes:
node_id = case.attributes[case_att]
if node_id not in nx_digraph.nodes:
nx_digraph.add_node(
node_id, attr={"type": "ATTRIBUTE_NODE"}
)
nx_digraph.add_edge(
case_id,
node_id,
attr={"type": "ATTRIBUTE_EDGE", "name": case_att},
)
return nx_digraph