'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
__doc__ = """
The ``pm4py.convert`` module contains the cross-conversions implemented in ``pm4py``
"""
from typing import Union, Tuple, Optional, Collection, List, Any, Dict
import pandas as pd
from copy import deepcopy
import pm4py
from pm4py.objects.bpmn.obj import BPMN
from pm4py.objects.ocel.obj import OCEL
from pm4py.objects.powl.obj import POWL
from pm4py.objects.heuristics_net.obj import HeuristicsNet
from pm4py.objects.log.obj import EventLog, EventStream
from pm4py.objects.petri_net.obj import Marking
from pm4py.objects.process_tree.obj import ProcessTree
from pm4py.objects.petri_net.obj import PetriNet
from pm4py.util import constants, nx_utils
from pm4py.utils import get_properties, __event_log_deprecation_warning
from pm4py.objects.transition_system.obj import TransitionSystem
from pm4py.util.pandas_utils import (
check_is_pandas_dataframe,
check_pandas_dataframe_columns,
)
import networkx as nx
[docs]
def convert_to_event_log(
obj: Union[pd.DataFrame, EventStream],
case_id_key: str = "case:concept:name",
**kwargs,
) -> EventLog:
"""
Converts a DataFrame or EventStream object to an event log object.
:param obj: The DataFrame or EventStream object to convert.
:param case_id_key: The attribute to be used as the case identifier. Defaults to "case:concept:name".
:param kwargs: Additional keyword arguments to pass to the converter.
:return: An ``EventLog`` object.
.. code-block:: python3
import pandas as pd
import pm4py
dataframe = pm4py.read_csv("tests/input_data/running-example.csv")
dataframe = pm4py.format_dataframe(dataframe, case_id_column='case:concept:name', activity_column='concept:name', timestamp_column='time:timestamp')
log = pm4py.convert_to_event_log(dataframe)
"""
if check_is_pandas_dataframe(obj):
check_pandas_dataframe_columns(obj, case_id_key=case_id_key)
parameters = get_properties(obj, case_id_key=case_id_key)
for k, v in kwargs.items():
parameters[k] = v
from pm4py.objects.conversion.log import converter
log = converter.apply(
obj, variant=converter.Variants.TO_EVENT_LOG, parameters=parameters
)
__event_log_deprecation_warning(log)
return log
[docs]
def convert_to_event_stream(
obj: Union[EventLog, pd.DataFrame],
case_id_key: str = "case:concept:name",
**kwargs,
) -> EventStream:
"""
Converts a log object or DataFrame to an event stream.
:param obj: The log object (``EventLog``) or DataFrame to convert.
:param case_id_key: The attribute to be used as the case identifier. Defaults to "case:concept:name".
:param kwargs: Additional keyword arguments to pass to the converter.
:return: An ``EventStream`` object.
.. code-block:: python3
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
event_stream = pm4py.convert_to_event_stream(log)
"""
if check_is_pandas_dataframe(obj):
check_pandas_dataframe_columns(obj, case_id_key=case_id_key)
parameters = get_properties(obj, case_id_key=case_id_key)
for k, v in kwargs.items():
parameters[k] = v
from pm4py.objects.conversion.log import converter
stream = converter.apply(
obj, variant=converter.Variants.TO_EVENT_STREAM, parameters=parameters
)
__event_log_deprecation_warning(stream)
return stream
[docs]
def convert_to_dataframe(
obj: Union[EventStream, EventLog], **kwargs
) -> pd.DataFrame:
"""
Converts a log object (``EventStream`` or ``EventLog``) to a Pandas DataFrame.
:param obj: The log object to convert.
:param kwargs: Additional keyword arguments to pass to the converter.
:return: A ``pd.DataFrame`` object.
.. code-block:: python3
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
dataframe = pm4py.convert_to_dataframe(log)
"""
if check_is_pandas_dataframe(obj):
check_pandas_dataframe_columns(obj)
parameters = get_properties(obj)
for k, v in kwargs.items():
parameters[k] = v
from pm4py.objects.conversion.log import converter
df = converter.apply(
obj, variant=converter.Variants.TO_DATA_FRAME, parameters=parameters
)
return df
[docs]
def convert_to_bpmn(
*args: Union[Tuple[PetriNet, Marking, Marking], ProcessTree]
) -> BPMN:
"""
Converts an object to a BPMN diagram.
As input, either a Petri net (with corresponding initial and final markings) or a process tree can be provided.
A process tree can always be converted into a BPMN model, ensuring the quality of the resulting object.
For Petri nets, the quality of the conversion largely depends on the net provided (e.g., sound WF-nets are likely to produce reasonable BPMN models).
:param args:
- If converting a Petri net: a tuple of (``PetriNet``, ``Marking``, ``Marking``).
- If converting a process tree: a single ``ProcessTree`` object.
:return: A ``BPMN`` object.
.. code-block:: python3
import pm4py
# Import a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
bpmn_graph = pm4py.convert_to_bpmn(net, im, fm)
"""
from pm4py.objects.process_tree.obj import ProcessTree
from pm4py.objects.bpmn.obj import BPMN
if isinstance(args[0], BPMN):
# the object is already a BPMN
return args[0]
elif isinstance(args[0], ProcessTree):
from pm4py.objects.conversion.process_tree.variants import to_bpmn
return to_bpmn.apply(args[0])
else:
# try to convert the object to a Petri net. Then, use the PM4Py PN-to-BPMN converter
# to get the BPMN object
try:
net, im, fm = convert_to_petri_net(*args)
from pm4py.objects.conversion.wf_net.variants import to_bpmn
return to_bpmn.apply(net, im, fm)
except BaseException:
# don't do nothing and throw the following exception
pass
# if no conversion is done, then the format of the arguments is unsupported
raise Exception("Unsupported conversion of the provided object to BPMN")
[docs]
def convert_to_petri_net(
*args: Union[BPMN, ProcessTree, HeuristicsNet, POWL, dict]
) -> Tuple[PetriNet, Marking, Marking]:
"""
Converts an input model to an (accepting) Petri net.
The input objects can be a process tree, BPMN model, Heuristic net, POWL model, or a dictionary representing a Directly-Follows Graph (DFG).
The output is a tuple containing the Petri net and the initial and final markings.
The markings are only returned if they can be reasonably derived from the input model.
:param args:
- If converting from a BPMN, ProcessTree, HeuristicsNet, or POWL: a single object of the respective type.
- If converting from a DFG: a dictionary representing the DFG, followed by lists of start and end activities.
:return: A tuple of (``PetriNet``, ``Marking``, ``Marking``).
.. code-block:: python3
import pm4py
# Imports a process tree from a PTML file
process_tree = pm4py.read_ptml("tests/input_data/running-example.ptml")
net, im, fm = pm4py.convert_to_petri_net(process_tree)
"""
if isinstance(args[0], PetriNet):
# the object is already a Petri net
return args[0], args[1], args[2]
elif isinstance(args[0], ProcessTree):
if isinstance(args[0], POWL):
from pm4py.objects.conversion.powl import converter
return converter.apply(args[0])
from pm4py.objects.conversion.process_tree.variants import to_petri_net
return to_petri_net.apply(args[0])
elif isinstance(args[0], BPMN):
from pm4py.objects.conversion.bpmn.variants import to_petri_net
return to_petri_net.apply(args[0])
elif isinstance(args[0], HeuristicsNet):
from pm4py.objects.conversion.heuristics_net.variants import (
to_petri_net,
)
return to_petri_net.apply(args[0])
elif isinstance(args[0], dict):
# DFG
from pm4py.objects.conversion.dfg.variants import (
to_petri_net_activity_defines_place,
)
return to_petri_net_activity_defines_place.apply(
args[0],
parameters={
to_petri_net_activity_defines_place.Parameters.START_ACTIVITIES: args[1],
to_petri_net_activity_defines_place.Parameters.END_ACTIVITIES: args[2],
},
)
# if no conversion is done, then the format of the arguments is unsupported
raise Exception(
"Unsupported conversion of the provided object to Petri net"
)
[docs]
def convert_to_process_tree(
*args: Union[Tuple[PetriNet, Marking, Marking], BPMN, ProcessTree, POWL]
) -> ProcessTree:
"""
Converts an input model to a process tree.
The input models can be Petri nets (with markings) or BPMN models.
For both input types, the conversion is not guaranteed to work and may raise an exception.
:param args:
- If converting from a Petri net: a tuple of (``PetriNet``, ``Marking``, ``Marking``).
- If converting from a BPMN or ProcessTree: a single object of the respective type.
:return: A ``ProcessTree`` object.
.. code-block:: python3
import pm4py
# Imports a BPMN file
bpmn_graph = pm4py.read_bpmn("tests/input_data/running-example.bpmn")
# Converts the BPMN to a process tree (through intermediate conversion to a Petri net)
process_tree = pm4py.convert_to_process_tree(bpmn_graph)
"""
from pm4py.objects.process_tree.obj import ProcessTree
from pm4py.objects.petri_net.obj import PetriNet
if isinstance(args[0], POWL):
from pm4py.objects.conversion.powl.variants import to_process_tree
return to_process_tree.apply(args[0])
elif isinstance(args[0], ProcessTree):
# the object is already a process tree
return args[0]
if isinstance(args[0], PetriNet):
net, im, fm = args[0], args[1], args[2]
else:
net, im, fm = convert_to_petri_net(*args)
from pm4py.objects.conversion.wf_net.variants import to_process_tree
tree = to_process_tree.apply(net, im, fm)
if tree is not None:
return tree
raise Exception(
"The object represents a model that cannot be represented as a process tree!"
)
[docs]
def convert_to_powl(*args: Union[Tuple[PetriNet, Marking, Marking], BPMN, ProcessTree]) -> POWL:
"""
Converts an input model to a POWL model.
The input models can be Petri nets (with markings) or BPMN models or process trees.
For both input types, the conversion is not guaranteed to work and may raise an exception.
:param args:
- If converting from a Petri net: a tuple of (``PetriNet``, ``Marking``, ``Marking``).
- If converting from a BPMN or ProcessTree: a single object of the respective type.
:return: A ``ProcessTree`` object.
.. code-block:: python3
import pm4py
# Imports a BPMN file
bpmn_graph = pm4py.read_bpmn("tests/input_data/running-example.bpmn")
# Converts the BPMN to a POWL (through intermediate conversion to a Petri net)
powl = pm4py.convert_to_powl(bpmn_graph)
print(powl)
"""
from pm4py.objects.process_tree.obj import ProcessTree
from pm4py.objects.petri_net.obj import PetriNet
if isinstance(args[0], ProcessTree):
from pm4py.objects.conversion.process_tree.variants import to_powl
return to_powl.apply(args[0])
elif isinstance(args[0], PetriNet):
from pm4py.objects.conversion.wf_net.variants import to_powl
return to_powl.apply(args[0])
elif isinstance(args[0], BPMN):
from pm4py.objects.conversion.wf_net.variants import to_powl
net, im, fm = pm4py.convert_to_petri_net(args[0])
return to_powl.apply(net)
raise Exception(
"The object represents a model that cannot be directly represented as a POWL!"
)
[docs]
def convert_to_reachability_graph(
*args: Union[Tuple[PetriNet, Marking, Marking], BPMN, ProcessTree]
) -> TransitionSystem:
"""
Converts an input model to a reachability graph (transition system).
The input models can be Petri nets (with markings), BPMN models, or process trees.
The output is the state-space of the model, encoded as a ``TransitionSystem`` object.
:param args:
- If converting from a Petri net: a tuple of (``PetriNet``, ``Marking``, ``Marking``).
- If converting from a BPMN or ProcessTree: a single object of the respective type.
:return: A ``TransitionSystem`` object.
.. code-block:: python3
import pm4py
# Reads a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
# Converts it to a reachability graph
reach_graph = pm4py.convert_to_reachability_graph(net, im, fm)
"""
if isinstance(args[0], PetriNet):
net, im, fm = args[0], args[1], args[2]
else:
net, im, fm = convert_to_petri_net(*args)
from pm4py.objects.petri_net.utils import reachability_graph
return reachability_graph.construct_reachability_graph(net, im)
[docs]
def convert_log_to_ocel(
log: Union[EventLog, EventStream, pd.DataFrame],
activity_column: str = "concept:name",
timestamp_column: str = "time:timestamp",
object_types: Optional[Collection[str]] = None,
obj_separator: str = " AND ",
additional_event_attributes: Optional[Collection[str]] = None,
additional_object_attributes: Optional[Dict[str, Collection[str]]] = None,
) -> OCEL:
"""
Converts an event log to an object-centric event log (OCEL) with one or more object types.
:param log: The log object to convert.
:param activity_column: The name of the column representing activities.
:param timestamp_column: The name of the column representing timestamps.
:param object_types: A collection of column names to consider as object types. If None, defaults are used.
:param obj_separator: The separator used between different objects in the same column. Defaults to " AND ".
:param additional_event_attributes: Additional attribute names to include as event attributes in the OCEL.
:param additional_object_attributes: Additional attributes per object type to include as object attributes in the OCEL. Should be a dictionary mapping object types to lists of attribute names.
:return: An ``OCEL`` object.
.. code-block:: python3
import pm4py
ocel = pm4py.convert_log_to_ocel(
log,
activity_column='concept:name',
timestamp_column='time:timestamp',
object_types=['case:concept:name']
)
"""
__event_log_deprecation_warning(log)
if isinstance(log, EventStream):
log = convert_to_dataframe(log)
if object_types is None:
object_types = list(
set(
x
for x in log.columns
if x == "case:concept:name" or x.startswith("ocel:type")
)
)
from pm4py.objects.ocel.util import log_ocel
return log_ocel.log_to_ocel_multiple_obj_types(
log,
activity_column,
timestamp_column,
object_types,
obj_separator,
additional_event_attributes=additional_event_attributes,
additional_object_attributes=additional_object_attributes,
)
[docs]
def convert_ocel_to_networkx(
ocel: OCEL, variant: str = "ocel_to_nx"
) -> nx.DiGraph:
"""
Converts an OCEL to a NetworkX DiGraph object.
:param ocel: The object-centric event log to convert.
:param variant: The variant of the conversion to use.
Options:
- "ocel_to_nx": Graph containing event and object IDs and two types of relations (REL=related objects, DF=directly-follows).
- "ocel_features_to_nx": Graph containing different types of interconnections at the object level.
:return: A ``nx.DiGraph`` object representing the OCEL.
.. code-block:: python3
import pm4py
nx_digraph = pm4py.convert_ocel_to_networkx(ocel, variant='ocel_to_nx')
"""
from pm4py.objects.conversion.ocel import converter
variant1 = None
if variant == "ocel_to_nx":
variant1 = converter.Variants.OCEL_TO_NX
elif variant == "ocel_features_to_nx":
variant1 = converter.Variants.OCEL_FEATURES_TO_NX
else:
raise ValueError(
f"Unsupported variant '{variant}'. Supported variants are 'ocel_to_nx' and 'ocel_features_to_nx'.")
return converter.apply(ocel, variant=variant1)
[docs]
def convert_log_to_networkx(
log: Union[EventLog, EventStream, pd.DataFrame],
include_df: bool = True,
case_id_key: str = "concept:name",
other_case_attributes_as_nodes: Optional[Collection[str]] = None,
event_attributes_as_nodes: Optional[Collection[str]] = None,
) -> nx.DiGraph:
"""
Converts an event log to a NetworkX DiGraph object.
The nodes of the graph include events, cases, and optionally log attributes.
The edges represent:
- BELONGS_TO: Connecting each event to its corresponding case.
- DF: Connecting events that directly follow each other (if enabled).
- ATTRIBUTE_EDGE: Connecting cases/events to their attribute values.
:param log: The log object to convert (``EventLog``, ``EventStream``, or Pandas DataFrame).
:param include_df: Whether to include the directly-follows relation in the graph. Defaults to True.
:param case_id_key: The attribute to be used as the case identifier. Defaults to "concept:name".
:param other_case_attributes_as_nodes: Attributes at the case level to include as nodes, excluding the case ID.
:param event_attributes_as_nodes: Attributes at the event level to include as nodes.
:return: A ``nx.DiGraph`` object representing the event log.
.. code-block:: python3
import pm4py
nx_digraph = pm4py.convert_log_to_networkx(
log,
other_case_attributes_as_nodes=['responsible', 'department'],
event_attributes_as_nodes=['concept:name', 'org:resource']
)
"""
from pm4py.objects.conversion.log import converter
return converter.apply(
log,
variant=converter.Variants.TO_NX,
parameters={
"include_df": include_df,
"case_id_attribute": case_id_key,
"other_case_attributes_as_nodes": other_case_attributes_as_nodes,
"event_attributes_as_nodes": event_attributes_as_nodes,
},
)
[docs]
def convert_log_to_time_intervals(
log: Union[EventLog, pd.DataFrame],
filter_activity_couple: Optional[Tuple[str, str]] = None,
activity_key: str = "concept:name",
timestamp_key: str = "time:timestamp",
case_id_key: str = "case:concept:name",
start_timestamp_key: str = "time:timestamp",
) -> List[List[Any]]:
"""
Extracts a list of time intervals from an event log.
Each interval contains two temporally consecutive events within the same case and measures the time between them
(complete timestamp of the first event against the start timestamp of the second event).
:param log: The log object to convert.
:param filter_activity_couple: Optional tuple to filter intervals by a specific pair of activities.
:param activity_key: The attribute to be used as the activity identifier. Defaults to "concept:name".
:param timestamp_key: The attribute to be used as the timestamp. Defaults to "time:timestamp".
:param case_id_key: The attribute to be used as the case identifier. Defaults to "case:concept:name".
:param start_timestamp_key: The attribute to be used as the start timestamp in the interval. Defaults to "time:timestamp".
:return: A list of intervals, where each interval is a list containing relevant information about the time gap.
.. code-block:: python3
import pm4py
log = pm4py.read_xes('tests/input_data/receipt.xes')
time_intervals = pm4py.convert_log_to_time_intervals(log)
print(len(time_intervals))
time_intervals = pm4py.convert_log_to_time_intervals(
log,
filter_activity_couple=('Confirmation of receipt', 'T02 Check confirmation of receipt')
)
print(len(time_intervals))
"""
__event_log_deprecation_warning(log)
properties = get_properties(
log,
activity_key=activity_key,
case_id_key=case_id_key,
timestamp_key=timestamp_key,
)
properties["filter_activity_couple"] = filter_activity_couple
properties[constants.PARAMETER_CONSTANT_START_TIMESTAMP_KEY] = (
start_timestamp_key
)
from pm4py.algo.transformation.log_to_interval_tree.variants import (
open_paths,
)
return open_paths.log_to_intervals(log, parameters=properties)
[docs]
def convert_petri_net_to_networkx(
net: PetriNet, im: Marking, fm: Marking
) -> nx.DiGraph:
"""
Converts a Petri net to a NetworkX DiGraph.
Each place and transition in the Petri net is represented as a node in the graph.
:param net: The Petri net to convert.
:param im: The initial marking of the Petri net.
:param fm: The final marking of the Petri net.
:return: A ``nx.DiGraph`` object representing the Petri net.
.. code-block:: python3
import pm4py
net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
nx_digraph = pm4py.convert_petri_net_to_networkx(net, im, fm)
"""
G = nx_utils.DiGraph()
for place in net.places:
G.add_node(
place.name,
attr={
"name": place.name,
"is_in_im": place in im,
"is_in_fm": place in fm,
"type": "place",
},
)
for trans in net.transitions:
G.add_node(
trans.name,
attr={
"name": trans.name,
"label": trans.label,
"type": "transition",
},
)
for arc in net.arcs:
G.add_edge(
arc.source.name,
arc.target.name,
attr={"weight": arc.weight, "properties": arc.properties},
)
return G
[docs]
def convert_petri_net_type(
net: PetriNet, im: Marking, fm: Marking, type: str = "classic"
) -> Tuple[PetriNet, Marking, Marking]:
"""
Changes the internal type of a Petri net.
Supports conversion to different Petri net types such as classic, reset, inhibitor, and reset_inhibitor nets.
:param net: The Petri net to convert.
:param im: The initial marking of the Petri net.
:param fm: The final marking of the Petri net.
:param type: The target Petri net type. Options are "classic", "reset", "inhibitor", "reset_inhibitor". Defaults to "classic".
:return: A tuple of the converted (``PetriNet``, ``Marking``, ``Marking``).
.. code-block:: python3
import pm4py
net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
reset_net, new_im, new_fm = pm4py.convert_petri_net_type(net, im, fm, type='reset_inhibitor')
"""
from pm4py.objects.petri_net.utils import petri_utils
[net, im, fm] = deepcopy([net, im, fm])
new_net = None
if type == "classic":
from pm4py.objects.petri_net.obj import PetriNet
new_net = PetriNet(net.name)
elif type == "reset":
from pm4py.objects.petri_net.obj import ResetNet
new_net = ResetNet(net.name)
elif type == "inhibitor":
from pm4py.objects.petri_net.obj import InhibitorNet
new_net = InhibitorNet(net.name)
elif type == "reset_inhibitor":
from pm4py.objects.petri_net.obj import ResetInhibitorNet
new_net = ResetInhibitorNet(net.name)
else:
raise ValueError(
f"Unsupported Petri net type '{type}'. Supported types are 'classic', 'reset', 'inhibitor', 'reset_inhibitor'.")
for place in net.places:
new_net.places.add(place)
in_arcs = set(place.in_arcs)
out_arcs = set(place.out_arcs)
for arc in in_arcs:
place.in_arcs.remove(arc)
for arc in out_arcs:
place.out_arcs.remove(arc)
for trans in net.transitions:
new_net.transitions.add(trans)
in_arcs = set(trans.in_arcs)
out_arcs = set(trans.out_arcs)
for arc in in_arcs:
trans.in_arcs.remove(arc)
for arc in out_arcs:
trans.out_arcs.remove(arc)
for arc in net.arcs:
arc_type = (
arc.properties["arctype"] if "arctype" in arc.properties else None
)
new_arc = petri_utils.add_arc_from_to(
arc.source, arc.target, new_net, weight=arc.weight, type=arc_type
)
return new_net, im, fm