Source code for pm4py.streaming.importer.xes.variants.xes_event_stream

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import logging
from enum import Enum

from pm4py.objects.log.obj import Event, Trace
from pm4py.util import xes_constants, exec_utils, constants
from pm4py.util.dt_parsing import parser as dt_parser


[docs] class Parameters(Enum): ACCEPTANCE_CONDITION = "acceptance_condition"
_EVENT_END = "end" _EVENT_START = "start"
[docs] def parse_attribute(elem, store, key, value, tree): if len(elem.getchildren()) == 0: if type(store) is list: # changes to the store of lists: not dictionaries anymore # but pairs of key-values. store.append((key, value)) else: store[key] = value else: if elem.getchildren()[0].tag.endswith(xes_constants.TAG_VALUES): store[key] = { xes_constants.KEY_VALUE: value, xes_constants.KEY_CHILDREN: list(), } tree[elem] = store[key][xes_constants.KEY_CHILDREN] tree[elem.getchildren()[0]] = tree[elem] else: store[key] = { xes_constants.KEY_VALUE: value, xes_constants.KEY_CHILDREN: dict(), } tree[elem] = store[key][xes_constants.KEY_CHILDREN] return tree
[docs] class StreamingEventXesReader: def __init__(self, path, parameters=None): """ Initialize the iterable log object Parameters ------------- path Path to the XES log """ if parameters is None: parameters = {} self.path = path self.acceptance_condition = exec_utils.get_param_value( Parameters.ACCEPTANCE_CONDITION, parameters, lambda x: True ) self.date_parser = dt_parser.get() self.reset() def __iter__(self): """ Starts the iteration """ return self def __next__(self): """ Gets the next element of the log """ event = self.read_event() if self.reading_log: return event raise StopIteration
[docs] def to_event_stream(self, event_stream): """ Sends the content of a XES log to an event stream Parameters -------------- event_stream Event stream """ while self.reading_log: event = self.read_event() if event is not None: event_stream.append(event)
[docs] def reset(self): """ Resets the iterator """ # reset the variables from lxml import etree self.context = None self.tree = None # initialize the variables self.context = etree.iterparse( self.path, events=[_EVENT_START, _EVENT_END] ) self.event = None self.reading_log = True self.reading_event = False self.reading_trace = False self.tree = {}
[docs] def read_event(self): """ Gets the next event from the iterator Returns ------------ event Event """ tree = self.tree while True: tree_event, elem = next(self.context) if tree_event == _EVENT_START: parent = ( tree[elem.getparent()] if elem.getparent() in tree else None ) if elem.tag.endswith(xes_constants.TAG_TRACE): self.trace = Trace() tree[elem] = self.trace.attributes self.reading_trace = True continue if elem.tag.endswith(xes_constants.TAG_EVENT): self.event = Event() tree[elem] = self.event self.reading_event = True continue if self.reading_event or self.reading_trace: if elem.tag.endswith(xes_constants.TAG_STRING): if parent is not None: tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), elem.get(xes_constants.KEY_VALUE), tree, ) continue elif elem.tag.endswith(xes_constants.TAG_DATE): try: dt = self.date_parser.apply( elem.get(xes_constants.KEY_VALUE) ) tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), dt, tree, ) except TypeError: logging.info( "failed to parse date: " + str(elem.get(xes_constants.KEY_VALUE)) ) except ValueError: logging.info( "failed to parse date: " + str(elem.get(xes_constants.KEY_VALUE)) ) continue elif elem.tag.endswith(xes_constants.TAG_FLOAT): if parent is not None: try: val = float(elem.get(xes_constants.KEY_VALUE)) tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), val, tree, ) except ValueError: logging.info( "failed to parse float: " + str(elem.get(xes_constants.KEY_VALUE)) ) continue elif elem.tag.endswith(xes_constants.TAG_INT): if parent is not None: try: val = int(elem.get(xes_constants.KEY_VALUE)) tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), val, tree, ) except ValueError: logging.info( "failed to parse int: " + str(elem.get(xes_constants.KEY_VALUE)) ) continue elif elem.tag.endswith(xes_constants.TAG_BOOLEAN): if parent is not None: try: val0 = elem.get(xes_constants.KEY_VALUE) val = False if str(val0).lower() == "true": val = True tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), val, tree, ) except ValueError: logging.info( "failed to parse boolean: " + str(elem.get(xes_constants.KEY_VALUE)) ) continue elif elem.tag.endswith(xes_constants.TAG_LIST): if parent is not None: # lists have no value, hence we put None as a value tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), None, tree, ) continue elif elem.tag.endswith(xes_constants.TAG_ID): if parent is not None: tree = parse_attribute( elem, parent, elem.get(xes_constants.KEY_KEY), elem.get(xes_constants.KEY_VALUE), tree, ) continue elif tree_event == _EVENT_END: if elem in tree: del tree[elem] elem.clear() if elem.getprevious() is not None: try: del elem.getparent()[0] except TypeError: pass if elem.tag.endswith(xes_constants.TAG_EVENT): self.reading_event = False if self.acceptance_condition(self.event): for attr in self.trace.attributes: self.event[ constants.CASE_ATTRIBUTE_PREFIX + attr ] = self.trace.attributes[attr] return self.event continue elif elem.tag.endswith(xes_constants.TAG_TRACE): self.reading_trace = False continue elif elem.tag.endswith(xes_constants.TAG_LOG): self.reading_log = False break
[docs] def apply(path, parameters=None): """ Creates a StreamingEventXesReader object Parameters --------------- path Path parameters Parameters of the algorithm Returns --------------- stream_read_obj Stream reader object """ return StreamingEventXesReader(path, parameters=parameters)