Source code for pm4py.streaming.util.live_to_static_stream

from pm4py.streaming.algo.interface import StreamingAlgorithm
from pm4py.objects.log.obj import EventStream


[docs] class LiveToStaticStream(StreamingAlgorithm): static_stream = None def __init__(self): self.static_stream = EventStream() StreamingAlgorithm.__init__(self) def _process(self, event): self.static_stream.append(event) def _current_result(self): return self.static_stream