Source code for pm4py.objects.log.importer.xes.variants.chunk_regex
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import gzip
import os
import sys
from enum import Enum
from io import BytesIO
from pm4py.objects.log.obj import EventLog, Trace, Event
from pm4py.util import constants, exec_utils
from pm4py.util.dt_parsing import parser as dt_parser
import re
from collections import deque
[docs]
class Parameters(Enum):
    """
    Keys of the ``parameters`` dictionary read by this importer variant
    """

    # Read by import_from_string: when true, the input is wrapped in a
    # gzip.GzipFile and decompressed before being parsed.
    DECOMPRESS_SERIALIZATION = "decompress_serialization"

    # Read by import_log and import_from_string: codec used to decode the
    # bytes of the log (falls back to constants.DEFAULT_ENCODING).
    ENCODING = "encoding"
[docs]
def apply(filename, parameters=None):
    """
    Import a XES log from a file on disk

    Thin entry point that forwards both arguments, unchanged, to import_log.

    Parameters
    -----------
    filename
        Path of the XES file to parse
    parameters
        Parameters of the algorithm (forwarded as-is)

    Returns
    -----------
    log
        Whatever import_log returns for the given file
    """
    imported_log = import_log(filename, parameters)
    return imported_log
[docs]
def _quoted_value_after(fragments, marker):
    """
    Return the fragment following the first fragment that contains ``marker``

    ``fragments`` is an element split on double quotes, so the fragment right
    after ``name=`` (for instance) is the quoted value of that XML attribute.

    Raises
    -----------
    IndexError
        If no fragment contains ``marker`` or nothing follows the match
    """
    for position, fragment in enumerate(fragments):
        if marker in fragment:
            return fragments[position + 1]
    raise IndexError("no fragment containing %r" % (marker,))


def import_log_from_file_object(
    F, encoding, file_size=sys.maxsize, parameters=None
):
    """
    Import a log object from a (XML) file object

    The stream is read in fixed-size binary chunks, decoded, and split on
    the ``<`` / ``>`` characters; every non-blank piece is then interpreted
    as the content of one XML element. A stack of dictionaries tracks which
    container (log attributes, trace attributes, event, ...) receives the
    attributes currently being read.

    Chunks are decoded with an incremental decoder, so a multi-byte
    character that straddles a chunk boundary is decoded correctly instead
    of raising UnicodeDecodeError.

    Parameters
    -----------
    F
        Binary file object (``F.read(n)`` must return bytes)
    encoding
        Encoding used to decode the bytes read from ``F``
    file_size
        Size of the file (measured on disk); not used by this variant
    parameters
        Parameters of the algorithm; not used by this variant

    Returns
    -----------
    log
        Event log built from the stream

    Raises
    -----------
    UnicodeDecodeError
        If the stream contains bytes that are invalid for ``encoding``,
        including a multi-byte sequence truncated at end of input
    """
    # Function-scope import: keeps the fix self-contained in this block.
    import codecs

    nb = 2**12  # bytes per chunk
    rex = re.compile(r"(<|>)")
    parser = dt_parser.get()
    decoder = codecs.getincrementaldecoder(encoding)()

    def _read_text():
        """Return the next decoded chunk, or None once the stream is over."""
        raw = F.read(nb)
        if raw:
            # May legitimately return "" when the chunk only holds the first
            # bytes of a character; the remainder comes with the next chunk.
            return decoder.decode(raw)
        # End of input: flush the decoder so that a truncated trailing
        # sequence is reported rather than silently dropped.
        tail = decoder.decode(b"", final=True)
        return tail if tail else None

    # Conversion of the raw "value" text for each simple XES attribute type.
    converters = {
        "string": lambda raw_value: raw_value,
        "date": parser.apply,
        "int": int,
        "float": float,
        "boolean": lambda raw_value: raw_value == "true",
    }

    curr_els_attrs = []  # stack of the containers currently open
    fk_dict = {}  # throw-away target for children of simple attributes
    log = EventLog()
    trace = None

    text = _read_text()
    while text is not None:
        lst = deque(rex.split(text))

        while lst:
            el = lst.popleft()

            # Blank text between elements and the delimiters themselves
            # carry no information.
            if not el.rstrip() or el == "<" or el == ">":
                continue

            # The last piece of a chunk may be cut in the middle: keep
            # reading and gluing until a delimiter follows it (or the
            # stream ends).
            while not lst:
                text = _read_text()
                if text is None:
                    break
                pieces = rex.split(text)
                el += pieces[0]
                lst = deque(pieces[1:])

            if el[0] == "/":
                # Closing tag: leave the current container. Closing the
                # outermost one ends the import.
                if len(curr_els_attrs) <= 1:
                    return log
                curr_els_attrs.pop()
                continue

            idx = el.find(" ")
            if idx == -1:
                # Element without XML attributes.
                if el == "event":
                    event = Event()
                    curr_els_attrs.append(event)
                    trace.append(event)
                elif el == "trace":
                    trace = Trace()
                    curr_els_attrs.append(trace.attributes)
                    log.append(trace)
                elif el == "log":
                    curr_els_attrs.append(log.attributes)
                elif el == "values":
                    # Pushes the current container again so that the
                    # matching closing tag pops this duplicate.
                    curr_els_attrs.append(curr_els_attrs[-1])
                continue

            tag = el[:idx]
            # Splitting on double quotes puts quoted values at odd
            # positions. NOTE: XML entities in keys/values are kept as
            # they appear in the file (no unescaping is done here).
            parts = el.split('"')
            parts[-1] = parts[-1].strip()
            self_closing = parts[-1] == "/"

            if tag in converters:
                # Relies on the key being the first quoted value and the
                # value being the second one.
                curr_els_attrs[-1][parts[1]] = converters[tag](parts[3])
                if not self_closing:
                    # Nested attributes of a simple attribute are parsed
                    # but end up in the throw-away dictionary.
                    curr_els_attrs.append(fk_dict)
            elif tag == "extension":
                ext = log.extensions
                name = _quoted_value_after(parts, "name=")
                prefix = _quoted_value_after(parts, "prefix=")
                uri = _quoted_value_after(parts, "uri=")
                ext[name] = {"prefix": prefix, "uri": uri}
                if not self_closing:
                    curr_els_attrs.append(ext)
            elif tag == "classifier":
                classif = log.classifiers
                name = _quoted_value_after(parts, "name=")
                keys = _quoted_value_after(parts, "keys=")
                if "'" in keys:
                    # Keys containing spaces are wrapped in single quotes.
                    classif[name] = [
                        key for key in keys.split("'") if key.strip()
                    ]
                else:
                    classif[name] = keys.split()
                if not self_closing:
                    curr_els_attrs.append(classif)
            elif tag == "global":
                scope_attrs = {}
                log.omni_present[parts[1]] = scope_attrs
                if not self_closing:
                    curr_els_attrs.append(scope_attrs)
            elif tag == "log":
                curr_els_attrs.append(log.attributes)
            elif tag == "list":
                children = {}
                curr_els_attrs[-1][parts[1]] = {
                    "value": None,
                    "children": children,
                }
                curr_els_attrs.append(children)

        text = _read_text()

    return log
[docs]
def import_log(filename, parameters=None):
    """
    Import a log object from a XML file
    containing the traces, the events and the simple attributes of them

    Files whose name ends with ".gz" are opened through gzip; any other file
    is opened as a plain binary file. The file is closed even when parsing
    raises.

    Parameters
    -----------
    filename
        XES file to parse
    parameters
        Parameters of the algorithm, including
            Parameters.ENCODING -> Regulates the encoding of the log
            (default: constants.DEFAULT_ENCODING)

    Returns
    -----------
    log
        Log file
    """
    if parameters is None:
        parameters = {}

    encoding = exec_utils.get_param_value(
        Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
    )

    is_compressed = filename.endswith(".gz")
    # Size on disk, i.e. of the compressed data for ".gz" files.
    file_size = os.stat(filename).st_size

    opener = gzip.open if is_compressed else open
    # The context manager guarantees the handle is released on any exit.
    with opener(filename, "rb") as f:
        return import_log_from_file_object(
            f, encoding, file_size=file_size, parameters=parameters
        )
[docs]
def import_from_string(log_string, parameters=None):
    """
    Deserialize a text/binary string representing a XES log

    Parameters
    -----------
    log_string
        String that contains the XES (either str or bytes)
    parameters
        Parameters of the algorithm, including
            Parameters.ENCODING -> Regulates the encoding of the log
            (default: constants.DEFAULT_ENCODING)
            Parameters.DECOMPRESS_SERIALIZATION -> If true, the content is
            gunzipped before being parsed (default: False)

    Returns
    -----------
    log
        Trace log object
    """
    if parameters is None:
        parameters = {}

    encoding = exec_utils.get_param_value(
        Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
    )
    decompress_serialization = exec_utils.get_param_value(
        Parameters.DECOMPRESS_SERIALIZATION, parameters, False
    )

    if isinstance(log_string, str):
        # The parser decodes the bytes with `encoding`, so the text has to
        # be encoded with the very same codec to round-trip unchanged.
        log_string = log_string.encode(encoding)

    # Context managers close both streams even when parsing raises.
    with BytesIO(log_string) as b:
        if decompress_serialization:
            with gzip.GzipFile(fileobj=b, mode="rb") as s:
                return import_log_from_file_object(
                    s, encoding, parameters=parameters
                )
        return import_log_from_file_object(
            b, encoding, parameters=parameters
        )