Streaming Process Mining

Streaming Package: General Structure

In PM4Py, we provide support for streaming process mining functionalities, including:

  • Streaming process discovery (Directly-Follows Graph)
  • Streaming conformance checking (Token-Based Replay)
  • Streaming conformance checking (Footprints)
  • Streaming import of XES/CSV files

Event stream management is handled by the pm4py.streaming.stream.live_event_stream.LiveEventStream class, which offers two key methods:

  • register(algo): Registers a new algorithm to the live event stream, notifying it whenever a new event is added.
  • append(event): Appends a new event to the live event stream.

The LiveEventStream class processes incoming events using a thread pool, enabling the system to manage high volumes of events across multiple threads.

Streaming algorithms registered to the LiveEventStream must implement the following methods:

  • _process(event): Handles and processes incoming events.
  • _current_result(): Returns the current state of the streaming algorithm.

Streaming Process Discovery (Directly-Follows Graph)

The following example demonstrates how to discover a Directly-Follows Graph (DFG) from a stream of events. First, define the (live) event stream:
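
A minimal sketch follows (module paths per recent PM4Py versions and may differ in yours; the variable names introduced here are carried through the following steps):

    from pm4py.streaming.stream.live_event_stream import LiveEventStream

    # the live event stream manages registered algorithms and incoming events
    live_event_stream = LiveEventStream()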

Next, create the streaming DFG discovery object:
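
Continuing the sketch (the streaming discovery module path is an assumption based on recent PM4Py versions):

    from pm4py.streaming.algo.discovery.dfg import algorithm as dfg_discovery

    streaming_dfg = dfg_discovery.apply()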

Register the DFG discovery object to the stream:
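
Using the register method described above:

    live_event_stream.register(streaming_dfg)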

Start the live stream:
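
Continuing the sketch:

    live_event_stream.start()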

Import a known XES log:
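
For example (the path to the log is illustrative):

    from pm4py.objects.log.importer.xes import importer as xes_importer

    log = xes_importer.apply("tests/input_data/running-example.xes")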

Convert the log into a static event stream:
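
One way, via the log converter:

    from pm4py.objects.conversion.log import converter as log_converter

    static_event_stream = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_STREAM)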

Add all events from the static stream to the live stream:
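
Continuing the sketch:

    for event in static_event_stream:
        live_event_stream.append(event)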

Stop the stream; this also ensures that all appended events are fully processed:
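
Continuing the sketch:

    # stop() also completes the processing of the events already appended
    live_event_stream.stop()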

Finally, retrieve the Directly-Follows Graph, including activities, start, and end activities:
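
In this sketch, the current state is read back via the algorithm's get method, which (in recent PM4Py versions) wraps the _current_result() described above:

    dfg, activities, start_activities, end_activities = streaming_dfg.get()
    print(dfg)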

Running print(dfg) on the running-example.xes log produces the following:

    {('register request', 'examine casually'): 3, ('examine casually', 'check ticket'): 4, ...}

Streaming Conformance Checking (Token-Based Replay)

The following example shows how to perform conformance checking using Token-Based Replay (TBR) on a stream of events. We assume you are working with the running-example.xes log and a process model discovered with the Inductive Miner (default noise threshold: 0.2).

First, import the running-example.xes log:
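
As in the previous section (the path is illustrative):

    from pm4py.objects.log.importer.xes import importer as xes_importer

    log = xes_importer.apply("tests/input_data/running-example.xes")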

Convert the log into a static event stream:
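
Continuing the sketch:

    from pm4py.objects.conversion.log import converter as log_converter

    static_event_stream = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_STREAM)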

Discover a process tree using the Inductive Miner:
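
One way, using PM4Py's simplified interface (an assumption; the noise threshold is passed explicitly to match the value mentioned above):

    import pm4py

    tree = pm4py.discover_process_tree_inductive(log, noise_threshold=0.2)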

Convert the process tree into a Petri net:
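
Continuing the sketch:

    from pm4py.objects.conversion.process_tree import converter as pt_converter

    net, im, fm = pt_converter.apply(tree)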

Create a live event stream and initialize the streaming TBR algorithm:
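
A sketch (the streaming TBR module path is an assumption based on recent PM4Py versions):

    from pm4py.streaming.stream.live_event_stream import LiveEventStream
    from pm4py.streaming.algo.conformance.tbr import algorithm as tbr_algorithm

    live_event_stream = LiveEventStream()
    streaming_tbr = tbr_algorithm.apply(net, im, fm)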

Register the TBR algorithm to the live stream and start it:
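
Continuing the sketch:

    live_event_stream.register(streaming_tbr)
    live_event_stream.start()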

Add events to the live event stream:
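
Reusing the static event stream created above:

    for event in static_event_stream:
        live_event_stream.append(event)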

Stop the stream after processing:
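
Continuing the sketch:

    live_event_stream.stop()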

Retrieve statistics on replay execution (e.g., number of missing tokens) as a Pandas dataframe. This method can be called during streaming to get up-to-date results:
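
In this sketch, the diagnostics are read back via get (an assumption based on recent PM4Py versions):

    conf_stats = streaming_tbr.get()
    print(conf_stats)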

The following customizable methods print warnings during replay; they can be overridden to send alerts, e.g., via email (a sketch of overriding one of them follows the list):

  • message_case_or_activity_not_in_event
  • message_activity_not_possible
  • message_missing_tokens
  • message_case_not_in_dictionary
  • message_final_marking_not_reached
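
As an illustration, a hook can be swapped on the instantiated checker; the catch-all signature below is used because the exact method signatures vary across versions, and the alerting logic is purely hypothetical:

    def send_alert(*args, **kwargs):
        # hypothetical: replace the print with e-mail/messaging logic
        print("TBR deviation detected:", args, kwargs)

    # override the default warning printer on this instance
    streaming_tbr.message_missing_tokens = send_alert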

Streaming Conformance Checking (Footprints)

Footprints are another conformance checking method offered in PM4Py, which can be applied in the context of streaming events. Below, we demonstrate an application of streaming footprints. First, we discover the footprints from the process model:
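
For example, reusing the net, im and fm obtained in the previous section (the footprints discovery module path follows recent PM4Py versions):

    from pm4py.algo.discovery.footprints import algorithm as fp_discovery

    footprints = fp_discovery.apply(net, im, fm)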


Next, we create the live event stream:
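
As before:

    from pm4py.streaming.stream.live_event_stream import LiveEventStream

    live_event_stream = LiveEventStream()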


Then, we create the streaming footprints object:
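
Continuing the sketch (the streaming footprints module path is an assumption based on recent PM4Py versions):

    from pm4py.streaming.algo.conformance.footprints import algorithm as fp_conformance

    streaming_footprints = fp_conformance.apply(footprints)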


Afterward, we register it to the live event stream:
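
Continuing the sketch:

    live_event_stream.register(streaming_footprints)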


We can then start the live event stream:
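
Continuing the sketch:

    live_event_stream.start()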


Subsequently, we append every event of the original log to the live event stream:
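
Reusing the static event stream from the previous section:

    for event in static_event_stream:
        live_event_stream.append(event)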


Finally, we stop the live event stream:
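
Continuing the sketch:

    live_event_stream.stop()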


And retrieve the statistics of the conformance checking:
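
Again via get (an assumption based on recent PM4Py versions):

    conf_stats = streaming_footprints.get()
    print(conf_stats)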


Additionally, the following methods are available within the streaming footprints module to print warnings during the replay. These methods can easily be overridden (for example, to send notifications via email):

  • message_case_or_activity_not_in_event
  • message_activity_not_possible
  • message_footprints_not_possible
  • message_start_activity_not_possible
  • message_end_activity_not_possible
  • message_case_not_in_dictionary

Streaming Conformance Checking (Temporal Profile)

PM4Py also includes an implementation of the temporal profile model, described in: Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. "Temporal Conformance Checking at Runtime based on Time-infused Process Models." arXiv preprint arXiv:2008.07262 (2020).

A temporal profile measures, for every pair of activities in the log, the average time and standard deviation between events associated with those activities. The time is calculated from the completion of the first event to the start of the second event. Thus, it assumes working with an interval log where events have both a start and end timestamp. The output of temporal profile discovery is a dictionary where each activity pair (expressed as a tuple) is associated with a pair of numbers: the average time and the standard deviation.

It is possible to use a temporal profile to perform conformance checking on an event log. The time between pairs of activities in the log is evaluated against the stored profile values. Specifically, a score is computed indicating how many standard deviations the observed value deviates from the average. If this score exceeds a threshold (by default set to 6, according to Six Sigma principles), the pair is flagged.

In PM4Py, we provide a streaming conformance checking algorithm based on the temporal profile. This algorithm checks each incoming event against all previous events in the case, identifying deviations according to the temporal profile. Below is an example where a temporal profile is discovered, the streaming conformance checker is set up, and a log is replayed on the stream.

First, we load an event log and apply the discovery algorithm:
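
A sketch (the discovery module path follows recent PM4Py versions; the log path is illustrative):

    from pm4py.objects.log.importer.xes import importer as xes_importer
    from pm4py.algo.discovery.temporal_profile import algorithm as temporal_profile_discovery

    log = xes_importer.apply("tests/input_data/running-example.xes")
    temporal_profile = temporal_profile_discovery.apply(log)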


We then create the stream, register the temporal conformance checking algorithm, and start the stream. The conformance checker can be initialized with several parameters:
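
A sketch (the streaming temporal conformance module path is an assumption based on recent PM4Py versions; the parameters are listed below):

    from pm4py.streaming.stream.live_event_stream import LiveEventStream
    from pm4py.streaming.algo.conformance.temporal import algorithm as temporal_conformance

    live_event_stream = LiveEventStream()
    streaming_temporal = temporal_conformance.apply(temporal_profile)
    live_event_stream.register(streaming_temporal)
    live_event_stream.start()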

  • Parameters.CASE_ID_KEY (string; default: case:concept:name): the attribute used as the case ID.
  • Parameters.ACTIVITY_KEY (string; default: concept:name): the attribute used as the activity name.
  • Parameters.START_TIMESTAMP_KEY (string; default: start_timestamp): the attribute used as the start timestamp.
  • Parameters.TIMESTAMP_KEY (string; default: time:timestamp): the attribute used as the end timestamp.
  • Parameters.ZETA (int; default: 6): multiplier for the standard deviation. Pairs of events exceeding this distance are flagged by the temporal profile.

Next, we send the events from the log to the stream:
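
Continuing the sketch:

    from pm4py.objects.conversion.log import converter as log_converter

    static_event_stream = log_converter.apply(log, variant=log_converter.Variants.TO_EVENT_STREAM)
    for event in static_event_stream:
        live_event_stream.append(event)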


During the execution of the streaming temporal profile conformance checker, warnings are printed if a pair of events violates the temporal profile constraints. Furthermore, it is possible to retrieve a dictionary containing all cases with their associated deviations. The following code retrieves the results of the streaming temporal profile conformance checking:
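
In this sketch, the stream is stopped first and the result is read via get (an assumption based on recent PM4Py versions):

    live_event_stream.stop()
    results = streaming_temporal.get()
    print(results)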


Streaming Importer (XES Trace-by-Trace)

To process traces from an XES event log that might not fit into memory, or when a sample of a large log is needed, the XES trace-by-trace streaming importer can be used. The importer works by simply providing the path to the log:
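
A sketch (the variant name follows recent PM4Py versions; the path is illustrative):

    from pm4py.streaming.importer.xes import importer as xes_importer

    streaming_log_object = xes_importer.apply(
        "tests/input_data/running-example.xes",
        variant=xes_importer.Variants.XES_TRACE_STREAM)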


It is then possible to iterate over the traces of the log (which are read one trace at a time):
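
Continuing the sketch:

    for trace in streaming_log_object:
        print(trace)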


Streaming Importer (XES Event-by-Event)

To process events from an XES event log that might not fit into memory, or when a sample of a large log is needed, the XES event-by-event streaming importer can be used. In this case, individual events within the traces are picked during iteration. The importer works by simply providing the path to the log:
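
A sketch (the variant name follows recent PM4Py versions; the path is illustrative):

    from pm4py.streaming.importer.xes import importer as xes_importer

    streaming_ev_object = xes_importer.apply(
        "tests/input_data/running-example.xes",
        variant=xes_importer.Variants.XES_EVENT_STREAM)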


It is then possible to iterate over the single events of the log (which are read during iteration):
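
Continuing the sketch:

    for event in streaming_ev_object:
        print(event)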


Streaming Importer (CSV Event-by-Event)

To process events from a CSV event log that might not fit into memory, or when a sample of a large log is needed, using Pandas may not be feasible. In this case, individual rows of the CSV file are parsed during iteration. The importer can be used by simply providing the path to the CSV log:
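
A sketch (the variant name follows recent PM4Py versions; the path is illustrative):

    from pm4py.streaming.importer.csv import importer as csv_importer

    streaming_ev_object = csv_importer.apply(
        "tests/input_data/running-example.csv",
        variant=csv_importer.Variants.CSV_EVENT_STREAM)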


It is then possible to iterate over the individual events of the log (read during iteration):
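
Continuing the sketch:

    for event in streaming_ev_object:
        print(event)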


OCEL Streaming

We offer support for streaming on OCEL. Current support includes:

  • Iterating over the events of an OCEL.
  • Listening to OCELs, directing their flattened events to traditional event listeners.

Events of an OCEL can be iterated as follows:
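
A sketch (the iterator utility is an assumption based on recent PM4Py versions; the OCEL path is illustrative):

    import pm4py
    from pm4py.objects.ocel.util import ocel_iterator

    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
    for ev in ocel_iterator.apply(ocel):
        print(ev)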


A complete example is shown below: we take an OCEL, instantiate two live event streams for the order and element object types respectively, and push the flattened events to them. A printer is attached to each stream, so that every flattened event is printed on screen when it is received.
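
A sketch of this pipeline (the distributor and printer utility modules are assumptions based on recent PM4Py versions; the object type names follow the description above):

    import pm4py
    from pm4py.streaming.stream.live_event_stream import LiveEventStream
    from pm4py.objects.ocel.util import ocel_iterator
    # the following utility modules are assumptions based on recent PM4Py versions
    from pm4py.streaming.util import event_stream_printer
    from pm4py.streaming.conversion import ocel_flatts_distributor

    ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")

    # one live event stream per object type
    order_stream = LiveEventStream()
    element_stream = LiveEventStream()

    # attach a printer to each stream, so flattened events are printed on arrival
    order_stream.register(event_stream_printer.EventStreamPrinter())
    element_stream.register(event_stream_printer.EventStreamPrinter())
    order_stream.start()
    element_stream.start()

    # the distributor flattens every OCEL event and forwards it to the streams
    # registered for the object types involved in the event
    distributor = ocel_flatts_distributor.OcelFlattsDistributor()
    distributor.register("order", order_stream)
    distributor.register("element", element_stream)

    for ev in ocel_iterator.apply(ocel):
        distributor.append(ev)

    order_stream.stop()
    element_stream.stop()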