Source code for pm4py.streaming.util.live_to_static_stream
from pm4py.streaming.algo.interface import StreamingAlgorithm
from pm4py.objects.log.obj import EventStream
[docs]
class LiveToStaticStream(StreamingAlgorithm):
static_stream = None
def __init__(self):
self.static_stream = EventStream()
StreamingAlgorithm.__init__(self)
def _process(self, event):
self.static_stream.append(event)
def _current_result(self):
return self.static_stream