In PM4Py, we provide support for streaming process mining functionalities, including streaming process discovery (DFG), streaming conformance checking (token-based replay, footprints, and temporal profiles), trace-by-trace and event-by-event importing of XES/CSV files, and streaming support for OCEL.
Event stream management is handled by the pm4py.streaming.stream.live_event_stream.LiveEventStream class, which offers two key methods:

register(algo): registers a new algorithm to the live event stream, which is notified whenever a new event is added.
append(event): appends a new event to the live event stream.

The LiveEventStream class processes incoming events using a thread pool, enabling the system to manage high volumes of events across multiple threads.
Streaming algorithms registered to the LiveEventStream must implement the following methods:

_process(event): handles and processes an incoming event.
_current_result(): returns the current state of the streaming algorithm.

The following example demonstrates how to discover a Directly-Follows Graph (DFG) from a stream of events. First, define the (live) event stream:
Next, create the streaming DFG discovery object:
Register the DFG discovery object to the stream:
Start the live stream:
Import a known XES log:
Convert the log into a static event stream:
Add all events from the static stream to the live stream:
After stopping the stream, ensure that all events have been fully processed:
Finally, retrieve the Directly-Follows Graph, including activities, start, and end activities:
Running print(dfg) on the running-example.xes log produces the following:
{('register request', 'examine casually'): 3, ('examine casually', 'check ticket'): 4, ...}
The following example shows how to perform conformance checking using Token-Based Replay (TBR) on a stream of events. We assume you are working with the running-example.xes log and a process model discovered with the Inductive Miner (default noise threshold: 0.2).
First, import the running-example.xes log:
Convert the log into a static event stream:
Discover a process tree using the Inductive Miner:
Convert the process tree into a Petri net:
Create a live event stream and initialize the streaming TBR algorithm:
Register the TBR algorithm to the live stream and start it:
Add events to the live event stream:
Stop the stream after processing:
Retrieve statistics on replay execution (e.g., number of missing tokens) as a Pandas dataframe. This method can be called during streaming to get up-to-date results:
The following customizable methods print warnings during replay (can be overridden to send alerts, e.g., via email):
message_case_or_activity_not_in_event
message_activity_not_possible
message_missing_tokens
message_case_not_in_dictionary
message_final_marking_not_reached
Footprints are another conformance checking method offered in PM4Py, which can be applied in the context of streaming events. Below, we demonstrate an application of streaming footprints. First, we discover the footprints from the process model:
Next, we create the live event stream:
Then, we create the streaming footprints object:
Afterward, we register it to the live event stream:
We can then start the live event stream:
Subsequently, we append every event of the original log to the live event stream:
Finally, we stop the live event stream:
And retrieve the statistics of the conformance checking:
Additionally, the following methods are available within the streaming footprints module to print warnings during the replay. These methods can easily be overridden (for example, to send notifications via email):
message_case_or_activity_not_in_event
message_activity_not_possible
message_footprints_not_possible
message_start_activity_not_possible
message_end_activity_not_possible
message_case_not_in_dictionary
PM4Py also includes an implementation of the temporal profile model, described in: Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).
A temporal profile measures, for every pair of activities in the log, the average time and standard deviation between events associated with those activities. The time is calculated from the completion of the first event to the start of the second event. Thus, it assumes working with an interval log where events have both a start and end timestamp. The output of temporal profile discovery is a dictionary where each activity pair (expressed as a tuple) is associated with a pair of numbers: the average time and the standard deviation.
It is possible to use a temporal profile to perform conformance checking on an event log. The time between pairs of activities in the log is evaluated against the stored profile values. Specifically, a score is computed indicating how many standard deviations the observed value deviates from the average. If this score exceeds a threshold (by default set to 6, according to Six Sigma principles), the pair is flagged.
In PM4Py, we provide a streaming conformance checking algorithm based on the temporal profile. This algorithm checks each incoming event against all previous events in the case, identifying deviations according to the temporal profile. Below is an example where a temporal profile is discovered, the streaming conformance checker is set up, and a log is replayed on the stream.
First, we load an event log and apply the discovery algorithm:
We then create the stream, register the temporal conformance checking algorithm, and start the stream. The conformance checker can be initialized with several parameters:
Parameter Key | Type | Default | Description |
---|---|---|---|
Parameters.CASE_ID_KEY | string | case:concept:name | The attribute used as the case ID. |
Parameters.ACTIVITY_KEY | string | concept:name | The attribute used as the activity name. |
Parameters.START_TIMESTAMP_KEY | string | start_timestamp | The attribute used as the start timestamp. |
Parameters.TIMESTAMP_KEY | string | time:timestamp | The attribute used as the end timestamp. |
Parameters.ZETA | int | 6 | Multiplier for the standard deviation. Pairs of events exceeding this distance are flagged by the temporal profile. |
Next, we send the events from the log to the stream:
During the execution of the streaming temporal profile conformance checker, warnings are printed if a pair of events violates the temporal profile constraints. Furthermore, it is possible to retrieve a dictionary containing all cases with their associated deviations. The following code retrieves the results of the streaming temporal profile conformance checking:
To process traces from a XES event log that might not fit into memory, or when a sample of a large log is needed, the XES trace-by-trace streaming importer can be used. The importer works by simply providing the path to the log:
It is then possible to iterate over the traces of the log (which are read one trace at a time):
To process events from a XES event log that might not fit into memory, or when a sample of a large log is needed, the XES event-by-event streaming importer can be used. In this case, individual events within the traces are picked during iteration. The importer works by simply providing the path to the log:
It is then possible to iterate over the single events of the log (which are read during iteration):
To process events from a CSV event log that might not fit into memory, or when a sample of a large log is needed, using Pandas may not be feasible. In this case, individual rows of the CSV file are parsed during iteration. The importer can be used by simply providing the path to the CSV log:
It is then possible to iterate over the individual events of the log (read during iteration):
We offer support for streaming on OCEL. Current support includes iterating over the single events of an OCEL and pushing flattened OCEL events to live event streams (one per object type).
Events of an OCEL can be iterated as follows:
A complete example is shown below, where we take an OCEL, instantiate two event streams for the order and element object types respectively, and push the flattened events to them. The two event streams are attached to a printer listener, such that each flattened event is printed on the screen when received.