'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import gzip
import importlib.util
from enum import Enum
from io import BytesIO
from pm4py.objects.log.util import xes as xes_util
from pm4py.util import exec_utils, constants
from xml.sax.saxutils import quoteattr
[docs]
class Parameters(Enum):
COMPRESS = "compress"
SHOW_PROGRESS_BAR = "show_progress_bar"
ENCODING = "encoding"
# defines correspondence between Python types and XES types
__TYPE_CORRESPONDENCE = {
"str": xes_util.TAG_STRING,
"int": xes_util.TAG_INT,
"float": xes_util.TAG_FLOAT,
"datetime": xes_util.TAG_DATE,
"Timestamp": xes_util.TAG_DATE,
"bool": xes_util.TAG_BOOLEAN,
"dict": xes_util.TAG_LIST,
"numpy.int64": xes_util.TAG_INT,
"numpy.float64": xes_util.TAG_FLOAT,
"numpy.datetime64": xes_util.TAG_DATE,
}
# if a type is not found in the previous list, then default to string
__DEFAULT_TYPE = xes_util.TAG_STRING
def __get_xes_attr_type(attr_name, attr_type):
"""
Transform a Python attribute type (e.g. str, datetime) into a XES attribute type (e.g. string, date)
Parameters
----------
attr_name
Name of the attribute
attr_type:
Python attribute type
"""
if attr_name == xes_util.DEFAULT_NAME_KEY:
return xes_util.TAG_STRING
elif attr_type in __TYPE_CORRESPONDENCE:
attr_type_xes = __TYPE_CORRESPONDENCE[attr_type]
else:
attr_type_xes = __DEFAULT_TYPE
return attr_type_xes
def __get_xes_attr_value(attr_value, attr_type_xes):
"""
Transform an attribute value from Python format to XES format (the type is provided as argument)
Parameters
----------
attr_value:
XES attribute value
attr_type_xes:
XES attribute type
"""
if attr_type_xes == xes_util.TAG_DATE:
return attr_value.isoformat()
elif attr_type_xes == xes_util.TAG_BOOLEAN:
return str(attr_value).lower()
return str(attr_value)
[docs]
def get_tab_indent(n):
"""
Get the desidered number of indentations as string
Parameters
-------------
n
Number of indentations
Returns
-------------
str_tab_indent
Desidered number of indentations as string
"""
return "".join(["\t"] * n)
[docs]
def escape(stru):
"""
XML-escape a string
Parameters
----------------
stru
String to be escaped
Returns
----------------
escaped_stru
Escaped string
"""
return quoteattr(stru)
[docs]
def export_attribute(attr_name, attr_value, indent_level):
"""
Exports an attribute
Parameters
--------------
attr_name
Name of the attribute
attr_value
Value of the attribute
indent_level
Level of indentation
Returns
--------------
stru
String representing the content of the attribute
"""
ret = []
if attr_name is not None and attr_value is not None:
attr_type = __get_xes_attr_type(attr_name, type(attr_value).__name__)
if not attr_type == xes_util.TAG_LIST:
attr_value = __get_xes_attr_value(attr_value, attr_type)
ret.append(
get_tab_indent(indent_level)
+ "<%s key=%s value=%s />\n"
% (attr_type, escape(attr_name), escape(attr_value))
)
else:
if attr_value[xes_util.KEY_VALUE] is None:
# list
ret.append(
get_tab_indent(indent_level)
+ "<list key=%s>\n" % (escape(attr_name))
)
ret.append(get_tab_indent(indent_level + 1) + "<values>\n")
for subattr in attr_value[xes_util.KEY_CHILDREN]:
ret.append(
export_attribute(
subattr[0], subattr[1], indent_level + 2
)
)
ret.append(get_tab_indent(indent_level + 1) + "</values>\n")
ret.append(get_tab_indent(indent_level) + "</list>\n")
else:
# nested attribute
this_value = attr_value[xes_util.KEY_VALUE]
this_type = __get_xes_attr_type(
attr_name, type(this_value).__name__
)
this_value = __get_xes_attr_value(this_value, this_type)
ret.append(
get_tab_indent(indent_level)
+ "<%s key=%s value=%s>\n"
% (this_type, escape(attr_name), escape(this_value))
)
for subattr_name, subattr_value in attr_value[
xes_util.KEY_CHILDREN
].items():
ret.append(
export_attribute(
subattr_name, subattr_value, indent_level + 1
)
)
ret.append("</%s>\n" % this_type)
return "".join(ret)
[docs]
def export_trace_line_by_line(trace, fp_obj, encoding):
"""
Exports the content of a trace line-by-line
to a file object
Parameters
-----------------
trace
Trace
fp_obj
File object
encoding
Encoding
"""
fp_obj.write((get_tab_indent(1) + "<trace>\n").encode(encoding))
for attr_name, attr_value in trace.attributes.items():
fp_obj.write(
export_attribute(attr_name, attr_value, 2).encode(encoding)
)
for event in trace:
fp_obj.write((get_tab_indent(2) + "<event>\n").encode(encoding))
for attr_name, attr_value in event.items():
fp_obj.write(
export_attribute(attr_name, attr_value, 3).encode(encoding)
)
fp_obj.write((get_tab_indent(2) + "</event>\n").encode(encoding))
fp_obj.write((get_tab_indent(1) + "</trace>\n").encode(encoding))
[docs]
def export_log_line_by_line(log, fp_obj, encoding, parameters=None):
"""
Exports the contents of the log line-by-line
to a file object
Parameters
--------------
log
Event log
fp_obj
File object
encoding
Encoding
parameters
Parameters of the algorithm
"""
if parameters is None:
parameters = {}
show_progress_bar = exec_utils.get_param_value(
Parameters.SHOW_PROGRESS_BAR, parameters, constants.SHOW_PROGRESS_BAR
)
progress = None
if importlib.util.find_spec("tqdm") and show_progress_bar:
from tqdm.auto import tqdm
progress = tqdm(
total=len(log), desc="exporting log, completed traces :: "
)
fp_obj.write(
('<?xml version="1.0" encoding="' + encoding + '" ?>\n').encode(
encoding
)
)
fp_obj.write(
(
"<log "
+ xes_util.TAG_VERSION
+ '="'
+ xes_util.VALUE_XES_VERSION
+ '" '
+ xes_util.TAG_FEATURES
+ '="'
+ xes_util.VALUE_XES_FEATURES
+ '" '
+ xes_util.TAG_XMLNS
+ '="'
+ xes_util.VALUE_XMLNS
+ '">\n'
).encode(encoding)
)
for ext_name, ext_value in log.extensions.items():
fp_obj.write(
(
get_tab_indent(1)
+ '<extension name="%s" prefix="%s" uri="%s" />\n'
% (
ext_name,
ext_value[xes_util.KEY_PREFIX],
ext_value[xes_util.KEY_URI],
)
).encode(encoding)
)
for clas_name, clas_attributes in log.classifiers.items():
fp_obj.write(
(
get_tab_indent(1)
+ '<classifier name="%s" keys="%s" />\n'
% (clas_name, " ".join(clas_attributes))
).encode(encoding)
)
for attr_name, attr_value in log.attributes.items():
fp_obj.write(
export_attribute(attr_name, attr_value, 1).encode(encoding)
)
for scope in log.omni_present:
fp_obj.write(
(get_tab_indent(1) + '<global scope="%s">\n' % (scope)).encode(
encoding
)
)
for attr_name, attr_value in log.omni_present[scope].items():
fp_obj.write(
export_attribute(attr_name, attr_value, 2).encode(encoding)
)
fp_obj.write((get_tab_indent(1) + "</global>\n").encode(encoding))
for trace in log:
export_trace_line_by_line(trace, fp_obj, encoding)
if progress is not None:
progress.update()
# gracefully close progress bar
if progress is not None:
progress.close()
del progress
fp_obj.write("</log>\n".encode(encoding))
[docs]
def apply(log, output_file_path, parameters=None):
"""
Exports a XES log using a non-standard exporter
(classifiers, lists, nested attributes, globals, extensions are not supported)
Parameters
------------
log
Event log
output_file_path
Path to the XES file
parameters
Parameters
"""
if parameters is None:
parameters = {}
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
)
compress = exec_utils.get_param_value(
Parameters.COMPRESS,
parameters,
output_file_path.lower().endswith(".gz"),
)
if compress:
if not output_file_path.lower().endswith(".gz"):
output_file_path = output_file_path + ".gz"
f = gzip.open(output_file_path, mode="wb")
else:
f = open(output_file_path, "wb")
export_log_line_by_line(log, f, encoding, parameters=parameters)
f.close()
[docs]
def export_log_as_string(log, parameters=None):
"""
Export a log into a string
Parameters
-----------
log: :class:`pm4py.log.log.EventLog`
PM4PY log
parameters
Parameters of the algorithm
Returns
-----------
logString
Log as a string
"""
if parameters is None:
parameters = {}
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
)
compress = exec_utils.get_param_value(
Parameters.COMPRESS, parameters, False
)
b = BytesIO()
if compress:
d = gzip.GzipFile(fileobj=b, mode="wb")
else:
d = b
export_log_line_by_line(log, d, encoding, parameters=parameters)
if compress:
d.close()
return b.getvalue()