'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import gzip
import logging
import importlib.util
import sys
from enum import Enum
from io import BytesIO
from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.objects.log.util import sorting
from pm4py.util import exec_utils, constants
from pm4py.util import xes_constants
from pm4py.util.dt_parsing import parser as dt_parser
class Parameters(Enum):
TIMESTAMP_SORT = "timestamp_sort"
TIMESTAMP_KEY = constants.PARAMETER_CONSTANT_TIMESTAMP_KEY
REVERSE_SORT = "reverse_sort"
MAX_TRACES = "max_traces"
SHOW_PROGRESS_BAR = "show_progress_bar"
DECOMPRESS_SERIALIZATION = "decompress_serialization"
ENCODING = "encoding"
# ITERPARSE EVENTS
_EVENT_END = "end"
_EVENT_START = "start"
def count_traces(context):
"""
    Efficiently counts the number of traces in an XES event log
Parameters
-------------
context
XML iterparse context
Returns
-------------
num_traces
Number of traces of the XES log
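    Example
    -------------
    Illustrative sketch (``example.xes`` is a hypothetical, locally available XES file):

    >>> from lxml import etree
    >>> with open("example.xes", "rb") as source:
    ...     ctx = etree.iterparse(source, events=["start", "end"])
    ...     n = count_traces(ctx)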
"""
num_traces = 0
for tree_event, elem in context:
if tree_event == _EVENT_START: # starting to read
if elem.tag.endswith(xes_constants.TAG_TRACE):
num_traces = num_traces + 1
elem.clear()
del context
return num_traces
def import_from_context(context, num_traces, parameters=None):
"""
    Imports an XES log from an iterparse context
Parameters
--------------
context
Iterparse context
num_traces
Number of traces of the XES log
parameters
Parameters of the algorithm
Returns
--------------
log
Event log
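    Example
    --------------
    Illustrative sketch (``example.xes`` is a hypothetical path; the trace count is only
    used to size the optional progress bar, so 0 can be passed when it is unknown):

    >>> from lxml import etree
    >>> f = open("example.xes", "rb")
    >>> ctx = etree.iterparse(f, events=["start", "end"])
    >>> log = import_from_context(ctx, 0, parameters={Parameters.SHOW_PROGRESS_BAR: False})
    >>> f.close()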
"""
if parameters is None:
parameters = {}
max_no_traces_to_import = exec_utils.get_param_value(
Parameters.MAX_TRACES, parameters, sys.maxsize
)
timestamp_sort = exec_utils.get_param_value(
Parameters.TIMESTAMP_SORT, parameters, False
)
timestamp_key = exec_utils.get_param_value(
Parameters.TIMESTAMP_KEY,
parameters,
xes_constants.DEFAULT_TIMESTAMP_KEY,
)
reverse_sort = exec_utils.get_param_value(
Parameters.REVERSE_SORT, parameters, False
)
show_progress_bar = exec_utils.get_param_value(
Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
)
date_parser = dt_parser.get()
progress = None
if importlib.util.find_spec("tqdm") and show_progress_bar:
from tqdm.auto import tqdm
progress = tqdm(
total=num_traces, desc="parsing log, completed traces :: "
)
log = None
trace = None
event = None
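    # 'tree' maps currently open XML elements to the container (attribute dictionary,
    # list or set) into which their child attributes are written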
tree = {}
for tree_event, elem in context:
if tree_event == _EVENT_START: # starting to read
parent = (
tree[elem.getparent()] if elem.getparent() in tree else None
)
if elem.tag.endswith(xes_constants.TAG_STRING):
if parent is not None:
tree = __parse_attribute(
elem,
parent,
elem.get(xes_constants.KEY_KEY),
elem.get(xes_constants.KEY_VALUE),
tree,
)
continue
elif elem.tag.endswith(xes_constants.TAG_DATE):
try:
dt = date_parser.apply(elem.get(xes_constants.KEY_VALUE))
tree = __parse_attribute(
elem, parent, elem.get(xes_constants.KEY_KEY), dt, tree
)
except TypeError:
logging.info(
"failed to parse date: "
+ str(elem.get(xes_constants.KEY_VALUE))
)
except ValueError:
logging.info(
"failed to parse date: "
+ str(elem.get(xes_constants.KEY_VALUE))
)
continue
elif elem.tag.endswith(xes_constants.TAG_EVENT):
if event is not None:
raise SyntaxError(
"file contains <event> in another <event> tag"
)
event = Event()
tree[elem] = event
continue
elif elem.tag.endswith(xes_constants.TAG_TRACE):
if len(log) >= max_no_traces_to_import:
break
if trace is not None:
raise SyntaxError(
"file contains <trace> in another <trace> tag"
)
trace = Trace()
tree[elem] = trace.attributes
continue
elif elem.tag.endswith(xes_constants.TAG_FLOAT):
if parent is not None:
try:
val = float(elem.get(xes_constants.KEY_VALUE))
tree = __parse_attribute(
elem,
parent,
elem.get(xes_constants.KEY_KEY),
val,
tree,
)
except ValueError:
logging.info(
"failed to parse float: "
+ str(elem.get(xes_constants.KEY_VALUE))
)
continue
elif elem.tag.endswith(xes_constants.TAG_INT):
if parent is not None:
try:
val = int(elem.get(xes_constants.KEY_VALUE))
tree = __parse_attribute(
elem,
parent,
elem.get(xes_constants.KEY_KEY),
val,
tree,
)
except ValueError:
logging.info(
"failed to parse int: "
+ str(elem.get(xes_constants.KEY_VALUE))
)
continue
elif elem.tag.endswith(xes_constants.TAG_BOOLEAN):
if parent is not None:
try:
val0 = elem.get(xes_constants.KEY_VALUE)
val = False
if str(val0).lower() == "true":
val = True
tree = __parse_attribute(
elem,
parent,
elem.get(xes_constants.KEY_KEY),
val,
tree,
)
except ValueError:
logging.info(
"failed to parse boolean: "
+ str(elem.get(xes_constants.KEY_VALUE))
)
continue
elif elem.tag.endswith(
xes_constants.TAG_LIST
) or elem.tag.endswith(xes_constants.TAG_CONTAINER):
if parent is not None:
# lists have no value, hence we put None as a value
tree = __parse_attribute(
elem,
parent,
elem.get(xes_constants.KEY_KEY),
None,
tree,
)
continue
elif elem.tag.endswith(xes_constants.TAG_ID):
if parent is not None:
tree = __parse_attribute(
elem,
parent,
elem.get(xes_constants.KEY_KEY),
elem.get(xes_constants.KEY_VALUE),
tree,
)
continue
elif elem.tag.endswith(xes_constants.TAG_EXTENSION):
if log is None:
raise SyntaxError("extension found outside of <log> tag")
if (
elem.get(xes_constants.KEY_NAME) is not None
and elem.get(xes_constants.KEY_PREFIX) is not None
and elem.get(xes_constants.KEY_URI) is not None
):
log.extensions[elem.get(xes_constants.KEY_NAME)] = {
xes_constants.KEY_PREFIX: elem.get(
xes_constants.KEY_PREFIX
),
xes_constants.KEY_URI: elem.get(xes_constants.KEY_URI),
}
continue
elif elem.tag.endswith(xes_constants.TAG_GLOBAL):
if log is None:
raise SyntaxError("global found outside of <log> tag")
if elem.get(xes_constants.KEY_SCOPE) is not None:
log.omni_present[elem.get(xes_constants.KEY_SCOPE)] = {}
tree[elem] = log.omni_present[
elem.get(xes_constants.KEY_SCOPE)
]
continue
elif elem.tag.endswith(xes_constants.TAG_CLASSIFIER):
if log is None:
raise SyntaxError("classifier found outside of <log> tag")
if elem.get(xes_constants.KEY_KEYS) is not None:
classifier_value = elem.get(xes_constants.KEY_KEYS)
if "'" in classifier_value:
log.classifiers[elem.get(xes_constants.KEY_NAME)] = [
x for x in classifier_value.split("'") if x.strip()
]
else:
log.classifiers[elem.get(xes_constants.KEY_NAME)] = (
classifier_value.split()
)
continue
elif elem.tag.endswith(xes_constants.TAG_LOG):
if log is not None:
raise SyntaxError("file contains > 1 <log> tags")
log = EventLog()
tree[elem] = log.attributes
continue
elif tree_event == _EVENT_END:
if elem in tree:
del tree[elem]
elem.clear()
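            # drop already-processed preceding siblings from the parent element to keep
            # memory usage low while iterating over large logs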
if elem.getprevious() is not None:
try:
del elem.getparent()[0]
except TypeError:
pass
if elem.tag.endswith(xes_constants.TAG_EVENT):
if trace is not None:
trace.append(event)
event = None
continue
elif elem.tag.endswith(xes_constants.TAG_TRACE):
log.append(trace)
if progress is not None:
progress.update()
trace = None
continue
elif elem.tag.endswith(xes_constants.TAG_LOG):
continue
# gracefully close progress bar
if progress is not None:
progress.close()
del context, progress
if timestamp_sort:
log = sorting.sort_timestamp(
log, timestamp_key=timestamp_key, reverse_sort=reverse_sort
)
# sets the activity key as default classifier in the log's properties
log.properties[constants.PARAMETER_CONSTANT_ACTIVITY_KEY] = (
xes_constants.DEFAULT_NAME_KEY
)
log.properties[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = (
xes_constants.DEFAULT_NAME_KEY
)
# sets the default timestamp key
log.properties[constants.PARAMETER_CONSTANT_TIMESTAMP_KEY] = (
xes_constants.DEFAULT_TIMESTAMP_KEY
)
# sets the default resource key
log.properties[constants.PARAMETER_CONSTANT_RESOURCE_KEY] = (
xes_constants.DEFAULT_RESOURCE_KEY
)
# sets the default transition key
log.properties[constants.PARAMETER_CONSTANT_TRANSITION_KEY] = (
xes_constants.DEFAULT_TRANSITION_KEY
)
# sets the default group key
log.properties[constants.PARAMETER_CONSTANT_GROUP_KEY] = (
xes_constants.DEFAULT_GROUP_KEY
)
return log
def apply(filename, parameters=None):
"""
Imports an XES file into a log object
Parameters
----------
filename:
Absolute filename
parameters
Parameters of the algorithm, including
    Parameters.TIMESTAMP_SORT -> Specify if the log should be sorted by timestamp
    Parameters.TIMESTAMP_KEY -> If sorting is enabled, sort the log by this key
    Parameters.REVERSE_SORT -> Specify in which direction the log should be sorted
    Parameters.MAX_TRACES -> Specify the maximum number of traces to import from the log (read in order from the XML file)
    Parameters.SHOW_PROGRESS_BAR -> Enables/disables the progress bar (default: True)
    Parameters.ENCODING -> Specifies the encoding (default: utf-8)
Returns
-------
log : :class:`pm4py.log.log.EventLog`
A log
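
    Example
    -------
    Illustrative sketch (``example.xes`` is a hypothetical path):

    >>> log = apply("example.xes", parameters={Parameters.TIMESTAMP_SORT: True,
    ...                                        Parameters.MAX_TRACES: 1000})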
"""
return import_log(filename, parameters)
def import_log(filename, parameters=None):
"""
Imports an XES file into a log object
Parameters
----------
filename:
Absolute filename
parameters
Parameters of the algorithm, including
    Parameters.TIMESTAMP_SORT -> Specify if the log should be sorted by timestamp
    Parameters.TIMESTAMP_KEY -> If sorting is enabled, sort the log by this key
    Parameters.REVERSE_SORT -> Specify in which direction the log should be sorted
    Parameters.MAX_TRACES -> Specify the maximum number of traces to import from the log (read in order from the XML file)
    Parameters.SHOW_PROGRESS_BAR -> Enables/disables the progress bar (default: True)
    Parameters.ENCODING -> Specifies the encoding (default: utf-8)
Returns
-------
log : :class:`pm4py.log.log.EventLog`
A log
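
    Example
    -------
    Illustrative sketch (``example.xes.gz`` is a hypothetical path; gzip compression is
    detected from the ``.gz`` extension):

    >>> log = import_log("example.xes.gz", parameters={Parameters.ENCODING: "utf-8"})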
"""
from lxml import etree
if parameters is None:
parameters = {}
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
)
show_progress_bar = exec_utils.get_param_value(
Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
)
is_compressed = filename.lower().endswith(".gz")
if importlib.util.find_spec("tqdm") and show_progress_bar:
if is_compressed:
f = gzip.open(filename, "rb")
else:
f = open(filename, "rb")
context = etree.iterparse(
f, events=[_EVENT_START, _EVENT_END], encoding=encoding
)
num_traces = count_traces(context)
f.close()
else:
        # avoid the iteration to calculate the number of traces if "tqdm"
        # is not used
num_traces = 0
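    # second pass: (re)open the file and build the iterparse context used for the actual import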
if is_compressed:
f = gzip.open(filename, "rb")
else:
f = open(filename, "rb")
context = etree.iterparse(
f, events=[_EVENT_START, _EVENT_END], encoding=encoding
)
log = import_from_context(context, num_traces, parameters=parameters)
f.close()
return log
def import_from_string(log_string, parameters=None):
"""
    Deserializes a text/binary string representing an XES log
Parameters
-----------
log_string
String that contains the XES
parameters
Parameters of the algorithm, including
    Parameters.TIMESTAMP_SORT -> Specify if the log should be sorted by timestamp
    Parameters.TIMESTAMP_KEY -> If sorting is enabled, sort the log by this key
    Parameters.REVERSE_SORT -> Specify in which direction the log should be sorted
    Parameters.DECOMPRESS_SERIALIZATION -> Specify if the string needs to be decompressed (gzip) before parsing
    Parameters.MAX_TRACES -> Specify the maximum number of traces to import from the log (read in order from the XML file)
    Parameters.SHOW_PROGRESS_BAR -> Enables/disables the progress bar (default: True)
    Parameters.ENCODING -> Specifies the encoding (default: utf-8)
Returns
-----------
log
    Event log object
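
    Example
    -----------
    Illustrative sketch (``example.xes`` and ``example.xes.gz`` are hypothetical paths):

    >>> with open("example.xes", "r") as f:
    ...     log = import_from_string(f.read())
    >>> with open("example.xes.gz", "rb") as f:
    ...     log = import_from_string(f.read(), parameters={Parameters.DECOMPRESS_SERIALIZATION: True})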
"""
from lxml import etree
if parameters is None:
parameters = {}
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
)
show_progress_bar = exec_utils.get_param_value(
Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
)
decompress_serialization = exec_utils.get_param_value(
Parameters.DECOMPRESS_SERIALIZATION, parameters, False
)
if type(log_string) is str:
log_string = log_string.encode(constants.DEFAULT_ENCODING)
if importlib.util.find_spec("tqdm") and show_progress_bar:
# first iteration: count the number of traces
b = BytesIO(log_string)
if decompress_serialization:
s = gzip.GzipFile(fileobj=b, mode="rb")
else:
s = b
context = etree.iterparse(
s, events=[_EVENT_START, _EVENT_END], encoding=encoding
)
num_traces = count_traces(context)
s.close()
b.close()
else:
        # avoid the iteration to calculate the number of traces if "tqdm"
        # is not used
num_traces = 0
# second iteration: actually read the content
b = BytesIO(log_string)
if decompress_serialization:
s = gzip.GzipFile(fileobj=b, mode="rb")
else:
s = b
context = etree.iterparse(
s, events=[_EVENT_START, _EVENT_END], encoding=encoding
)
log = import_from_context(context, num_traces, parameters=parameters)
s.close()
b.close()
return log
def __parse_attribute(elem, store, key, value, tree):
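    """
    Stores the attribute read from 'elem' (as the pair 'key' -> 'value') into 'store',
    i.e. the container of the enclosing log/trace/event or parent attribute. For nested
    list/container attributes, the newly created child collection is registered in
    'tree' so that descendant attributes are attached to it. Returns the updated 'tree'.
    """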
if len(elem.getchildren()) == 0:
if type(store) is list:
# changes to the store of lists: not dictionaries anymore
# but pairs of key-values.
store.append((key, value))
elif type(store) is set:
store.add((key, value))
else:
store[key] = value
else:
if elem.tag.endswith(xes_constants.TAG_LIST):
store[key] = {
xes_constants.KEY_VALUE: value,
xes_constants.KEY_CHILDREN: list(),
}
tree[elem] = store[key][xes_constants.KEY_CHILDREN]
elif elem.tag.endswith(xes_constants.TAG_CONTAINER):
store[key] = {
xes_constants.KEY_VALUE: value,
xes_constants.KEY_CHILDREN: set(),
}
tree[elem] = store[key][xes_constants.KEY_CHILDREN]
else:
store[key] = {
xes_constants.KEY_VALUE: value,
xes_constants.KEY_CHILDREN: dict(),
}
tree[elem] = store[key][xes_constants.KEY_CHILDREN]
return tree