Source code for pm4py.streaming.algo.interface
import abc
from threading import Lock
import traceback
[docs]
class StreamingAlgorithm(abc.ABC):
def __init__(self, parameters=None):
self._lock = Lock()
@abc.abstractmethod
def _process(self, event):
pass
@abc.abstractmethod
def _current_result(self):
pass
[docs]
def get(self):
self._lock.acquire()
try:
ret = self._current_result()
except BaseException:
traceback.print_exc()
ret = None
self._lock.release()
return ret
[docs]
def receive(self, event):
self._lock.acquire()
try:
self._process(event)
except BaseException:
traceback.print_exc()
self._lock.release()