'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from typing import Tuple, Dict, Optional, Union
import os
import tempfile
import importlib.util
from urllib.parse import urlparse
from pm4py.objects.bpmn.obj import BPMN
from pm4py.objects.log.obj import EventLog
from pm4py.objects.ocel.obj import OCEL
from pm4py.objects.petri_net.obj import PetriNet, Marking
from pm4py.objects.process_tree.obj import ProcessTree
from pm4py.util import constants
from pandas import DataFrame
from pm4py.utils import __rustxes_usage_warning, __rustxes_non_usage_warning
INDEX_COLUMN = "@@index"
__doc__ = """
The `pm4py.read` module contains all functionality related to reading files and objects from disk (or via URIs).
"""
def _resolve_path(file_path: str) -> str:
"""
Resolve a file path which can be either:
- A local file path
- An HTTP/HTTPS URL
If the path is a remote URL, the file is downloaded to a temporary file,
and the local temporary file path is returned.
"""
parsed = urlparse(file_path)
if parsed.scheme in ("http", "https"):
import requests
response = requests.get(file_path)
response.raise_for_status()
# Infer the file extension from the URL (if available)
_, extension = os.path.splitext(parsed.path)
if not extension:
extension = ".tmp"
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=extension)
temp_file.write(response.content)
temp_file.flush()
temp_file.close()
return temp_file.name
else:
if not os.path.exists(file_path):
raise Exception(f"File does not exist at path: {file_path}")
return file_path
[docs]
def read_xes(
file_path: str,
variant: Optional[str] = None,
return_legacy_log_object: bool = constants.DEFAULT_READ_XES_LEGACY_OBJECT,
encoding: str = constants.DEFAULT_ENCODING,
**kwargs
) -> Union[DataFrame, EventLog]:
"""
Reads an event log stored in XES format (see `xes-standard <https://xes-standard.org/>`_).
Returns a table (`pandas.DataFrame`) view of the event log or an `EventLog` object.
:param file_path: Path/URI to the event log (`.xes` file).
:param variant: Variant of the importer to use. Options include:
- "iterparse" – traditional XML parser,
- "line_by_line" – text-based line-by-line importer,
- "chunk_regex" – chunk-of-bytes importer (default),
- "iterparse20" – XES 2.0 importer,
- "rustxes" – Rust-based importer.
:param return_legacy_log_object: Boolean indicating whether to return a legacy `EventLog` object (default: `False`).
:param encoding: Encoding to be used (default: `utf-8`).
:param **kwargs: Additional parameters to pass to the importer.
:rtype: `pandas.DataFrame` or `pm4py.objects.log.obj.EventLog`
.. code-block:: python3
import pm4py
log = pm4py.read_xes("<path_or_uri_to_xes_file>")
"""
local_path = _resolve_path(file_path)
if variant is None:
if importlib.util.find_spec("rustxes"):
__rustxes_usage_warning()
variant = "rustxes"
else:
__rustxes_non_usage_warning()
variant = constants.DEFAULT_XES_PARSER
from pm4py.objects.log.importer.xes import importer as xes_importer
v = xes_importer.Variants.CHUNK_REGEX
if variant == "iterparse_20":
v = xes_importer.Variants.ITERPARSE_20
elif variant == "iterparse":
v = xes_importer.Variants.ITERPARSE
elif variant == "lxml":
v = xes_importer.Variants.ITERPARSE
elif variant == "iterparse_mem_compressed":
v = xes_importer.Variants.ITERPARSE_MEM_COMPRESSED
elif variant == "line_by_line":
v = xes_importer.Variants.LINE_BY_LINE
elif variant == "chunk_regex":
v = xes_importer.Variants.CHUNK_REGEX
elif variant == "rustxes":
v = xes_importer.Variants.RUSTXES
from copy import copy
parameters = copy(kwargs)
parameters["encoding"] = encoding
parameters["return_legacy_log_object"] = return_legacy_log_object
log = xes_importer.apply(local_path, variant=v, parameters=parameters)
if isinstance(log, EventLog) and not return_legacy_log_object:
from pm4py.objects.conversion.log import converter as log_converter
log = log_converter.apply(
log, variant=log_converter.Variants.TO_DATA_FRAME
)
return log
[docs]
def read_pnml(
file_path: str,
auto_guess_final_marking: bool = False,
encoding: str = constants.DEFAULT_ENCODING,
) -> Tuple[PetriNet, Marking, Marking]:
"""
Reads a Petri net object from a `.pnml` file.
The returned Petri net object is a tuple containing:
1. PetriNet object (`PetriNet`)
2. Initial Marking (`Marking`)
3. Final Marking (`Marking`)
:param file_path: Path/URI to the Petri net model (`.pnml` file).
:param auto_guess_final_marking: Boolean indicating whether to automatically guess the final marking (default: `False`).
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `Tuple[PetriNet, Marking, Marking]`
.. code-block:: python3
import pm4py
pn = pm4py.read_pnml("<path_or_uri_to_pnml_file>")
"""
local_path = _resolve_path(file_path)
from pm4py.objects.petri_net.importer import importer as pnml_importer
net, im, fm = pnml_importer.apply(
local_path,
parameters={
"auto_guess_final_marking": auto_guess_final_marking,
"encoding": encoding,
},
)
return net, im, fm
[docs]
def read_ptml(
file_path: str, encoding: str = constants.DEFAULT_ENCODING
) -> ProcessTree:
"""
Reads a process tree object from a `.ptml` file.
:param file_path: Path/URI to the process tree file on disk.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `ProcessTree`
.. code-block:: python3
import pm4py
process_tree = pm4py.read_ptml("<path_or_uri_to_ptml_file>")
"""
local_path = _resolve_path(file_path)
from pm4py.objects.process_tree.importer import importer as tree_importer
tree = tree_importer.apply(local_path, parameters={"encoding": encoding})
return tree
[docs]
def read_dfg(
file_path: str, encoding: str = constants.DEFAULT_ENCODING
) -> Tuple[Dict[Tuple[str, str], int], Dict[str, int], Dict[str, int]]:
"""
Reads a Directly-Follows Graph (DFG) from a `.dfg` file.
The returned DFG object is a tuple containing:
1. DFG (`Dict[Tuple[str, str], int]`): Maps pairs of activities to their occurrence count.
For example, `DFG[('a', 'b')] = k` indicates that activity `'a'` is directly followed by activity `'b'` a total of `k` times in the log.
2. Start Activity Dictionary (`Dict[str, int]`): Maps activities to the number of traces they start.
For example, `S['a'] = k` implies that activity `'a'` starts `k` traces in the event log.
3. End Activity Dictionary (`Dict[str, int]`): Maps activities to the number of traces they end.
For example, `E['z'] = k` implies that activity `'z'` ends `k` traces in the event log.
:param file_path: Path/URI to the DFG model file.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `Tuple[Dict[Tuple[str, str], int], Dict[str, int], Dict[str, int]]`
.. code-block:: python3
import pm4py
dfg = pm4py.read_dfg("<path_or_uri_to_dfg_file>")
"""
local_path = _resolve_path(file_path)
from pm4py.objects.dfg.importer import importer as dfg_importer
dfg, start_activities, end_activities = dfg_importer.apply(
local_path, parameters={"encoding": encoding}
)
return dfg, start_activities, end_activities
[docs]
def read_bpmn(
file_path: str, encoding: str = constants.DEFAULT_ENCODING
) -> BPMN:
"""
Reads a BPMN model from a `.bpmn` file.
:param file_path: Path/URI to the BPMN model file.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `BPMN`
.. code-block:: python3
import pm4py
bpmn = pm4py.read_bpmn('<path_or_uri_to_bpmn_file>')
"""
local_path = _resolve_path(file_path)
from pm4py.objects.bpmn.importer import importer as bpmn_importer
bpmn_graph = bpmn_importer.apply(
local_path, parameters={"encoding": encoding}
)
return bpmn_graph
[docs]
def read_ocel(
file_path: str,
objects_path: Optional[str] = None,
encoding: str = constants.DEFAULT_ENCODING,
) -> OCEL:
"""
Reads an object-centric event log from a file (see: http://www.ocel-standard.org/).
Returns an `OCEL` object.
:param file_path: Path/URI to the object-centric event log file.
:param objects_path: [Optional] Path/URI to the objects dataframe file.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel("<path_or_uri_to_ocel_file>")
"""
local_path = _resolve_path(file_path)
local_objects_path = _resolve_path(objects_path) if objects_path else None
if local_path.lower().endswith("csv"):
return read_ocel_csv(local_path, local_objects_path, encoding=encoding)
elif local_path.lower().endswith("jsonocel"):
return read_ocel_json(local_path, encoding=encoding)
elif local_path.lower().endswith("xmlocel"):
return read_ocel_xml(local_path, encoding=encoding)
elif local_path.lower().endswith(".sqlite"):
return read_ocel_sqlite(local_path, encoding=encoding)
raise Exception("Unsupported file format")
[docs]
def read_ocel_csv(
file_path: str,
objects_path: Optional[str] = None,
encoding: str = constants.DEFAULT_ENCODING,
) -> OCEL:
"""
Reads an object-centric event log from a CSV file (see: http://www.ocel-standard.org/).
Returns an `OCEL` object.
:param file_path: Path/URI to the object-centric event log file (`.csv`).
:param objects_path: [Optional] Path/URI to the objects dataframe file.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel_csv("<path_or_uri_to_ocel_file.csv>")
"""
from pm4py.objects.ocel.importer.csv import importer as csv_importer
return csv_importer.apply(
file_path, objects_path=objects_path, parameters={"encoding": encoding}
)
[docs]
def read_ocel_json(
file_path: str, encoding: str = constants.DEFAULT_ENCODING
) -> OCEL:
"""
Reads an object-centric event log from a JSON-OCEL file (see: http://www.ocel-standard.org/).
Returns an `OCEL` object.
:param file_path: Path/URI to the object-centric event log file (`.jsonocel`).
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel_json("<path_or_uri_to_ocel_file.jsonocel>")
"""
from pm4py.objects.ocel.importer.jsonocel import importer as jsonocel_importer
return jsonocel_importer.apply(
file_path,
variant=jsonocel_importer.Variants.CLASSIC,
parameters={"encoding": encoding},
)
[docs]
def read_ocel_xml(
file_path: str, encoding: str = constants.DEFAULT_ENCODING
) -> OCEL:
"""
Reads an object-centric event log from an XML-OCEL file (see: http://www.ocel-standard.org/).
Returns an `OCEL` object.
:param file_path: Path/URI to the object-centric event log file (`.xmlocel`).
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel_xml("<path_or_uri_to_ocel_file.xmlocel>")
"""
from pm4py.objects.ocel.importer.xmlocel import importer as xmlocel_importer
return xmlocel_importer.apply(
file_path,
variant=xmlocel_importer.Variants.CLASSIC,
parameters={"encoding": encoding},
)
[docs]
def read_ocel_sqlite(
file_path: str, encoding: str = constants.DEFAULT_ENCODING
) -> OCEL:
"""
Reads an object-centric event log from a SQLite database (see: http://www.ocel-standard.org/).
Returns an `OCEL` object.
:param file_path: Path/URI to the SQLite database file (`.sqlite`).
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel_sqlite("<path_or_uri_to_ocel_file.sqlite>")
"""
from pm4py.objects.ocel.importer.sqlite import importer as sqlite_importer
return sqlite_importer.apply(
file_path,
variant=sqlite_importer.Variants.PANDAS_IMPORTER,
parameters={"encoding": encoding},
)
[docs]
def read_ocel2(
file_path: str,
variant_str: Optional[str] = None,
encoding: str = constants.DEFAULT_ENCODING,
) -> OCEL:
"""
Reads an OCEL 2.0 event log.
:param file_path: Path/URI to the OCEL 2.0 event log file.
:param variant_str: [Optional] Specification of the importer variant to be used.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
Supported file formats based on extension:
- `.sqlite` – SQLite database,
- `.xml` or `.xmlocel` – XML file,
- `.json` or `.jsonocel` – JSON file.
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel2("<path_or_uri_to_ocel_file>")
"""
local_path = _resolve_path(file_path)
if local_path.lower().endswith("sqlite"):
return read_ocel2_sqlite(
local_path, variant_str=variant_str, encoding=encoding
)
elif local_path.lower().endswith("xml") or local_path.lower().endswith(
"xmlocel"
):
return read_ocel2_xml(
local_path, encoding=encoding
)
elif local_path.lower().endswith("json") or local_path.lower().endswith(
"jsonocel"
):
return read_ocel2_json(
local_path, encoding=encoding
)
raise Exception("Unsupported file format for OCEL 2.0")
[docs]
def read_ocel2_json(
file_path: str,
encoding: str = constants.DEFAULT_ENCODING,
) -> OCEL:
"""
Reads an OCEL 2.0 event log from a JSON-OCEL2 file.
:param file_path: Path/URI to the JSON file (`.jsonocel`).
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel2_json("<path_or_uri_to_ocel_file.jsonocel>")
"""
from pm4py.objects.ocel.importer.jsonocel import importer as jsonocel_importer
if importlib.util.find_spec("rustxes"):
__rustxes_usage_warning()
variant = jsonocel_importer.Variants.OCEL20_RUSTXES
else:
__rustxes_non_usage_warning()
variant = jsonocel_importer.Variants.OCEL20_STANDARD
return jsonocel_importer.apply(
file_path, variant=variant, parameters={"encoding": encoding}
)
[docs]
def read_ocel2_sqlite(
file_path: str,
variant_str: Optional[str] = None,
encoding: str = constants.DEFAULT_ENCODING,
) -> OCEL:
"""
Reads an OCEL 2.0 event log from a SQLite database.
:param file_path: Path/URI to the OCEL 2.0 SQLite database file (`.sqlite`).
:param variant_str: [Optional] Specification of the importer variant to be used.
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel2_sqlite("<path_or_uri_to_ocel_file.sqlite>")
"""
from pm4py.objects.ocel.importer.sqlite import importer as sqlite_importer
return sqlite_importer.apply(
file_path,
variant=sqlite_importer.Variants.OCEL20,
parameters={"encoding": encoding},
)
[docs]
def read_ocel2_xml(
file_path: str,
encoding: str = constants.DEFAULT_ENCODING,
) -> OCEL:
"""
Reads an OCEL 2.0 event log from an XML file.
:param file_path: Path/URI to the OCEL 2.0 XML file (`.xmlocel`).
:param encoding: Encoding to be used (default: `utf-8`).
:rtype: `OCEL`
.. code-block:: python3
import pm4py
ocel = pm4py.read_ocel2_xml("<path_or_uri_to_ocel_file.xmlocel>")
"""
from pm4py.objects.ocel.importer.xmlocel import importer as xml_importer
if importlib.util.find_spec("rustxes"):
__rustxes_usage_warning()
variant = xml_importer.Variants.OCEL20_RUSTXES
else:
__rustxes_non_usage_warning()
variant = xml_importer.Variants.OCEL20
return xml_importer.apply(
file_path, variant=variant, parameters={"encoding": encoding}
)