pm4py package#

Process mining for Python

Subpackages#

Submodules#

pm4py.analysis module#

pm4py.analysis.construct_synchronous_product_net(trace: Trace, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking) Tuple[PetriNet, Marking, Marking][source]#

constructs the synchronous product net between a trace and a Petri net process model.

Parameters:
  • trace (Trace) – trace of an event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
log = pm4py.read_xes('log.xes')
sync_net, sync_im, sync_fm = pm4py.construct_synchronous_product_net(log[0], net, im, fm)

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.analysis.compute_emd(language1: Dict[List[str], float], language2: Dict[List[str], float]) float[source]#

Computes the earth mover distance between two stochastic languages (for example, the first extracted from the log, and the second extracted from the process model.

Parameters:
  • language1 – (first) stochastic language

  • language2 – (second) stochastic language

Return type:

float

import pm4py

log = pm4py.read_xes('tests/input_data/running-example.xes')
language_log = pm4py.get_stochastic_language(log)
print(language_log)
net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
language_model = pm4py.get_stochastic_language(net, im, fm)
print(language_model)
emd_distance = pm4py.compute_emd(language_log, language_model)
print(emd_distance)
pm4py.analysis.solve_marking_equation(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, cost_function: Dict[Transition, float] = None) float[source]#

Solves the marking equation of a Petri net. The marking equation is solved as an ILP problem. An optional transition-based cost function to minimize can be provided as well.

Parameters:
  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • cost_function – optional cost function to use when solving the marking equation

Return type:

float

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
heuristic = pm4py.solve_marking_equation(net, im, fm)
pm4py.analysis.solve_extended_marking_equation(trace: Trace, sync_net: PetriNet, sync_im: Marking, sync_fm: Marking, split_points: List[int] | None = None) float[source]#

Gets an heuristics value (underestimation of the cost of an alignment) between a trace and a synchronous product net using the extended marking equation with the standard cost function (e.g. sync moves get cost equal to 0, invisible moves get cost equal to 1, other move on model / move on log get cost equal to 10000), with an optimal provisioning of the split points

Parameters:
  • trace (Trace) – trace

  • sync_net (PetriNet) – synchronous product net

  • sync_im (Marking) – initial marking (of the sync net)

  • sync_fm (Marking) – final marking (of the sync net)

  • split_points – if specified, the indexes of the events of the trace to be used as split points. If not specified, the split points are identified automatically.

Return type:

float

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
log = pm4py.read_xes('log.xes')
ext_mark_eq_heu = pm4py.solve_extended_marking_equation(log[0], net, im, fm)

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.analysis.check_soundness(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, print_diagnostics: bool = False) Tuple[bool, Dict[str, Any]][source]#

Check if a given Petri net is a sound WF-net. A Petri net is a WF-net iff:

  • it has a unique source place

  • it has a unique end place

  • every element in the WF-net is on a path from the source to the sink place

A WF-net is sound iff:
  • it contains no live-locks

  • it contains no deadlocks

  • we are able to always reach the final marking

For a formal definition of sound WF-net, consider: http://www.padsweb.rwth-aachen.de/wvdaalst/publications/p628.pdf In the returned object, the first element is a boolean indicating if the Petri net is a sound workflow net. The second element is a set of diagnostics collected while running WOFLAN (expressed as a dictionary associating the keys [name of the diagnostics] with the corresponding diagnostics).

Parameters:
  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • print_diagnostics (bool) – boolean value that sets up additional prints during the execution of WOFLAN

Return type:

Tuple[bool, Dict[str, Any]]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
is_sound = pm4py.check_soundness(net, im, fm)
pm4py.analysis.cluster_log(log: EventLog | EventStream | DataFrame, sklearn_clusterer=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Generator[EventLog, None, None][source]#

Apply clustering to the provided event log (method based on the extraction of profiles for the traces of the event log) based on a Scikit-Learn clusterer (default: K-means with two clusters)

Parameters:
  • log – log object

  • sklearn_clusterer – the Scikit-Learn clusterer to be used (default: KMeans(n_clusters=2, random_state=0, n_init=”auto”))

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Generator[pd.DataFrame, None, None]

import pm4py

for clust_log in pm4py.cluster_log(df):
    print(clust_log)
pm4py.analysis.insert_artificial_start_end(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', artificial_start='▶', artificial_end='■') EventLog | DataFrame[source]#

Inserts the artificial start/end activities in an event log / Pandas dataframe

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • artificial_start (str) – the symbol to be used as artificial start activity

  • artificial_end (str) – the symbol to be used as artificial end activity

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

dataframe = pm4py.insert_artificial_start_end(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.analysis.insert_case_service_waiting_time(log: EventLog | DataFrame, service_time_column: str = '@@service_time', sojourn_time_column: str = '@@sojourn_time', waiting_time_column: str = '@@waiting_time', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame[source]#

Inserts the service/waiting/sojourn times of the case in the dataframe.

Parameters:
  • log – event log / Pandas dataframe

  • service_time_column (str) – column to be used for the service time

  • sojourn_time_column (str) – column to be used for the sojourn time

  • waiting_time_column (str) – column to be used for the waiting time

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • start_timestamp_key (str) – attribute to be used as start timestamp

Return type:

pd.DataFrame

import pm4py

dataframe = pm4py.insert_case_service_waiting_time(dataframe, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
pm4py.analysis.insert_case_arrival_finish_rate(log: EventLog | DataFrame, arrival_rate_column='@@arrival_rate', finish_rate_column='@@finish_rate', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame[source]#

Inserts the arrival/finish rates of the case in the dataframe. The arrival rate is computed as the difference between the start time of the case and the start time of the previous case to start. The finish rate is computed as the difference between the end time of the case and the end time of the next case to end.

Parameters:
  • log – event log / Pandas dataframe

  • arrival_rate_column (str) – column to be used for the arrival rate

  • finish_rate_column (str) – column to be used for the finish rate

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • start_timestamp_key (str) – attribute to be used as start timestamp

Return type:

pd.DataFrame

import pm4py

dataframe = pm4py.insert_case_arrival_finish_rate(dataframe, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
pm4py.analysis.check_is_workflow_net(net: PetriNet) bool[source]#

Checks if the input Petri net satisfies the WF-net conditions: 1. unique source place 2. unique sink place 3. every node is on a path from the source to the sink

Parameters:

net (PetriNet) – petri net

Return type:

bool

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
is_wfnet = pm4py.check_is_workflow_net(net, im, fm)
pm4py.analysis.maximal_decomposition(net: PetriNet, im: Marking, fm: Marking) List[Tuple[PetriNet, Marking, Marking]][source]#

Calculate the maximal decomposition of an accepting Petri net.

Parameters:
Return type:

List[Tuple[PetriNet, Marking, Marking]]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
list_nets = pm4py.maximal_decomposition(net, im, fm)
for anet in list_nets:
    subnet, subim, subfm = anet
    pm4py.view_petri_net(subnet, subim, subfm, format='svg')
pm4py.analysis.simplicity_petri_net(net: PetriNet, im: Marking, fm: Marking, variant: str | None = 'arc_degree') float[source]#

Computes the simplicity metric for a given Petri net model.

The three available approaches are: - Arc degree simplicity: described in the paper Vázquez-Barreiros, Borja, Manuel Mucientes, and Manuel Lama. “ProDiGen: Mining complete, precise and minimal structure process models with a genetic algorithm.” Information Sciences 294 (2015): 315-333. - Extended cardoso metric: described in the paper “Complexity Metrics for Workflow Nets” Lassen, Kristian Bisgaard, and Wil MP van der Aalst - Extended cyclomatic metric: described in the paper “Complexity Metrics for Workflow Nets” Lassen, Kristian Bisgaard, and Wil MP van der Aalst

Parameters:
  • net (PetriNet) – petri net

  • im (Marking) – initial marking

  • fm (Marking) – final marking

  • variant – variant to be used (‘arc_degree’, ‘extended_cardoso’, ‘extended_cyclomatic’)

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
simplicity = pm4py.simplicity_petri_net(net, im, fm, variant='arc_degree')
pm4py.analysis.generate_marking(net: PetriNet, place_or_dct_places: str | Place | Dict[str, int] | Dict[Place, int]) Marking[source]#

Generate a marking for a given Petri net

Parameters:
  • net (PetriNet) – petri net

  • place_or_dct_places – place, or dictionary of places, to be used in the marking. Possible values: single Place object for the marking; name of the place for the marking; dictionary associating to each place its number of tokens; dictionary associating to names of places a number of tokens.

Return type:

Marking

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
marking = pm4py.generate_marking(net, {'source': 2})
pm4py.analysis.reduce_petri_net_invisibles(net: PetriNet) PetriNet[source]#

Reduce the number of invisibles transitions in the provided Petri net.

Parameters:

net (PetriNet) – petri net

Return type:

PetriNet

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
net = pm4py.reduce_petri_net_invisibles(net)
pm4py.analysis.reduce_petri_net_implicit_places(net: PetriNet, im: Marking, fm: Marking) Tuple[PetriNet, Marking, Marking][source]#

Reduce the number of invisibles transitions in the provided Petri net.

Parameters:
Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.read_pnml('model.pnml')
net = pm4py.reduce_petri_net_implicit_places(net, im, fm)
pm4py.analysis.get_enabled_transitions(net: PetriNet, marking: Marking) Set[Transition][source]#

Gets the transitions enabled in a given marking

Parameters:
Return type:

Set[PetriNet.Transition]

import pm4py

net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
# gets the transitions enabled in the initial marking
enabled_transitions = pm4py.get_enabled_transitions(net, im)

pm4py.cli module#

This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.cli.cli_interface()[source]#

pm4py.conformance module#

The pm4py.conformance module contains the conformance checking algorithms implemented in pm4py

pm4py.conformance.conformance_diagnostics_token_based_replay(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False, opt_parameters: Dict[Any, Any] | None = None) List[Dict[str, Any]][source]#

Apply token-based replay for conformance checking analysis. The methods return the full token-based-replay diagnostics.

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The output of the token-based replay, stored in the variable replayed_traces, contains for each trace of the log:

  • trace_is_fit: boolean value (True/False) that is true when the trace is according to the model.

  • activated_transitions: list of transitions activated in the model by the token-based replay.

  • reached_marking: marking reached at the end of the replay.

  • missing_tokens: number of missing tokens.

  • consumed_tokens: number of consumed tokens.

  • remaining_tokens: number of remaining tokens.

  • produced_tokens: number of produced tokens.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

  • opt_parameters – optional parameters of the token-based replay, including: * reach_mark_through_hidden: boolean value that decides if we shall try to reach the final marking through hidden transitions * stop_immediately_unfit: boolean value that decides if we shall stop immediately when a non-conformance is detected * walk_through_hidden_trans: boolean value that decides if we shall walk through hidden transitions in order to enable visible transitions * places_shortest_path_by_hidden: shortest paths between places by hidden transitions * is_reduction: expresses if the token-based replay is called in a reduction attempt * thread_maximum_ex_time: alignment threads maximum allowed execution time * cleaning_token_flood: decides if a cleaning of the token flood shall be operated * disable_variants: disable variants grouping * return_object_names: decides whether names instead of object pointers shall be returned

Return type:

List[Dict[str, Any]]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
tbr_diagnostics = pm4py.conformance_diagnostics_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.conformance_diagnostics_alignments(log: EventLog | DataFrame, *args, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', variant_str: str | None = None, return_diagnostics_dataframe: bool = False, **kwargs) List[Dict[str, Any]][source]#

Apply the alignments algorithm between a log and a process model. The methods return the full alignment diagnostics.

Alignment-based replay aims to find one of the best alignment between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification could be provided:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signal a deviation between the trace and the model.

  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

With each trace, a dictionary containing among the others the following information is associated:

alignment: contains the alignment (sync moves, moves on log, moves on model) cost: contains the cost of the alignment according to the provided cost function fitness: is equal to 1 if the trace is perfectly fitting.

Parameters:
  • log – event log

  • args – specification of the process model

  • multi_processing (bool) – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • variant_str – variant specification (for Petri net alignments)

  • return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

List[Dict[str, Any]]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
alignments_diagnostics = pm4py.conformance_diagnostics_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.fitness_token_based_replay(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, float][source]#

Calculates the fitness using token-based replay. The fitness is calculated on a log-based level. The output dictionary contains the following keys: - perc_fit_traces (the percentage of fit traces (from 0.0 to 100.0)) - average_trace_fitness (between 0.0 and 1.0; computed as average of the trace fitnesses) - log_fitness (between 0.0 and 1.0) - percentage_of_fitting_traces (the percentage of fit traces (from 0.0 to 100.0)

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The calculation of the replay fitness aim to calculate how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.

For token-based replay, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as indicated in the scientific contribution

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, float]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_tbr = pm4py.fitness_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.fitness_alignments(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', variant_str: str | None = None) Dict[str, float][source]#

Calculates the fitness using alignments The output dictionary contains the following keys: - average_trace_fitness (between 0.0 and 1.0; computed as average of the trace fitnesses) - log_fitness (between 0.0 and 1.0) - percentage_of_fitting_traces (the percentage of fit traces (from 0.0 to 100.0)

Alignment-based replay aims to find one of the best alignment between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification could be provided:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signal a deviation between the trace and the model.

  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

The calculation of the replay fitness aim to calculate how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.

For alignments, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as the average of the fitness values of the single traces.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • multi_processing (bool) – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • variant_str – variant specification

Return type:

Dict[str, float]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_alignments = pm4py.fitness_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.precision_token_based_replay(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the precision precision using token-based replay

Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.

In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.

The reference paper for the TBR-based precision (ETConformance) is: Muñoz-Gama, Jorge, and Josep Carmona. “A fresh look at precision in process conformance.” International Conference on Business Process Management. Springer, Berlin, Heidelberg, 2010.

In this approach, the different prefixes of the log are replayed (whether possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets are different, the more the precision value is low. The more the sets are similar, the more the precision value is high.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_tbr = pm4py.precision_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.precision_alignments(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Calculates the precision of the model w.r.t. the event log using alignments

Alignment-based replay aims to find one of the best alignment between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or » and the second element is a transition (of the model) or ». For each couple, the following classification could be provided:

  • Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.

  • Move on log: for couples where the second element is », it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signal a deviation between the trace and the model.

  • Move on model: for couples where the first element is », it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
    • Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.

    • Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.

The reference paper for the alignments-based precision (Align-ETConformance) is: Adriansyah, Arya, et al. “Measuring precision of modeled behavior.” Information systems and e-Business Management 13.1 (2015): 37-67

In this approach, the different prefixes of the log are replayed (whether possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets are different, the more the precision value is low. The more the sets are similar, the more the precision value is high.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • multi_processing (bool) – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_alignments = pm4py.precision_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.generalization_tbr(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float[source]#

Computes the generalization of the model (against the event log). The approach is described in the paper:

Buijs, Joos CAM, Boudewijn F. van Dongen, and Wil MP van der Aalst. “Quality dimensions in process discovery: The importance of fitness, precision, generalization and simplicity.” International Journal of Cooperative Information Systems 23.01 (2014): 1440001.

Parameters:
  • log – event log

  • petri_net (PetriNet) – petri net

  • initial_marking (Marking) – initial marking

  • final_marking (Marking) – final marking

  • multi_processing – boolean value that enables the multiprocessing

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
generalization_tbr = pm4py.generalization_tbr(dataframe, net, im, fm)
pm4py.conformance.replay_prefix_tbr(prefix: List[str], net: PetriNet, im: Marking, fm: Marking, activity_key: str = 'concept:name') Marking[source]#

Replays a prefix (list of activities) on a given accepting Petri net, using Token-Based Replay.

Parameters:
  • prefix – list of activities

  • net (PetriNet) – Petri net

  • im (Marking) – initial marking

  • fm (Marking) – final marking

  • activity_key (str) – attribute to be used as activity

Return type:

Marking

import pm4py

net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
marking = pm4py.replay_prefix_tbr(['register request', 'check ticket'], net, im, fm)
pm4py.conformance.conformance_diagnostics_footprints(*args) List[Dict[str, Any]] | Dict[str, Any][source]#

Provide conformance checking diagnostics using footprints

Parameters:

args – provided arguments (the first argument is supposed to be an event log (or the footprints discovered from the event log); the other arguments are supposed to be the process model (or the footprints discovered from the process model).

Return type:

Union[List[Dict[str, Any]], Dict[str, Any]]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
footprints_diagnostics = pm4py.conformance_diagnostics_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0. conformance checking using footprints will not be exposed in a future release

pm4py.conformance.fitness_footprints(*args) Dict[str, float][source]#

Calculates fitness using footprints. The output is a dictionary containing two keys: - perc_fit_traces => percentage of fit traces (over the log) - log_fitness => the fitness value over the log

Parameters:

args – provided arguments (the first argument is supposed to be an event log (or the footprints discovered from the event log); the other arguments are supposed to be the process model (or the footprints discovered from the process model).

Return type:

Dict[str, float]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
fitness_fp = pm4py.fitness_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0. conformance checking using footprints will not be exposed in a future release

pm4py.conformance.precision_footprints(*args) float[source]#

Calculates precision using footprints

Parameters:

args – provided arguments (the first argument is supposed to be an event log (or the footprints discovered from the event log); the other arguments are supposed to be the process model (or the footprints discovered from the process model).

Return type:

float

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
precision_fp = pm4py.precision_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0. conformance checking using footprints will not be exposed in a future release

pm4py.conformance.check_is_fitting(*args, activity_key='concept:name') bool[source]#

Checks if a trace object is fit against a process model

Parameters:

args – arguments (trace object; process model (process tree, petri net, BPMN))

Return type:

bool

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.conformance.conformance_temporal_profile(log: EventLog | DataFrame, temporal_profile: Dict[Tuple[str, str], Tuple[float, float]], zeta: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[List[Tuple[float, float, float, float]]][source]#

Performs conformance checking on the provided log with the provided temporal profile. The result is a list of time-based deviations for every case. E.g. if the log on top of which the conformance is applied is the following (1 case): A (timestamp: 2000-01) B (timestamp: 2002-01) The difference between the timestamps of A and B is two years. If the temporal profile: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)} is specified, and zeta is set to 1, then the aforementioned case would be deviating (considering the couple of activities (‘A’, ‘B’)), because 2 years > 1.5 months + 0.5 months.

Parameters:
  • log – log object

  • temporal_profile – temporal profile. E.g., if the log has two cases: A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06); A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03); The temporal profile will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}

  • zeta (float) – number of standard deviations allowed from the average. E.g. zeta=1 allows every timestamp between AVERAGE-STDEV and AVERAGE+STDEV.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

List[List[Tuple[float, float, float, float]]]

import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
conformance_temporal_profile = pm4py.conformance_temporal_profile(dataframe, temporal_profile, zeta=1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.conformance.conformance_declare(log: EventLog | DataFrame, declare_model: Dict[str, Dict[Any, Dict[str, int]]], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[Dict[str, Any]][source]#

Applies conformance checking against a DECLARE model.

Reference paper: F. M. Maggi, A. J. Mooij and W. M. P. van der Aalst, “User-guided discovery of declarative process models,” 2011 IEEE Symposium on Computational Intelligence and Data Mining (CIDM), Paris, France, 2011, pp. 192-199, doi: 10.1109/CIDM.2011.5949297.

Parameters:
  • log – event log

  • declare_model – DECLARE model

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

List[Dict[str, Any]]

import pm4py

log = pm4py.read_xes("C:/receipt.xes")
declare_model = pm4py.discover_declare(log)
conf_result = pm4py.conformance_declare(log, declare_model)
pm4py.conformance.conformance_log_skeleton(log: EventLog | DataFrame, log_skeleton: Dict[str, Any], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[Set[Any]][source]#

Performs conformance checking using the log skeleton

Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

A log skeleton is a declarative model which consists of six different constraints: - “directly_follows”: specifies for some activities some strict bounds on the activities directly-following. For example,

‘A should be directly followed by B’ and ‘B should be directly followed by C’.

  • “always_before”: specifies that some activities may be executed only if some other activities are executed somewhen before

    in the history of the case. For example, ‘C should always be preceded by A’

  • “always_after”: specifies that some activities should always trigger the execution of some other activities

    in the future history of the case. For example, ‘A should always be followed by C’

  • “equivalence”: specifies that a given couple of activities should happen with the same number of occurrences inside

    a case. For example, ‘B and C should always happen the same number of times’.

  • “never_together”: specifies that a given couple of activities should never happen together in the history of the case.

    For example, ‘there should be no case containing both C and D’.

  • “activ_occurrences”: specifies the allowed number of occurrences per activity:

    E.g. A is allowed to be executed 1 or 2 times, B is allowed to be executed 1 or 2 or 3 or 4 times.

Parameters:
  • log – log object

  • log_skeleton – log skeleton object, expressed as dictionaries of the six constraints (never_together, always_before …) along with the discovered rules.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)

Return type:

List[Set[Any]]

import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
conformance_lsk = pm4py.conformance_log_skeleton(dataframe, log_skeleton, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

pm4py.connectors module#

This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.connectors.extract_log_outlook_mails() DataFrame[source]#

Extracts the history of the conversations from the local instance of Microsoft Outlook running on the current computer.

CASE ID (case:concept:name) => identifier of the conversation ACTIVITY (concept:name) => activity that is performed in the current item (send e-mail, receive e-mail,

refuse meeting …)

TIMESTAMP (time:timestamp) => timestamp of creation of the item in Outlook RESOURCE (org:resource) => sender of the current item

See also: * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.mailitem?redirectedfrom=MSDN&view=outlook-pia#properties_ * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.olobjectclass?view=outlook-pia

Return type:

pd.DataFrame

pm4py.connectors.extract_log_outlook_calendar(email_user: str | None = None, calendar_id: int = 9) DataFrame[source]#

Extracts the history of the calendar events (creation, update, start, end) in a Pandas dataframe from the local Outlook instance running on the current computer.

CASE ID (case:concept:name) => identifier of the meeting ACTIVITY (concept:name) => one between: Meeting Created, Last Change of Meeting, Meeting Started, Meeting Completed TIMESTAMP (time:timestamp) => the timestamp of the event case:subject => the subject of the meeting

Parameters:
  • email_user – (optional) e-mail address from which the (shared) calendar should be extracted

  • calendar_id (int) – identifier of the calendar for the given user (default: 9)

Return type:

pd.DataFrame

pm4py.connectors.extract_log_windows_events() DataFrame[source]#

Extract a process mining dataframe from all the events recorded in the Windows registry.

CASE ID (case:concept:name) => name of the computer emitting the events. ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier

TIMESTAMP (time:timestamp) => timestamp of generation of the event RESOURCE (org:resource) => username involved in the event

Return type:

pd.DataFrame

pm4py.connectors.extract_log_chrome_history(history_db_path: str | None = None) DataFrame[source]#

Extracts a dataframe containing the navigation history of Google Chrome. Please keep Google Chrome history closed when extracting.

CASE ID (case:concept:name) => an identifier of the profile that has been extracted ACTIVITY (concept:name) => the complete path of the website, minus the GET arguments TIMESTAMP (time:timestamp) => the timestamp of visit

Parameters:

history_db_path – path to the history DB path of Google Chrome (default: position of the Windows folder)

Return type:

pd.DataFrame

pm4py.connectors.extract_log_firefox_history(history_db_path: str | None = None) DataFrame[source]#

Extracts a dataframe containing the navigation history of Mozilla Firefox. Please keep Google Chrome history closed when extracting.

CASE ID (case:concept:name) => an identifier of the profile that has been extracted ACTIVITY (concept:name) => the complete path of the website, minus the GET arguments TIMESTAMP (time:timestamp) => the timestamp of visit

Parameters:

history_db_path – path to the history DB path of Mozilla Firefox (default: position of the Windows folder)

Return type:

pd.DataFrame

pm4py.connectors.extract_log_github(owner: str = 'pm4py', repo: str = 'pm4py-core', auth_token: str | None = None) DataFrame[source]#

Extracts a dataframe containing the history of the issues of a Github repository. According to the API limit rate of public/registered users, only a part of the events can be returned.

Parameters:
  • owner (str) – owner of the repository (e.g., pm4py)

  • repo (str) – name of the repository (e.g., pm4py-core)

  • auth_token – authorization token

Return type:

pd.DataFrame

pm4py.connectors.extract_log_camunda_workflow(connection_string: str) DataFrame[source]#

Extracts a dataframe from the Camunda workflow system. Aside from the traditional columns, the processID of the process in Camunda is returned.

Parameters:

connection_string (str) – ODBC connection string to the Camunda database

Return type:

pd.DataFrame

pm4py.connectors.extract_log_sap_o2c(connection_string: str, prefix: str = '') DataFrame[source]#

Extracts a dataframe for the SAP O2C process.

Parameters:
  • connection_string (str) – ODBC connection string to the SAP database

  • prefix (str) – prefix for the tables (example: SAPSR3.)

Return type:

pd.DataFrame

pm4py.connectors.extract_log_sap_accounting(connection_string: str, prefix: str = '') DataFrame[source]#

Extracts a dataframe for the SAP Accounting process.

Parameters:
  • connection_string (str) – ODBC connection string to the SAP database

  • prefix (str) – prefix for the tables (example: SAPSR3.)

Return type:

pd.DataFrame

pm4py.connectors.extract_ocel_outlook_mails() OCEL[source]#

Extracts the history of the conversations from the local instance of Microsoft Outlook running on the current computer as an object-centric event log.

ACTIVITY (ocel:activity) => activity that is performed in the current item (send e-mail, receive e-mail,

refuse meeting …)

TIMESTAMP (ocel:timestamp) => timestamp of creation of the item in Outlook

Object types: - org:resource => the snder of the mail - recipients => the list of recipients of the mail - topic => the topic of the discussion

See also: * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.mailitem?redirectedfrom=MSDN&view=outlook-pia#properties_ * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.olobjectclass?view=outlook-pia

Return type:

OCEL

pm4py.connectors.extract_ocel_outlook_calendar(email_user: str | None = None, calendar_id: int = 9) OCEL[source]#

Extracts the history of the calendar events (creation, update, start, end) as an object-centric event log from the local Outlook instance running on the current computer.

ACTIVITY (ocel:activity) => one between: Meeting Created, Last Change of Meeting, Meeting Started, Meeting Completed TIMESTAMP (ocel:timestamp) => the timestamp of the event

Object types: - case:concept:name => identifier of the meeting - case:subject => the subject of the meeting

Parameters:
  • email_user – (optional) e-mail address from which the (shared) calendar should be extracted

  • calendar_id (int) – identifier of the calendar for the given user (default: 9)

Return type:

OCEL

pm4py.connectors.extract_ocel_windows_events() OCEL[source]#

Extract a process mining dataframe from all the events recorded in the Windows registry as an object-centric event log.

ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier

(see https://learn.microsoft.com/en-us/previous-versions/windows/desktop/eventlogprov/win32-ntlogevent)

TIMESTAMP (time:timestamp) => timestamp of generation of the event

Object types: - categoryString: translation of the subcategory. The translation is source-specific. - computerName: name of the computer that generated this event. - eventIdentifier: identifier of the event. This is specific to the source that generated the event log entry. - eventType: 1=Error; 2=Warning; 3=Information; 4=Security Audit Success;5=Security Audit Failure; - sourceName: name of the source (application, service, driver, or subsystem) that generated the entry. - user: user name of the logged-on user when the event occurred. If the user name cannot be determined, this will be NULL.

Return type:

OCEL

pm4py.connectors.extract_ocel_chrome_history(history_db_path: str | None = None) OCEL[source]#

Extracts an object-centric event log containing the navigation history of Google Chrome. Please keep Google Chrome history closed when extracting.

ACTIVITY (ocel:activity) => the complete path of the website, minus the GET arguments TIMESTAMP (ocel:timestamp) => the timestamp of visit

Object Types: - case:concept:name : the profile of Chrome that is used to visit the site - complete_url: the complete URL of the website - url_wo_parameters: complete URL minus the part after ? - domain: the domain of the website that is visited

Parameters:

history_db_path – path to the history DB path of Google Chrome (default: position of the Windows folder)

Return type:

OCEL

pm4py.connectors.extract_ocel_firefox_history(history_db_path: str | None = None) OCEL[source]#

Extracts an object-centric event log containing the navigation history of Mozilla Firefox. Please keep Mozilla Firefox history closed when extracting.

ACTIVITY (ocel:activity) => the complete path of the website, minus the GET arguments TIMESTAMP (ocel:timestamp) => the timestamp of visit

Object Types: - case:concept:name : the profile of Firefox that is used to visit the site - complete_url: the complete URL of the website - url_wo_parameters: complete URL minus the part after ? - domain: the domain of the website that is visited

Parameters:

history_db_path – path to the history DB path of Mozilla Firefox (default: position of the Windows folder)

Return type:

OCEL

pm4py.connectors.extract_ocel_github(owner: str = 'pm4py', repo: str = 'pm4py-core', auth_token: str | None = None) OCEL[source]#

Extracts a dataframe containing the history of the issues of a Github repository. According to the API limit rate of public/registered users, only a part of the events can be returned.

ACTIVITY (ocel:activity) => the event (created, commented, closed, subscribed …) TIMESTAMP (ocel:timestamp) => the timestamp of execution of the event

Object types: - case:concept:name => the URL of the events related to the issue - org:resource => the involved resource - case:repo => the repository in which the issue is created

Parameters:
  • owner (str) – owner of the repository (e.g., pm4py)

  • repo (str) – name of the repository (e.g., pm4py-core)

  • auth_token – authorization token

Return type:

OCEL

pm4py.connectors.extract_ocel_camunda_workflow(connection_string: str) OCEL[source]#

Extracts an object-centric event log from the Camunda workflow system.

Parameters:

connection_string (str) – ODBC connection string to the Camunda database

Return type:

pd.DataFrame

pm4py.connectors.extract_ocel_sap_o2c(connection_string: str, prefix: str = '') OCEL[source]#

Extracts an object-centric event log for the SAP O2C process.

Parameters:
  • connection_string (str) – ODBC connection string to the SAP database

  • prefix (str) – prefix for the tables (example: SAPSR3.)

Return type:

pd.DataFrame

pm4py.connectors.extract_ocel_sap_accounting(connection_string: str, prefix: str = '') OCEL[source]#

Extracts an object-centric event log for the SAP Accounting process.

Parameters:
  • connection_string (str) – ODBC connection string to the SAP database

  • prefix (str) – prefix for the tables (example: SAPSR3.)

Return type:

pd.DataFrame

pm4py.convert module#

The pm4py.convert module contains the cross-conversions implemented in pm4py

pm4py.convert.convert_to_event_log(obj: DataFrame | EventStream, case_id_key: str = 'case:concept:name', **kwargs) EventLog[source]#

Converts a DataFrame/EventStream object to an event log object

Parameters:
  • obj – DataFrame or EventStream object

  • case_id_key (str) – attribute to be used as case identifier

Return type:

EventLog

import pandas as pd
import pm4py

dataframe = pm4py.read_csv("tests/input_data/running-example.csv")
dataframe = pm4py.format_dataframe(dataframe, case_id_column='case:concept:name', activity_column='concept:name', timestamp_column='time:timestamp')
log = pm4py.convert_to_event_log(dataframe)
pm4py.convert.convert_to_event_stream(obj: EventLog | DataFrame, case_id_key: str = 'case:concept:name', **kwargs) EventStream[source]#

Converts a log object to an event stream

Parameters:
  • obj – log object

  • case_id_key (str) – attribute to be used as case identifier

Return type:

EventStream

import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
event_stream = pm4py.convert_to_event_stream(log)
pm4py.convert.convert_to_dataframe(obj: EventStream | EventLog, **kwargs) DataFrame[source]#

Converts a log object to a dataframe

Parameters:

obj – log object

Return type:

pd.DataFrame

import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
dataframe = pm4py.convert_to_dataframe(log)
pm4py.convert.convert_to_bpmn(*args: Tuple[PetriNet, Marking, Marking] | ProcessTree) BPMN[source]#

Converts an object to a BPMN diagram. As an input, either a Petri net (with corresponding initial and final marking) or a process tree can be provided. A process tree can always be converted into a BPMN model and thus quality of the result object is guaranteed. For Petri nets, the quality of the converison largely depends on the net provided (e.g., sound WF-nets are likely to produce reasonable BPMN models)

Parameters:

args – petri net (with initial and final marking) or process tree

Return type:

BPMN

import pm4py

# import a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
bpmn_graph = pm4py.convert_to_bpmn(net, im, fm)
pm4py.convert.convert_to_petri_net(*args: BPMN | ProcessTree | HeuristicsNet | POWL | dict) Tuple[PetriNet, Marking, Marking][source]#

Converts an input model to an (accepting) Petri net. The input objects can either be a process tree, BPMN model or a Heuristic net. The output is a triple, containing the Petri net and the initial and final markings. The markings are only returned if they can be reasonable derived from the input model.

Parameters:

args – process tree, Heuristics net, BPMN or POWL model

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

# imports a process tree from a PTML file
process_tree = pm4py.read_ptml("tests/input_data/running-example.ptml")
net, im, fm = pm4py.convert_to_petri_net(process_tree)
pm4py.convert.convert_to_process_tree(*args: Tuple[PetriNet, Marking, Marking] | BPMN) ProcessTree[source]#

Converts an input model to a process tree. The input models can either be Petri nets (marked) or BPMN models. For both input types, the conversion is not guaranteed to work, hence, invocation of the method can yield an Exception.

Parameters:

args – petri net (along with initial and final marking) or BPMN

Return type:

ProcessTree

import pm4py

# imports a BPMN file
bpmn_graph = pm4py.read_bpmn("tests/input_data/running-example.bpmn")
# converts the BPMN to a process tree (through intermediate conversion to a Petri net)
process_tree = pm4py.convert_to_process_tree(bpmn_graph)
pm4py.convert.convert_to_reachability_graph(*args: Tuple[PetriNet, Marking, Marking] | BPMN | ProcessTree) TransitionSystem[source]#

Converts an input model to a reachability graph (transition system). The input models can either be Petri nets (with markings), BPMN models or process trees. The output is the state-space of the model (i.e., the reachability graph), enocdoed as a TransitionSystem object.

Parameters:

args – petri net (along with initial and final marking), process tree or BPMN

Return type:

TransitionSystem

import pm4py

# reads a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
# converts it to reachability graph
reach_graph = pm4py.convert_to_reachability_graph(net, im, fm)
pm4py.convert.convert_log_to_ocel(log: EventLog | EventStream | DataFrame, activity_column: str = 'concept:name', timestamp_column: str = 'time:timestamp', object_types: Collection[str] | None = None, obj_separator: str = ' AND ', additional_event_attributes: Collection[str] | None = None, additional_object_attributes: Dict[str, Collection[str]] | None = None) OCEL[source]#

Converts an event log to an object-centric event log with one or more than one object types.

Parameters:
  • log_obj – log object

  • activity_column (str) – activity column

  • timestamp_column (str) – timestamp column

  • object_types – list of columns to consider as object types

  • obj_separator (str) – separator between different objects in the same column

  • additional_event_attributes – additional attributes to be considered as event attributes in the OCEL

  • additional_object_attributes – additional attributes per object type to be considered as object attributes in the OCEL (dictionary in which object types are associated to their attributes, i.e., {“order”: [“quantity”, “cost”], “invoice”: [“date”, “due date”]})

Return type:

OCEL

pm4py.convert.convert_ocel_to_networkx(ocel: OCEL, variant: str = 'ocel_to_nx') DiGraph[source]#

Converts an OCEL to a NetworkX DiGraph object.

Parameters:
  • ocel (OCEL) – object-centric event log

  • variant (str) – variant of the conversion to use: “ocel_to_nx” -> graph containing event and object IDS and two type of relations (REL=related objects, DF=directly-follows); “ocel_features_to_nx” -> graph containing different types of interconnection at the object level

Return type:

nx.DiGraph

pm4py.convert.convert_log_to_networkx(log: EventLog | EventStream | DataFrame, include_df: bool = True, case_id_key: str = 'concept:name', other_case_attributes_as_nodes: Collection[str] | None = None, event_attributes_as_nodes: Collection[str] | None = None) DiGraph[source]#

Converts an event log object to a NetworkX DiGraph object. The nodes of the graph are the events, the cases (and possibly the attributes of the log). The edges are: - Connecting each event to the corresponding case (BELONGS_TO type) - Connecting every event to the directly-following one (DF type, if enabled) - Connecting every case/event to the given attribute values (ATTRIBUTE_EDGE type)

Parameters:
  • log – log object (EventLog, EventStream, Pandas dataframe)

  • include_df (bool) – include the directly-follows graph relation in the graph (bool)

  • case_id_attribute – specify which attribute at the case level should be considered the case ID (str)

  • other_case_attributes_as_nodes – specify which attributes at the case level should be inserted in the graph as nodes (other than the caseID) (list, default empty)

  • event_attributes_as_nodes – specify which attributes at the event level should be inserted in the graph as nodes (list, default empty)

Return type:

nx.DiGraph

pm4py.convert.convert_log_to_time_intervals(log: EventLog | DataFrame, filter_activity_couple: Tuple[str, str] | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') List[List[Any]][source]#

Gets a list of intervals from an event log. Each interval contains two temporally consecutive events and measures the time between the two events (complete timestamp of the first against start timestamp of the second).

Parameters:
  • log – log object

  • filter_activity_couple – (optional) filters the intervals to only consider a given couple of activities of the log

  • activity_key (str) – the attribute to be used as activity

  • timestamp_key (str) – the attribute to be used as timestamp

  • case_id_key (str) – the attribute to be used as case identifier

  • start_timestamp_key (str) – the attribute to be used as start timestamp

Return type:

List[List[Any]]

import pm4py

log = pm4py.read_xes('tests/input_data/receipt.xes')
time_intervals = pm4py.convert_log_to_time_intervals(log)
print(len(time_intervals))
time_intervals = pm4py.convert_log_to_time_intervals(log, ('Confirmation of receipt', 'T02 Check confirmation of receipt'))
print(len(time_intervals))
pm4py.convert.convert_petri_net_to_networkx(net: PetriNet, im: Marking, fm: Marking) DiGraph[source]#

Converts a Petri net to a NetworkX DiGraph. Each place and transition is corresponding to a node in the graph.

Parameters:
Return type:

nx.DiGraph

pm4py.convert.convert_petri_net_type(net: PetriNet, im: Marking, fm: Marking, type: str = 'classic') Tuple[PetriNet, Marking, Marking][source]#

Changes the Petri net (internal) type

Parameters:
  • net (PetriNet) – petri net

  • im (Marking) – initial marking

  • fm (Marking) – final marking

  • type (str) – internal type (classic, reset, inhibitor, reset_inhibitor)

Return type:

Tuple[PetriNet, Marking, Marking]

pm4py.discovery module#

The pm4py.discovery module contains the process discovery algorithms implemented in pm4py

pm4py.discovery.discover_dfg(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Discovers a Directly-Follows Graph (DFG) from a log.

This method returns a dictionary with the couples of directly-following activities (in the log) as keys and the frequency of relation as value.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[dict, dict, dict]

import pm4py

dfg, start_activities, end_activities = pm4py.discover_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_directly_follows_graph(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#
pm4py.discovery.discover_dfg_typed(log: DataFrame, case_id_key: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp') DirectlyFollowsGraph[source]#

Discovers a Directly-Follows Graph (DFG) from a log.

This method returns a typed DFG object, i.e., as specified in pm4py.objects.dfg.obj.py (DirectlyFollowsGraph Class) The DFG object describes a graph, start activities and end activities. The graph is a collection of triples of the form (a,b,f) representing an arc a->b with frequency f. The start activities are a collection of tuples of the form (a,f) representing that activity a starts f cases. The end activities are a collection of tuples of the form (a,f) representing that ativity a ends f cases.

This method replaces pm4py.discover_dfg and pm4py.discover_directly_follows_graph. In a future release, these functions will adopt the same behavior as this function.

Parameters:
  • log (DataFrame) – pandas.DataFrame

  • case_id_key (str) – attribute to be used as case identifier

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

Return type:

DFG

import pm4py

dfg = pm4py.discover_dfg_typed(log, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_performance_dfg(log: EventLog | DataFrame, business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], workcalendar=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict][source]#

Discovers a performance directly-follows graph from an event log.

This method returns a dictionary with the couples of directly-following activities (in the log) as keys and the performance of relation as value.

Parameters:
  • log – event log / Pandas dataframe

  • business_hours (bool) – enables/disables the computation based on the business hours (default: False)

  • business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[dict, dict, dict]

import pm4py

performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_alpha(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the Alpha Miner.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_alpha(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_ilp(log: EventLog | DataFrame, alpha: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the ILP Miner.

Parameters:
  • log – event log / Pandas dataframe

  • alpha (float) – noise threshold for the sequence encoding graph (1.0=no filtering, 0.0=greatest filtering)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_ilp(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_alpha_plus(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the Alpha+ algorithm

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_alpha_plus(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')

Deprecated since version 2.3.0: This will be removed in 3.0.0. this method will be removed in a future release.

pm4py.discovery.discover_petri_net_inductive(log: EventLog | DataFrame | DirectlyFollowsGraph, multi_processing: bool = False, noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', disable_fallthroughs: bool = False) Tuple[PetriNet, Marking, Marking][source]#

Discovers a Petri net using the inductive miner algorithm.

The basic idea of Inductive Miner is about detecting a ‘cut’ in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs but uses the Directly Follows graph.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion on the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
  • log – event log / Pandas dataframe / typed DFG

  • noise_threshold (float) – noise threshold (default: 0.0)

  • multi_processing (bool) – boolean that enables/disables multiprocessing in inductive miner

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • disable_fallthroughs (bool) – disable the Inductive Miner fall-throughs

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_petri_net_heuristics(log: EventLog | DataFrame, dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking][source]#

Discover a Petri net using the Heuristics Miner

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing way to handle with noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is an Heuristics Net, so an object that contains the activities and the relationships between them. The Heuristics Net can be then converted into a Petri net. The paper can be visited by clicking on the upcoming link: this link).

Parameters:
  • log – event log / Pandas dataframe

  • dependency_threshold (float) – dependency threshold (default: 0.5)

  • and_threshold (float) – AND threshold (default: 0.65)

  • loop_two_threshold (float) – loop two threshold (default: 0.5)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Tuple[PetriNet, Marking, Marking]

import pm4py

net, im, fm = pm4py.discover_petri_net_heuristics(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_process_tree_inductive(log: EventLog | DataFrame | DirectlyFollowsGraph, noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', disable_fallthroughs: bool = False) ProcessTree[source]#

Discovers a process tree using the inductive miner algorithm

The basic idea of Inductive Miner is about detecting a ‘cut’ in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs but uses the Directly Follows graph.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion on the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
  • log – event log / Pandas dataframe / typed DFG

  • noise_threshold (float) – noise threshold (default: 0.0)

  • activity_key (str) – attribute to be used for the activity

  • multi_processing (bool) – boolean that enables/disables multiprocessing in inductive miner

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • disable_fallthroughs (bool) – disable the Inductive Miner fall-throughs

Return type:

ProcessTree

import pm4py

process_tree = pm4py.discover_process_tree_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_heuristics_net(log: EventLog | DataFrame, dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, min_act_count: int = 1, min_dfg_occurrences: int = 1, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', decoration: str = 'frequency') HeuristicsNet[source]#

Discovers an heuristics net

Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing way to handle with noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is an Heuristics Net, so an object that contains the activities and the relationships between them. The Heuristics Net can be then converted into a Petri net. The paper can be visited by clicking on the upcoming link: this link).

Parameters:
  • log – event log / Pandas dataframe

  • dependency_threshold (float) – dependency threshold (default: 0.5)

  • and_threshold (float) – AND threshold (default: 0.65)

  • loop_two_threshold (float) – loop two threshold (default: 0.5)

  • min_act_count (int) – minimum number of occurrences per activity in order to be included in the discovery

  • min_dfg_occurrences (int) – minimum number of occurrences per arc in the DFG in order to be included in the discovery

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • decoration (str) – the decoration that should be used (frequency, performance)

Return type:

HeuristicsNet

import pm4py

heu_net = pm4py.discover_heuristics_net(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.derive_minimum_self_distance(log: DataFrame | EventLog | EventStream, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int][source]#

This algorithm computes the minimum self-distance for each activity observed in an event log. The self distance of a in <a> is infinity, of a in <a,a> is 0, in <a,b,a> is 1, etc. The activity key ‘concept:name’ is used.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, int]

import pm4py

msd = pm4py.derive_minimum_self_distance(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_footprints(*args: EventLog | Tuple[PetriNet, Marking, Marking] | ProcessTree) List[Dict[str, Any]] | Dict[str, Any][source]#

Discovers the footprints out of the provided event log / process model

Parameters:

args – event log / process model

Return type:

Union[List[Dict[str, Any]], Dict[str, Any]]

import pm4py

footprints = pm4py.discover_footprints(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_eventually_follows_graph(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], int][source]#

Gets the eventually follows graph from a log object.

The eventually follows graph is a dictionary associating to every couple of activities which are eventually following each other the number of occurrences of this relation.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[Tuple[str, str], int]

import pm4py

efg = pm4py.discover_eventually_follows_graph(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_bpmn_inductive(log: EventLog | DataFrame | DirectlyFollowsGraph, noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', disable_fallthroughs: bool = False) BPMN[source]#

Discovers a BPMN using the Inductive Miner algorithm

The basic idea of Inductive Miner is about detecting a ‘cut’ in the log (e.g. sequential cut, parallel cut, concurrent cut and loop cut) and then recur on sublogs, which were found applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs but uses the Directly Follows graph.

Inductive miner models usually make extensive use of hidden transitions, especially for skipping/looping on a portion on the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).

Parameters:
  • log – event log / Pandas dataframe / typed DFG

  • noise_threshold (float) – noise threshold (default: 0.0)

  • multi_processing (bool) – boolean that enables/disables multiprocessing in inductive miner

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • disable_fallthroughs (bool) – disable the Inductive Miner fall-throughs

Return type:

BPMN

import pm4py

bpmn_graph = pm4py.discover_bpmn_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_transition_system(log: EventLog | DataFrame, direction: str = 'forward', window: int = 2, view: str = 'sequence', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') TransitionSystem[source]#

Discovers a transition system as described in the process mining book “Process Mining: Data Science in Action”

Parameters:
  • log – event log / Pandas dataframe

  • direction (str) – direction in which the transition system is built (forward, backward)

  • window (int) – window (2, 3, …)

  • view (str) – view to use in the construction of the states (sequence, set, multiset)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

TransitionSystem

import pm4py

transition_system = pm4py.discover_transition_system(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_prefix_tree(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Trie[source]#

Discovers a prefix tree from the provided log object.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Trie

import pm4py

prefix_tree = pm4py.discover_prefix_tree(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_temporal_profile(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], Tuple[float, float]][source]#

Discovers a temporal profile from a log object.

Implements the approach described in: Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. “Temporal Conformance Checking at Runtime based on Time-infused Process Models.” arXiv preprint arXiv:2008.07262 (2020).

The output is a dictionary containing, for every couple of activities eventually following in at least a case of the log, the average and the standard deviation of the difference of the timestamps.

E.g. if the log has two cases:

A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06) A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03)

The returned dictionary will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[Tuple[str, str], Tuple[float, float]]

import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_log_skeleton(log: EventLog | DataFrame, noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Any][source]#

Discovers a log skeleton from an event log.

A log skeleton is a declarative model which consists of six different constraints: - “directly_follows”: specifies for some activities some strict bounds on the activities directly-following. For example,

‘A should be directly followed by B’ and ‘B should be directly followed by C’.

  • “always_before”: specifies that some activities may be executed only if some other activities are executed somewhen before

    in the history of the case. For example, ‘C should always be preceded by A’

  • “always_after”: specifies that some activities should always trigger the execution of some other activities

    in the future history of the case. For example, ‘A should always be followed by C’

  • “equivalence”: specifies that a given couple of activities should happen with the same number of occurrences inside

    a case. For example, ‘B and C should always happen the same number of times’.

  • “never_together”: specifies that a given couple of activities should never happen together in the history of the case.

    For example, ‘there should be no case containing both C and D’.

  • “activ_occurrences”: specifies the allowed number of occurrences per activity:

    E.g. A is allowed to be executed 1 or 2 times, B is allowed to be executed 1 or 2 or 3 or 4 times.

Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).

Parameters:
  • log – event log / Pandas dataframe

  • noise_threshold (float) – noise threshold, acting as described in the paper.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, Any]

import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.discovery.discover_declare(log: EventLog | DataFrame, allowed_templates: Set[str] | None = None, considered_activities: Set[str] | None = None, min_support_ratio: float | None = None, min_confidence_ratio: float | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Dict[Any, Dict[str, int]]][source]#

Discovers a DECLARE model from an event log.

Reference paper: F. M. Maggi, A. J. Mooij and W. M. P. van der Aalst, “User-guided discovery of declarative process models,” 2011 IEEE Symposium on Computational Intelligence and Data Mining (CIDM), Paris, France, 2011, pp. 192-199, doi: 10.1109/CIDM.2011.5949297.

Parameters:
  • log – event log / Pandas dataframe

  • allowed_templates – (optional) collection of templates to consider for the discovery

  • considered_activities – (optional) collection of activities to consider for the discovery

  • min_support_ratio – (optional, decided automatically otherwise) minimum percentage of cases (over the entire set of cases of the log) for which the discovered rules apply

  • min_confidence_ratio – (optional, decided automatically otherwise) minimum percentage of cases (over the rule’s support) for which the discovered rules are valid

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Dict[str, Any]

import pm4py

declare_model = pm4py.discover_declare(log)
pm4py.discovery.discover_powl(log: EventLog | DataFrame, variant=POWLDiscoveryVariant.MAXIMAL, filtering_weight_factor: float = 0.0, order_graph_filtering_threshold: float = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') POWL[source]#

Discovers a POWL model from an event log.

Reference paper: Kourani, Humam, and Sebastiaan J. van Zelst. “POWL: partially ordered workflow language.” International Conference on Business Process Management. Cham: Springer Nature Switzerland, 2023.

Parameters:
  • log – event log / Pandas dataframe

  • variant – variant of the algorithm

  • filtering_weight_factor (float) – accepts values 0 <= x < 1

  • order_graph_filtering_threshold (float) – accepts values 0.5 < x <= 1

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

POWL

import pm4py

log = pm4py.read_xes('tests/input_data/receipt.xes')
powl_model = pm4py.discover_powl(log, activity_key='concept:name')
print(powl_model)
pm4py.discovery.discover_batches(log: EventLog | DataFrame, merge_distance: int = 900, min_batch_size: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') List[Tuple[Tuple[str, str], int, Dict[str, Any]]][source]#

Discover batches from the provided log object

We say that an activity is executed in batches by a given resource when the resource executes several times the same activity in a short period of time.

Identifying such activities may identify points of the process that can be automated, since the activity of the person may be repetitive.

The following categories of batches are detected: - Simultaneous (all the events in the batch have identical start and end timestamps) - Batching at start (all the events in the batch have identical start timestamp) - Batching at end (all the events in the batch have identical end timestamp) - Sequential batching (for all the consecutive events, the end of the first is equal to the start of the second) - Concurrent batching (for all the consecutive events that are not sequentially matched)

The approach has been described in the following paper: Martin, N., Swennen, M., Depaire, B., Jans, M., Caris, A., & Vanhoof, K. (2015, December). Batch Processing: Definition and Event Log Identification. In SIMPDA (pp. 137-140).

The output is a (sorted) list containing tuples. Each tuple contain:
  • Index 0: the activity-resource for which at least one batch has been detected

  • Index 1: the number of batches for the given activity-resource

  • Index 2: a list containing all the batches. Each batch is described by:

    # The start timestamp of the batch # The complete timestamp of the batch # The list of events that are executed in the batch

Parameters:
  • log – event log / Pandas dataframe

  • merge_distance (int) – the maximum time distance between non-overlapping intervals in order for them to be considered belonging to the same batch (default: 15*60 15 minutes)

  • min_batch_size (int) – the minimum number of events for a batch to be considered (default: 2)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – attribute to be used as resource

Return type:

List[Tuple[Tuple[str, str], int, Dict[str, Any]]]

import pm4py

batches = pm4py.discover_log_skeleton(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp', resource_key='org:resource')

pm4py.filtering module#

The pm4py.filtering module contains the filtering features offered in pm4py

pm4py.filtering.filter_log_relative_occurrence_event_attribute(log: EventLog | DataFrame, min_relative_stake: float, attribute_key: str = 'concept:name', level='cases', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the event log keeping only the events having an attribute value which occurs: - in at least the specified (min_relative_stake) percentage of events, when level=”events” - in at least the specified (min_relative_stake) percentage of cases, when level=”cases”

Parameters:
  • log – event log / Pandas dataframe

  • min_relative_stake (float) – minimum percentage of cases (expressed as a number between 0 and 1) in which the attribute should occur.

  • attribute_key (str) – the attribute to filter

  • level (str) – the level of the filter (if level=”events”, then events / if level=”cases”, then cases)

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_log_relative_occurrence_event_attribute(dataframe, 0.5, level='cases', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_start_activities(log: EventLog | DataFrame, activities: Set[str] | List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filter cases having a start activity in the provided list

Parameters:
  • log – event log / Pandas dataframe

  • activities – collection of start activities

  • retain (bool) – if True, we retain the traces containing the given start activities, if false, we drop the traces

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_start_activities(dataframe, ['Act. A'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_end_activities(log: EventLog | DataFrame, activities: Set[str] | List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filter cases having an end activity in the provided list

Parameters:
  • log – event log / Pandas dataframe

  • activities – collection of end activities

  • retain (bool) – if True, we retain the traces containing the given end activities, if false, we drop the traces

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_end_activities(dataframe, ['Act. Z'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_event_attribute_values(log: EventLog | DataFrame, attribute_key: str, values: Set[str] | List[str], level: str = 'case', retain: bool = True, case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filter a log object on the values of some event attribute

Parameters:
  • log – event log / Pandas dataframe

  • attribute_key (str) – attribute to filter

  • values – admitted (or forbidden) values

  • level (str) – specifies how the filter should be applied (‘case’ filters the cases where at least one occurrence happens, ‘event’ filter the events eventually trimming the cases)

  • retain (bool) – specifies if the values should be kept or removed

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_event_attribute_values(dataframe, 'concept:name', ['Act. A', 'Act. Z'], case_id_key='case:concept:name')
pm4py.filtering.filter_trace_attribute_values(log: EventLog | DataFrame, attribute_key: str, values: Set[str] | List[str], retain: bool = True, case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filter a log on the values of a trace attribute

Parameters:
  • log – event log / Pandas dataframe

  • attribute_key (str) – attribute to filter

  • values – collection of values to filter

  • retain (bool) – boolean value (keep/discard matching traces)

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_trace_attribute_values(dataframe, 'case:creator', ['Mike'], case_id_key='case:concept:name')
pm4py.filtering.filter_variants(log: EventLog | DataFrame, variants: Set[str] | List[str] | List[Tuple[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filter a log on a specified set of variants

Parameters:
  • log – event log / Pandas dataframe

  • variants – collection of variants to filter; A variant should be specified as a list of tuples of activity names, e.g., [(‘a’, ‘b’, ‘c’)]

  • retain (bool) – boolean; if True all traces conforming to the specified variants are retained; if False, all those traces are removed

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants(dataframe, [('Act. A', 'Act. B', 'Act. Z'), ('Act. A', 'Act. C', 'Act. Z')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_directly_follows_relation(log: EventLog | DataFrame, relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Retain traces that contain any of the specified ‘directly follows’ relations. For example, if relations == [(‘a’,’b’),(‘a’,’c’)] and log [<a,b,c>,<a,c,b>,<a,d,b>] the resulting log will contain traces describing [<a,b,c>,<a,c,b>].

Parameters:
  • log – event log / Pandas dataframe

  • relations – list of activity name pairs, which are allowed/forbidden paths

  • retain (bool) – parameter that says whether the paths should be kept/removed

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_directly_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_eventually_follows_relation(log: EventLog | DataFrame, relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Retain traces that contain any of the specified ‘eventually follows’ relations. For example, if relations == [(‘a’,’b’),(‘a’,’c’)] and log [<a,b,c>,<a,c,b>,<a,d,b>] the resulting log will contain traces describing [<a,b,c>,<a,c,b>,<a,d,b>].

Parameters:
  • log – event log / Pandas dataframe

  • relations – list of activity name pairs, which are allowed/forbidden paths

  • retain (bool) – parameter that says whether the paths should be kept/removed

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_eventually_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_time_range(log: EventLog | DataFrame, dt1: str, dt2: str, mode='events', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filter a log on a time interval

Parameters:
  • log – event log / Pandas dataframe

  • dt1 (str) – left extreme of the interval

  • dt2 (str) – right extreme of the interval

  • mode (str) – modality of filtering (events, traces_contained, traces_intersecting). events: any event that fits the time frame is retained; traces_contained: any trace completely contained in the timeframe is retained; traces_intersecting: any trace intersecting with the time-frame is retained.

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_contained', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_intersecting', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='events', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_between(log: EventLog | DataFrame, act1: str | List[str], act2: str | List[str], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Finds all the sub-cases leading from an event with activity “act1” to an event with activity “act2” in the log, and returns a log containing only them.

Example:

Log A B C D E F A B E F C A B F C B C B E F C

act1 = B act2 = C

Returned sub-cases: B C (from the first case) B E F C (from the second case) B F C (from the third case) B C (from the third case) B E F C (from the third case)

Parameters:
  • log – event log / Pandas dataframe

  • act1 – source activity (or collection of activities)

  • act2 – target activity (or collection of activities)

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_between(dataframe, 'A', 'D', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.filtering.filter_case_size(log: EventLog | DataFrame, min_size: int, max_size: int, case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the event log, keeping the cases having a length (number of events) included between min_size and max_size

Parameters:
  • log – event log / Pandas dataframe

  • min_size (int) – minimum allowed number of events

  • max_size (int) – maximum allowed number of events

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_case_size(dataframe, 5, 10, case_id_key='case:concept:name')
pm4py.filtering.filter_case_performance(log: EventLog | DataFrame, min_performance: float, max_performance: float, timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the event log, keeping the cases having a duration (the timestamp of the last event minus the timestamp of the first event) included between min_performance and max_performance

Parameters:
  • log – event log / Pandas dataframe

  • min_performance (float) – minimum allowed case duration

  • max_performance (float) – maximum allowed case duration

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_case_performance(dataframe, 3600.0, 86400.0, timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_activities_rework(log: EventLog | DataFrame, activity: str, min_occurrences: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the event log, keeping the cases where the specified activity occurs at least min_occurrences times.

Parameters:
  • log – event log / Pandas dataframe

  • activity (str) – activity

  • min_occurrences (int) – minimum desidered number of occurrences

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_activities_rework(dataframe, 'Approve Order', 2, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_paths_performance(log: EventLog | DataFrame, path: Tuple[str, str], min_performance: float, max_performance: float, keep=True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the event log, either: - (keep=True) keeping the cases having the specified path (tuple of 2 activities) with a duration included between min_performance and max_performance - (keep=False) discarding the cases having the specified path with a duration included between min_performance and max_performance

Parameters:
  • log – event log / Pandas dataframe

  • path – tuple of two activities (source_activity, target_activity)

  • min_performance (float) – minimum allowed performance (of the path)

  • max_performance (float) – maximum allowed performance (of the path)

  • keep (bool) – keep/discard the cases having the specified path with a duration included between min_performance and max_performance

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_paths_performance(dataframe, ('A', 'D'), 3600.0, 86400.0, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_variants_top_k(log: EventLog | DataFrame, k: int, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Keeps the top-k variants of the log

Parameters:
  • log – event log / Pandas dataframe

  • k (int) – number of variants that should be kept

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants_top_k(dataframe, 5, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_variants_by_coverage_percentage(log: EventLog | DataFrame, min_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the variants of the log by a coverage percentage (e.g., if min_coverage_percentage=0.4, and we have a log with 1000 cases, of which 500 of the variant 1, 400 of the variant 2, and 100 of the variant 3, the filter keeps only the traces of variant 1 and variant 2).

Parameters:
  • log – event log / Pandas dataframe

  • min_coverage_percentage (float) – minimum allowed percentage of coverage

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants_by_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_variants_by_maximum_coverage_percentage(log: EventLog | DataFrame, max_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the variants of the log by a maximum coverage percentage (e.g., if max_coverage_percentage=0.4, and we have a log with 1000 cases, of which 500 of the variant 1, 400 of the variant 2, and 100 of the variant 3, the filter keeps only the traces of variant 2 and variant 3).

Parameters:
  • log – event log / Pandas dataframe

  • max_coverage_percentage (float) – maximum allowed percentage of coverage

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_variants_by_maximum_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_prefixes(log: EventLog | DataFrame, activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the log, keeping the prefixes to a given activity. E.g., for a log with traces:

A,B,C,D A,B,Z,A,B,C,D A,B,C,D,C,E,C,F

The prefixes to “C” are respectively:

A,B A,B,Z,A,B A,B

Parameters:
  • log – event log / Pandas dataframe

  • activity (str) – target activity of the filter

  • strict (bool) – applies the filter strictly (cuts the occurrences of the selected activity).

  • first_or_last (str) – decides if the first or last occurrence of an activity should be selected as baseline for the filter.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_prefixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_suffixes(log: EventLog | DataFrame, activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters the log, keeping the suffixes from a given activity. E.g., for a log with traces:

A,B,C,D A,B,Z,A,B,C,D A,B,C,D,C,E,C,F

The suffixes from “C” are respectively:

D D D,C,E,C,F

Parameters:
  • log – event log / Pandas dataframe

  • activity (str) – target activity of the filter

  • strict (bool) – applies the filter strictly (cuts the occurrences of the selected activity).

  • first_or_last (str) – decides if the first or last occurrence of an activity should be selected as baseline for the filter.

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_prefixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_ocel_event_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL[source]#

Filters the object-centric event log on the provided event attributes values

Parameters:
  • ocel (OCEL) – object-centric event log

  • attribute_key (str) – attribute at the event level

  • attribute_values – collection of attribute values

  • positive (bool) – decides if the values should be kept (positive=True) or removed (positive=False)

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_event_attribute(ocel, 'ocel:activity', ['A', 'B', 'D'])
pm4py.filtering.filter_ocel_object_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL[source]#

Filters the object-centric event log on the provided object attributes values

Parameters:
  • ocel (OCEL) – object-centric event log

  • attribute_key (str) – attribute at the event level

  • attribute_values – collection of attribute values

  • positive (bool) – decides if the values should be kept (positive=True) or removed (positive=False)

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_object_attribute(ocel, 'ocel:type', ['order'])
pm4py.filtering.filter_ocel_object_types_allowed_activities(ocel: OCEL, correspondence_dict: Dict[str, Collection[str]]) OCEL[source]#

Filters an object-centric event log keeping only the specified object types with the specified activity set (filters out the rest).

Parameters:
  • ocel (OCEL) – object-centric event log

  • correspondence_dict – dictionary containing, for every object type of interest, a collection of allowed activities. Example: {“order”: [“Create Order”], “element”: [“Create Order”, “Create Delivery”]}

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_object_types_allowed_activities(ocel, {'order': ['create order', 'pay order'], 'item})
pm4py.filtering.filter_ocel_object_per_type_count(ocel: OCEL, min_num_obj_type: Dict[str, int]) OCEL[source]#

Filters the events of the object-centric logs which are related to at least the specified amount of objects per type.

E.g. pm4py.filter_object_per_type_count(ocel, {“order”: 1, “element”: 2})

Would keep the following events:

ocel:eid ocel:timestamp ocel:activity ocel:type:element ocel:type:order

0 e1 1980-01-01 Create Order [i4, i1, i3, i2] [o1] 1 e11 1981-01-01 Create Order [i6, i5] [o2] 2 e14 1981-01-04 Create Order [i8, i7] [o3]

Parameters:
  • ocel (OCEL) – object-centric event log

  • min_num_obj_type – minimum number of objects per type

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_object_per_type_count(ocel, {'order': 1, 'element': 2})
pm4py.filtering.filter_ocel_start_events_per_object_type(ocel: OCEL, object_type: str) OCEL[source]#

Filters the events in which a new object for the given object type is spawn. (E.g. an event with activity “Create Order” might spawn new orders).

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – object type to consider

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_start_events_per_object_type(ocel, 'delivery')
pm4py.filtering.filter_ocel_end_events_per_object_type(ocel: OCEL, object_type: str) OCEL[source]#

Filters the events in which an object for the given object type terminates its lifecycle. (E.g. an event with activity “Pay Order” might terminate an order).

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – object type to consider

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_end_events_per_object_type(ocel, 'delivery')
pm4py.filtering.filter_ocel_events_timestamp(ocel: OCEL, min_timest: datetime | str, max_timest: datetime | str, timestamp_key: str = 'ocel:timestamp') OCEL[source]#

Filters the object-centric event log keeping events in the provided timestamp range

Parameters:
  • ocel (OCEL) – object-centric event log

  • min_timest – left extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)

  • max_timest – right extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)

  • timestamp_key (str) – the attribute to use as timestamp (default: ocel:timestamp)

Return type:

OCEL

import pm4py

filtered_ocel = pm4py.filter_ocel_events_timestamp(ocel, '1990-01-01 00:00:00', '2010-01-01 00:00:00')
pm4py.filtering.filter_four_eyes_principle(log: EventLog | DataFrame, activity1: str, activity2: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') EventLog | DataFrame[source]#

Filter the cases of the log which violates the four eyes principle on the provided activities.

Parameters:
  • log – event log

  • activity1 (str) – first activity

  • activity2 (str) – second activity

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – attribute to be used as resource

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_four_eyes_principle(dataframe, 'Act. A', 'Act. B', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_activity_done_different_resources(log: EventLog | DataFrame, activity: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') EventLog | DataFrame[source]#

Filters the cases where an activity is repeated by different resources.

Parameters:
  • log – event log

  • activity (str) – activity to consider

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • resource_key (str) – attribute to be used as resource

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

filtered_dataframe = pm4py.filter_activity_done_different_resources(dataframe, 'Act. A', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.filtering.filter_trace_segments(log: EventLog | DataFrame, admitted_traces: List[List[str]], positive: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Filters an event log on a set of traces. A trace is a sequence of activities and “…”, in which: - a “…” before an activity tells that other activities can precede the given activity - a “…” after an activity tells that other activities can follow the given activity

For example: - pm4py.filter_trace_segments(log, [[“A”, “B”]]) <- filters only the cases of the event log having exactly the process variant A,B - pm4py.filter_trace_segments(log, [[”…”, “A”, “B”]]) <- filters only the cases of the event log ending with the activities A,B - pm4py.filter_trace_segments(log, [[“A”, “B”, “…”]]) <- filters only the cases of the event log starting with the activities A,B - pm4py.filter_trace_segments(log, [[”…”, “A”, “B”, “C”, “…”], [”…”, “D”, “E”, “F”, “…”]]

<- filters only the cases of the event log in which at any point

there is A followed by B followed by C, and in which at any other point there is D followed by E followed by F

Parameters:
  • log – event log / Pandas dataframe

  • admitted_traces – collection of traces admitted from the filter (with the aforementioned criteria)

  • positive (bool) – (boolean) indicates if the filter should keep/discard the cases satisfying the filter

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")

filtered_log = pm4py.filter_trace_segments(log, [["...", "check ticket", "decide", "reinitiate request", "..."]])
print(filtered_log)
pm4py.filtering.filter_ocel_object_types(ocel: OCEL, obj_types: Collection[str], positive: bool = True, level: int = 1) OCEL[source]#

Filters the object types of an object-centric event log.

Parameters:
  • ocel (OCEL) – object-centric event log

  • obj_types – object types to keep/remove

  • positive (bool) – boolean value (True=keep, False=remove)

  • level (int) – recursively expand the set of object identifiers until the specified level

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_object_types(ocel, ['order'])
pm4py.filtering.filter_ocel_objects(ocel: OCEL, object_identifiers: Collection[str], positive: bool = True, level: int = 1) OCEL[source]#

Filters the object identifiers of an object-centric event log.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_identifiers – object identifiers to keep/remove

  • positive (bool) – boolean value (True=keep, False=remove)

  • level (int) – recursively expand the set of object identifiers until the specified level

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1'], level=1)
pm4py.filtering.filter_ocel_events(ocel: OCEL, event_identifiers: Collection[str], positive: bool = True) OCEL[source]#

Filters the event identifiers of an object-centric event log.

Parameters:
  • ocel (OCEL) – object-centric event log

  • event_identifiers – event identifiers to keep/remove

  • positive (bool) – boolean value (True=keep, False=remove)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_events(ocel, ['e1'])
pm4py.filtering.filter_ocel_cc_object(ocel: OCEL, object_id: str, conn_comp: List[List[str]] | None = None, return_conn_comp: bool = False) OCEL | Tuple[OCEL, List[List[str]]][source]#

Returns the connected component of the object-centric event log to which the object with the provided identifier belongs.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_id (str) – object identifier

  • conn_comp – (optional) connected components of the objects of the OCEL

  • return_conn_comp (bool) – if True, returns the computed connected components of the OCEL

Return type:

Union[OCEL, Tuple[OCEL, List[List[str]]]]

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_object(ocel, 'order1')
pm4py.filtering.filter_ocel_cc_length(ocel: OCEL, min_cc_length: int, max_cc_length: int) OCEL[source]#

Keeps only the objects in an OCEL belonging to a connected component with a length falling in a specified range

Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

Parameters:
  • ocel (OCEL) – object-centric event log

  • min_cc_length (int) – minimum allowed length for the connected component

  • max_cc_length (int) – maximum allowed length for the connected component

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_length(ocel, 2, 10)
pm4py.filtering.filter_ocel_cc_otype(ocel: OCEL, otype: str, positive: bool = True) OCEL[source]#

Filters the objects belonging to the connected components having at least an object of the provided object type.

Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

Parameters:
  • ocel (OCEL) – object-centric event log

  • otype (str) – object type

  • positive (bool) – boolean that keeps or discards the objects of these components

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_otype(ocel, 'order')
pm4py.filtering.filter_ocel_cc_activity(ocel: OCEL, activity: str) OCEL[source]#

Filters the objects belonging to the connected components having at least an event with the provided activity.

Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

Parameters:
  • ocel (OCEL) – object-centric event log

  • activity (str) – activity

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_activity(ocel, 'Create Order')

pm4py.hof module#

pm4py.hof.filter_log(f: Callable[[Any], bool], log: EventLog) EventLog | EventStream[source]#

Filters the log according to a given (lambda) function.

Parameters:
  • f – function that specifies the filter criterion, may be a lambda

  • log (EventLog) – event log; either EventLog or EventStream Object

Return type:

Union[log_inst.EventLog, log_inst.EventStream]

Deprecated since version 2.3.0: This will be removed in 3.0.0. the EventLog class will be removed in a future release.

pm4py.hof.filter_trace(f: Callable[[Any], bool], trace: Trace) Trace[source]#

Filters the trace according to a given (lambda) function.

Parameters:
  • f – function that specifies the filter criterion, may be a lambda

  • trace (Trace) – trace; PM4Py trace object

Return type:

log_inst.Trace

pm4py.hof.sort_log(log: EventLog, key, reverse: bool = False) EventLog | EventStream[source]#

Sorts the event log according to a given key.

Parameters:
  • log (EventLog) – event log object; either EventLog or EventStream

  • key – sorting key

  • reverse (bool) – indicates whether sorting should be reversed or not

Return type:

Union[log_inst.EventLog, log_inst.EventStream]

Deprecated since version 2.3.0: This will be removed in 3.0.0. the EventLog class will be removed in a future release.

pm4py.hof.sort_trace(trace: Trace, key, reverse: bool = False) Trace[source]#

Sorts the events in a trace according to a given key.

Parameters:
  • trace (Trace) – input trace

  • key – sorting key

  • reverse (bool) – indicates whether sorting should be reversed (default False)

Return type:

log_inst.Trace

Deprecated since version 2.3.0: This will be removed in 3.0.0. the EventLog class will be removed in a future release.

pm4py.llm module#

This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.llm.openai_query(prompt: str, api_key: str | None = None, openai_model: str | None = None, api_url: str | None = None, **kwargs) str[source]#

Executes the provided prompt, obtaining the answer from the OpenAI APIs.

Parameters:
  • prompt (str) – prompt that should be executed

  • api_key – OpenAI API key

  • openai_model – OpenAI model to be used (default: gpt-3.5-turbo)

  • api_url – OpenAI API URL

Return type:

str

import pm4py

resp = pm4py.llm.openai_query('what is the result of 3+3?', api_key="sk-382393", openai_model="gpt-3.5-turbo")
print(resp)
pm4py.llm.abstract_dfg(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, include_performance: bool = True, relative_frequency: bool = False, response_header: bool = True, primary_performance_aggregation: str = 'mean', secondary_performance_aggregation: str | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Obtains the DFG abstraction of a traditional event log

Parameters:
  • log_obj – log object

  • max_len (int) – maximum length of the (string) abstraction

  • include_performance (bool) – (boolean) includes the performance of the paths in the abstraction

  • relative_frequency (bool) – (boolean) uses the relative instead of the absolute frequency of the paths

  • response_header (bool) – includes a short header before the paths, pointing to the description of the abstraction

  • primary_performance_aggregation (str) – primary aggregation to be used for the arc’s performance (default: mean, other options: median, min, max, sum, stdev)

  • secondary_performance_aggregation – (optional) secondary aggregation to be used for the arc’s performance (default None, other options: mean, median, min, max, sum, stdev)

  • activity_key (str) – the column to be used as activity

  • timestamp_key (str) – the column to be used as timestamp

  • case_id_key (str) – the column to be used as case identifier

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_dfg(log))
pm4py.llm.abstract_variants(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, include_performance: bool = True, relative_frequency: bool = False, response_header: bool = True, primary_performance_aggregation: str = 'mean', secondary_performance_aggregation: str | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Obtains the variants abstraction of a traditional event log

Parameters:
  • log_obj – log object

  • max_len (int) – maximum length of the (string) abstraction

  • include_performance (bool) – (boolean) includes the performance of the variants in the abstraction

  • relative_frequency (bool) – (boolean) uses the relative instead of the absolute frequency of the variants

  • response_header (bool) – includes a short header before the variants, pointing to the description of the abstraction

  • primary_performance_aggregation (str) – primary aggregation to be used for the arc’s performance (default: mean, other options: median, min, max, sum, stdev)

  • secondary_performance_aggregation – (optional) secondary aggregation to be used for the arc’s performance (default None, other options: mean, median, min, max, sum, stdev)

  • activity_key (str) – the column to be used as activity

  • timestamp_key (str) – the column to be used as timestamp

  • case_id_key (str) – the column to be used as case identifier

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_variants(log))
pm4py.llm.abstract_ocel(ocel: OCEL, include_timestamps: bool = True) str[source]#

Obtains the abstraction of an object-centric event log, including the list of events and the objects of the OCEL

Parameters:
  • ocel (OCEL) – object-centric event log

  • include_timestamps (bool) – (boolean) includes the timestamp information in the abstraction

Return type:

str

import pm4py

ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
print(pm4py.llm.abstract_ocel(ocel))
pm4py.llm.abstract_ocel_ocdfg(ocel: OCEL, include_header: bool = True, include_timestamps: bool = True, max_len: int = 10000) str[source]#

Obtains the abstraction of an object-centric event log, representing in text the object-centric directly-follows graph

Parameters:
  • ocel (OCEL) – object-centric event log

  • include_header (bool) – (boolean) includes the header in the abstraction

  • include_timestamps (bool) – (boolean) includes the timestamp information in the abstraction

  • max_len (int) – maximum length of the abstraction

Return type:

str

import pm4py

ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
print(pm4py.llm.abstract_ocel_ocdfg(ocel))
pm4py.llm.abstract_ocel_features(ocel: OCEL, obj_type: str, include_header: bool = True, max_len: int = 10000, debug: bool = False, enable_object_lifecycle_paths: bool = True) str[source]#

Obtains the abstraction of an object-centric event log, representing in text the features and their values.

Parameters:
  • ocel (OCEL) – object-centric event log

  • obj_type (str) – the object type that should be considered in the feature extraction

  • include_header (bool) – (boolean) includes the header in the abstraction

  • max_len (int) – maximum length of the abstraction

  • debug (bool) – enables debugging mode (telling at which point of the feature extraction you are)

  • enable_object_lifecycle_paths (bool) – enables the “lifecycle paths” feature

Return type:

str

import pm4py

ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
print(pm4py.llm.abstract_ocel_ocdfg(ocel))
pm4py.llm.abstract_event_stream(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, response_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Obtains the event stream abstraction of a traditional event log

Parameters:
  • log_obj – log object

  • max_len (int) – maximum length of the (string) abstraction

  • response_header (bool) – includes a short header before the variants, pointing to the description of the abstraction

  • activity_key (str) – the column to be used as activity

  • timestamp_key (str) – the column to be used as timestamp

  • case_id_key (str) – the column to be used as case identifier

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_event_stream(log))
pm4py.llm.abstract_petri_net(net: PetriNet, im: Marking, fm: Marking, response_header: bool = True) str[source]#

Obtain an abstraction of a Petri net

Parameters:
  • net (PetriNet) – Petri net

  • im (Marking) – Initial marking

  • fm (Marking) – Final marking

  • response_header (bool) – includes the header of the response

Return type:

str

import pm4py

net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
print(pm4py.llm.abstract_petri_net(net, im, fm))
pm4py.llm.abstract_log_attributes(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Abstracts the attributes of a log (reporting their name, their type, and the top values)

Parameters:
  • log_obj – log object

  • max_len (int) – maximum length of the (string) abstraction

  • activity_key (str) – the column to be used as activity

  • timestamp_key (str) – the column to be used as timestamp

  • case_id_key (str) – the column to be used as case identifier

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_log_attributes(log))
pm4py.llm.abstract_log_features(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, include_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str[source]#

Abstracts the machine learning features obtained from a log (reporting the top features until the desired length is obtained)

Parameters:
  • log_obj – log object

  • max_len (int) – maximum length of the (string) abstraction

  • activity_key (str) – the column to be used as activity

  • timestamp_key (str) – the column to be used as timestamp

  • case_id_key (str) – the column to be used as case identifier

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_log_features(log))
pm4py.llm.abstract_temporal_profile(temporal_profile: Dict[Tuple[str, str], Tuple[float, float]], include_header: bool = True) str[source]#

Abstracts a temporal profile model to a string.

Parameters:
  • temporal_profile – temporal profile model

  • include_header (bool) – includes an header in the response, describing the temporal profile

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
temporal_profile = pm4py.discover_temporal_profile(log)
text_abstr = pm4py.llm.abstract_temporal_profile(temporal_profile, include_header=True)
print(text_abstr)
pm4py.llm.abstract_case(case: Trace, include_case_attributes: bool = True, include_event_attributes: bool = True, include_timestamp: bool = True, include_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp') str[source]#

Textually abstracts a case

Parameters:
  • case (Trace) – case object

  • include_case_attributes (bool) – (boolean) include or not the attributes at the case level

  • include_event_attributes (bool) – (boolean) include or not the attributes at the event level

  • include_timestamp (bool) – (boolean) include or not the event timestamp in the abstraction

  • include_header (bool) – (boolean) includes the header of the response

  • activity_key (str) – the column to be used as activity

  • timestamp_key (str) – the column to be used as timestamp

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
print(pm4py.llm.abstract_case(log[0]))
pm4py.llm.abstract_declare(declare_model, include_header: bool = True) str[source]#

Textually abstracts a DECLARE model

Parameters:
  • declare – DECLARE model

  • include_header (bool) – (boolean) includes the header of the response

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
log_ske = pm4py.discover_declare(log)
print(pm4py.llm.abstract_declare(log_ske))
pm4py.llm.abstract_log_skeleton(log_skeleton, include_header: bool = True) str[source]#

Textually abstracts a log skeleton process model

Parameters:
  • log_skeleton – log skeleton

  • include_header (bool) – (boolean) includes the header of the response

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
log_ske = pm4py.discover_log_skeleton(log)
print(pm4py.llm.abstract_log_skeleton(log_ske))
pm4py.llm.explain_visualization(vis_saver, *args, connector=<function openai_query>, **kwargs) str[source]#

Explains a process mining visualization by using LLMs (saving that first in a .png image, then providing the .png file to the Large Language Model along with possibly a description of the visualization).

Parameters:
  • vis_saver – the visualizer (saving to disk) to be used

  • args – the mandatory arguments that should be provided to the visualization

  • connector – the connector method to the large language model

  • kwargs – optional parameters of the visualization or the connector (for example, the annotation of the visualization, or the API key)

Return type:

str

import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
descr = pm4py.llm.explain_visualization(pm4py.save_vis_dotted_chart, log, api_key="sk-5HN", show_legend=False)
print(descr)

pm4py.meta module#

Process mining for Python

pm4py.ml module#

The pm4py.ml module contains the machine learning features offered in pm4py

pm4py.ml.split_train_test(log: EventLog | DataFrame, train_percentage: float = 0.8, case_id_key='case:concept:name') Tuple[EventLog, EventLog] | Tuple[DataFrame, DataFrame][source]#

Split an event log in a training log and a test log (for machine learning purposes). Returns the training and the test event log.

Parameters:
  • log – event log / Pandas dataframe

  • train_percentage (float) – fraction of traces to be included in the training log (from 0.0 to 1.0)

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[Tuple[EventLog, EventLog], Tuple[pd.DataFrame, pd.DataFrame]]

import pm4py

train_df, test_df = pm4py.split_train_test(dataframe, train_percentage=0.75)
pm4py.ml.get_prefixes_from_log(log: EventLog | DataFrame, length: int, case_id_key: str = 'case:concept:name') EventLog | DataFrame[source]#

Gets the prefixes of a log of a given length. The returned log object contain the prefixes: - if a trace has lower or identical length, it is included as-is - if a trace has greater length, it is cut

Parameters:
  • log – event log / Pandas dataframe

  • length (int) – length

  • case_id_key (str) – attribute to be used as case identifier

Return type:

Union[EventLog, pd.DataFrame]

import pm4py

trimmed_df = pm4py.get_prefixes_from_log(dataframe, length=5, case_id_key='case:concept:name')
pm4py.ml.extract_outcome_enriched_dataframe(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame[source]#

Inserts additional columns in the dataframe which are computed on the overall case, so they model the outcome of the case.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

  • start_timestamp_key (str) – attribute to be used as start timestamp

Return type:

pd.DataFrame

import pm4py

enriched_df = pm4py.extract_outcome_enriched_dataframe(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
pm4py.ml.extract_features_dataframe(log: EventLog | DataFrame, str_tr_attr=None, num_tr_attr=None, str_ev_attr=None, num_ev_attr=None, str_evsucc_attr=None, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key=None, resource_key='org:resource', include_case_id: bool = False, **kwargs) DataFrame[source]#

Extracts a dataframe containing the features of each case of the provided log object

Parameters:
  • log – log object (event log / Pandas dataframe)

  • str_tr_attr – (if provided) string attributes at the case level which should be extracted as features

  • num_tr_attr – (if provided) numeric attributes at the case level which should be extracted as features

  • str_ev_attr – (if provided) string attributes at the event level which should be extracted as features (one-hot encoding)

  • num_ev_attr – (if provided) numeric attributes at the event level which should be extracted as features (last value per attribute in a case)

  • activity_key (str) – the attribute to be used as activity

  • timestamp_key (str) – the attribute to be used as timestamp

  • case_id_key – (if provided, otherwise default) the attribute to be used as case identifier

  • resource_key (str) – the attribute to be used as resource

  • include_case_id (bool) – includes the case identifier column in the features table

Return type:

pd.DataFrame

import pm4py

features_df = pm4py.extract_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.ml.extract_ocel_features(ocel: OCEL, obj_type: str, enable_object_lifecycle_paths: bool = True, enable_object_work_in_progress: bool = False, object_str_attributes: Collection[str] | None = None, object_num_attributes: Collection[str] | None = None, include_obj_id: bool = False, debug: bool = False) DataFrame[source]#

Extracts from an object-centric event log a set of features (returned as dataframe) computed on the OCEL for the objects of a given object type.

Implements the approach described in: Berti, A., Herforth, J., Qafari, M.S. et al. Graph-based feature extraction on object-centric event logs. Int J Data Sci Anal (2023). https://doi.org/10.1007/s41060-023-00428-2

Parameters:
  • ocel (OCEL) – object-centric event log

  • obj_type (str) – object type that should be considered

  • enable_object_lifecycle_paths (bool) – enables the “lifecycle paths” feature

  • enable_object_work_in_progress (bool) – enables the “work in progress” feature (which has an high computational cost)

  • object_str_attributes – string attributes at the object level to one-hot encode during the feature extraction

  • object_num_attributes – numeric attributes at the object level to one-hot encode during the feature extraction

  • include_obj_id (bool) – includes the object identifier as column of the “features” dataframe

  • debug (bool) – enables debugging mode (telling at which point of the feature extraction you are)

Return type:

pd.DataFrame

import pm4py

ocel = pm4py.read_ocel('log.jsonocel')
fea_df = pm4py.extract_ocel_features(ocel, "item")
pm4py.ml.extract_temporal_features_dataframe(log: EventLog | DataFrame, grouper_freq='W', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key=None, start_timestamp_key='time:timestamp', resource_key='org:resource') DataFrame[source]#

Extracts a dataframe containing the temporal features of the provided log object

Implements the approach described in the paper: Pourbafrani, Mahsa, Sebastiaan J. van Zelst, and Wil MP van der Aalst. “Supporting automatic system dynamics model generation for simulation in the context of process mining.” International Conference on Business Information Systems. Springer, Cham, 2020.

Parameters:
  • log – log object (event log / Pandas dataframe)

  • grouper_freq (str) – the grouping frequency (D, W, M, Y) to use

  • activity_key (str) – the attribute to be used as activity

  • timestamp_key (str) – the attribute to be used as timestamp

  • case_id_key – (if provided, otherwise default) the attribute to be used as case identifier

  • resource_key (str) – the attribute to be used as resource

  • start_timestamp_key (str) – the attribute to be used as start timestamp

Return type:

pd.DataFrame

import pm4py

temporal_features_df = pm4py.extract_temporal_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.ml.extract_target_vector(log: EventLog | DataFrame, variant: str, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name') Tuple[Any, List[str]][source]#

Extracts from a log object the target vector for a specific ML use case (next activity, next time, remaining time)

Parameters:
  • log – log object (event log / Pandas dataframe)

  • variant (str) – variant of the algorithm to be used: next_activity, next_time, remaining_time

  • activity_key (str) – the attribute to be used as activity

  • timestamp_key (str) – the attribute to be used as timestamp

  • case_id_key (str) – the attribute to be used as case identifier

Return type:

Tuple[Any, List[str]]

import pm4py

vector_next_act, class_next_act = pm4py.extract_target_vector(log, 'next_activity', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
vector_next_time, class_next_time = pm4py.extract_target_vector(log, 'next_time', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
vector_rem_time, class_rem_time = pm4py.extract_target_vector(log, 'remaining_time', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')

pm4py.ocel module#

The pm4py.ocel module contains the object-centric process mining features offered in pm4py

pm4py.ocel.ocel_get_object_types(ocel: OCEL) List[str][source]#

Gets the list of object types contained in the object-centric event log (e.g., [“order”, “item”, “delivery”]).

Parameters:

ocel (OCEL) – object-centric event log

Return type:

List[str]

import pm4py

object_types = pm4py.ocel_get_object_types(ocel)
pm4py.ocel.ocel_get_attribute_names(ocel: OCEL) List[str][source]#

Gets the list of attributes at the event and the object level of an object-centric event log (e.g. [“cost”, “amount”, “name”])

Parameters:

ocel (OCEL) – object-centric event log

Return type:

List[str]

import pm4py

attribute_names = pm4py.ocel_get_attribute_names(ocel)
pm4py.ocel.ocel_flattening(ocel: OCEL, object_type: str) DataFrame[source]#

Flattens the object-centric event log to a traditional event log with the choice of an object type. In the flattened log, the objects of a given object type are the cases, and each case contains the set of events related to the object. The flattened log follows the XES notations for case identifier, activity, and timestamp. In particular: - “case:concept:name” is the column used for the case ID. - “concept:name” is the column used for the activity. - “time:timestamp” is the column used for the timestamp.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – object type

Return type:

pd.DataFrame

import pm4py

event_log = pm4py.ocel_flattening(ocel, 'items')
pm4py.ocel.ocel_object_type_activities(ocel: OCEL) Dict[str, Collection[str]][source]#

Gets the set of activities performed for each object type

Parameters:

ocel (OCEL) – object-centric event log

Return type:

Dict[str, Collection[str]]

import pm4py

ot_activities = pm4py.ocel_object_type_activities(ocel)
pm4py.ocel.ocel_objects_ot_count(ocel: OCEL) Dict[str, Dict[str, int]][source]#

Counts for each event the number of related objects per type

Parameters:

ocel (OCEL) – object-centric event log

Return type:

Dict[str, Dict[str, int]]

import pm4py

objects_ot_count = pm4py.ocel_objects_ot_count(ocel)
pm4py.ocel.ocel_temporal_summary(ocel: OCEL) DataFrame[source]#

Returns the ``temporal summary’’ from an object-centric event log. The temporal summary aggregates all the events performed in the same timestamp, and reports the list of activities and the involved objects.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

pd.DataFrame

import pm4py

temporal_summary = pm4py.ocel_temporal_summary(ocel)
pm4py.ocel.ocel_objects_summary(ocel: OCEL) DataFrame[source]#

Gets the objects summary of an object-centric event log

Parameters:

ocel (OCEL) – object-centric event log

Return type:

pd.DataFrame

import pm4py

objects_summary = pm4py.ocel_objects_summary(ocel)
pm4py.ocel.ocel_objects_interactions_summary(ocel: OCEL) DataFrame[source]#

Gets the objects interactions summary of an object-centric event log. The objects interactions summary has a row for every combination (event, related object, other related object). Properties such as the activity of the event, and the object types of the two related objects, are included.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

OCEL

import pm4py

interactions_summary = pm4py.ocel_objects_interactions_summary(ocel)
pm4py.ocel.discover_ocdfg(ocel: OCEL, business_hours=False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)]) Dict[str, Any][source]#

Discovers an OC-DFG from an object-centric event log.

Object-centric directly-follows multigraphs are a composition of directly-follows graphs for the single object type, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).

Reference paper: Berti, Alessandro, and Wil van der Aalst. “Extracting multiple viewpoint models from relational databases.” Data-Driven Process Discovery and Analysis. Springer, Cham, 2018. 24-51.

Parameters:
  • ocel (OCEL) – object-centric event log

  • business_hours (bool) – boolean value that enables the usage of the business hours

  • business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot i.e. one tuple consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60),] meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00

Return type:

Dict[str, Any]

import pm4py

ocdfg = pm4py.discover_ocdfg(ocel)
pm4py.ocel.discover_oc_petri_net(ocel: OCEL, inductive_miner_variant: str = 'im', diagnostics_with_tbr: bool = False) Dict[str, Any][source]#

Discovers an object-centric Petri net from the provided object-centric event log.

Reference paper: van der Aalst, Wil MP, and Alessandro Berti. “Discovering object-centric Petri nets.” Fundamenta informaticae 175.1-4 (2020): 1-40.

Parameters:
  • ocel (OCEL) – object-centric event log

  • inductive_miner_variant (str) – specify the variant of the inductive miner to be used (“im” for traditional; “imd” for the faster inductive miner directly-follows)

  • diagnostics_with_tbr (bool) – (boolean) enables the computation of some diagnostics using token-based replay

Return type:

Dict[str, Any]

import pm4py

ocpn = pm4py.discover_oc_petri_net(ocel)
pm4py.ocel.discover_objects_graph(ocel: OCEL, graph_type: str = 'object_interaction') Set[Tuple[str, str]][source]#

Discovers an object graph from the provided object-centric event log

Parameters:
  • ocel (OCEL) – object-centric event log

  • graph_type (str) – type of graph to consider (object_interaction, object_descendants, object_inheritance, object_cobirth, object_codeath)

Return type:

Dict[str, Any]

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
obj_graph = pm4py.ocel_discover_objects_graph(ocel, graph_type='object_interaction')
pm4py.ocel.ocel_o2o_enrichment(ocel: OCEL, included_graphs: Collection[str] | None = None) OCEL[source]#

Inserts the information inferred from the graph computations (pm4py.discover_objects_graph) in the list of O2O relations of the OCEL.

Parameters:
  • ocel (OCEL) – object-centric event log

  • included_graphs – types of graphs to include, provided as list/set of strings (object_interaction_graph, object_descendants_graph, object_inheritance_graph, object_cobirth_graph, object_codeath_graph)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_o2o_enrichment(ocel)
print(ocel.o2o)
pm4py.ocel.ocel_e2o_lifecycle_enrichment(ocel: OCEL) OCEL[source]#

Inserts lifecycle-based information (when an object is created/terminated or other types of relations) in the list of E2O relations of the OCEL

Parameters:

ocel (OCEL) – object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_e2o_lifecycle_enrichment(ocel)
print(ocel.relations)
pm4py.ocel.sample_ocel_objects(ocel: OCEL, num_objects: int) OCEL[source]#

Given an object-centric event log, returns a sampled event log with a subset of the objects that is chosen in a random way. Only the events related to at least one of these objects are filtered from the event log. As a note, the relationships between the different objects are probably going to be ruined by this sampling.

Parameters:
  • ocel (OCEL) – Object-centric event log

  • num_objects (int) – Number of objects of the object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
sampled_ocel = pm4py.sample_ocel_objects(ocel, 50) # keeps only 50 random objects
pm4py.ocel.sample_ocel_connected_components(ocel: OCEL, connected_components: int = 1, max_num_events_per_cc: int = 9223372036854775807, max_num_objects_per_cc: int = 9223372036854775807, max_num_e2o_relations_per_cc: int = 9223372036854775807) OCEL[source]#

Given an object-centric event log, returns a sampled event log with a subset of the executions. The number of considered connected components need to be specified by the user.

Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

Parameters:
  • ocel (OCEL) – Object-centric event log

  • connected_components (int) – Number of connected components to pick from the OCEL

  • max_num_events_per_cc (int) – maximum number of events allowed per connected component (default: sys.maxsize)

  • max_num_objects_per_cc (int) – maximum number of events allowed per connected component (default: sys.maxsize)

  • max_num_e2o_relations_per_cc (int) – maximum number of event-to-object relationships allowed per connected component (default: sys.maxsize)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
sampled_ocel = pm4py.sample_ocel_connected_components(ocel, 5) # keeps only 5 connected components
pm4py.ocel.ocel_drop_duplicates(ocel: OCEL) OCEL[source]#

Drop relations between events and objects happening at the same time, with the same activity, to the same object identifier. This ends up cleaning the OCEL from duplicate events.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_drop_duplicates(ocel)
pm4py.ocel.ocel_merge_duplicates(ocel: OCEL, have_common_object: bool | None = False) OCEL[source]#

Merge events in the OCEL that happen with the same activity at the same timestamp

Parameters:
  • ocel (OCEL) – object-centric event log

  • have_common_object – impose the additional merge condition that the two events should happen at the same timestamp.

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_merge_duplicates(ocel)
pm4py.ocel.ocel_sort_by_additional_column(ocel: OCEL, additional_column: str, primary_column: str = 'ocel:timestamp') OCEL[source]#

Sorts the OCEL not only based on the timestamp column and the index, but using an additional sorting column that further determines the order of the events happening at the same timestamp.

Parameters:
  • ocel (OCEL) – object-centric event log

  • additional_column (str) – additional column to use for the sorting

  • primary_column (str) – primary column to be used for the sorting (default: ocel:timestamp)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_sort_by_additional_column(ocel, 'ordering')
pm4py.ocel.ocel_add_index_based_timedelta(ocel: OCEL) OCEL[source]#

Adds a small time-delta to the timestamp column based on the current index of the event. This ensures the correct ordering of the events in any object-centric process mining solution.

Parameters:

ocel (OCEL) – object-centric event log

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_add_index_based_timedelta(ocel)
pm4py.ocel.cluster_equivalent_ocel(ocel: OCEL, object_type: str, max_objs: int = 9223372036854775807) Dict[str, Collection[OCEL]][source]#

Perform a clustering of the object-centric event log, based on the ‘executions’ of a single object type. Equivalent ‘executions’ are grouped in the output dictionary.

Parameters:
  • ocel (OCEL) – object-centric event log

  • object_type (str) – reference object type

  • max_objs (int) – maximum number of objects (of the given object type)

Return type:

Dict[str, Collection[OCEL]]

import pm4py

ocel = pm4py.read_ocel('trial.ocel')
clusters = pm4py.cluster_equivalent_ocel(ocel, "order")

pm4py.org module#

The pm4py.org module contains the organizational analysis techniques offered in pm4py

pm4py.org.discover_handover_of_work_network(log: EventLog | DataFrame, beta=0, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the handover of work network of the event log. The handover of work network is essentially the DFG of the event log, however, using the resource as a node of the graph, instead of the activity. As such, to use this, resource information should be present in the event log.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • beta (int) – beta parameter for Handover metric

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

metric = pm4py.discover_handover_of_work_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_working_together_network(log: EventLog | DataFrame, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the working together network of the process. Two nodes resources are connected in the graph if the resources collaborate on an instance of the process.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

metric = pm4py.discover_working_together_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_activity_based_resource_similarity(log: EventLog | DataFrame, activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates similarity between the resources in the event log, based on their activity profiles.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

act_res_sim = pm4py.discover_activity_based_resource_similarity(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_subcontracting_network(log: EventLog | DataFrame, n=2, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA[source]#

Calculates the subcontracting network of the process.

Return type:

SNA

Parameters:
  • log – event log / Pandas dataframe

  • n (int) – n parameter for Subcontracting metric

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

metric = pm4py.discover_subcontracting_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_organizational_roles(log: EventLog | DataFrame, activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Role][source]#

Mines the organizational roles

A role is a set of activities in the log that are executed by a similar (multi)set of resources. Hence, it is a specific function into organization. Grouping the activities in roles can help:

Reference paper: Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.

Parameters:
  • log – event log / Pandas dataframe

  • activity_key (str) – attribute to be used for the activity

  • resource_key (str) – attribute to be used for the resource

  • timestamp_key (str) – attribute to be used for the timestamp

  • case_id_key (str) – attribute to be used as case identifier

import pm4py

roles = pm4py.discover_organizational_roles(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.org.discover_network_analysis(log: DataFrame | EventLog | EventStream, out_column: str, in_column: str, node_column_source: str, node_column_target: str, edge_column: str, edge_reference: str = '_out', performance: bool = False, sorting_column: str = 'time:timestamp', timestamp_column: str = 'time:timestamp') Dict[Tuple[str, str], Dict[str, Any]][source]#

Performs a network analysis of the log based on the provided parameters.

The classical social network analysis methods are based on the order of the events inside a case. For example, the Handover of Work metric considers the directly-follows relationships between resources during the work of a case. An edge is added between the two resources if such relationships occurs.

Real-life scenarios may be more complicated. At first, is difficult to collect events inside the same case without having convergence/divergence issues (see first section of the OCEL part). At second, the type of relationship may also be important. Consider for example the relationship between two resources: this may be more efficient if the activity that is executed is liked by the resources, rather than disgusted.

The network analysis that we introduce here generalizes some existing social network analysis metrics, becoming independent from the choice of a case notion and permitting to build a multi-graph instead of a simple graph.

With this, we assume events to be linked by signals. An event emits a signal (that is contained as one attribute of the event) that is assumed to be received by other events (also, this is an attribute of these events) that follow the first event in the log. So, we assume there is an OUT attribute (of the event) that is identical to the IN attribute (of the other events).

When we collect this information, we can build the network analysis graph: - The source node of the relation is given by an aggregation over a node_column_source attribute. - The target node of the relation is given by an aggregation over a node_column_target attribute. - The type of edge is given by an aggregation over an edge_column attribute. - The network analysis graph can either be annotated with frequency or performance information.

The output is a multigraph. Two events EV1 and EV2 of the log are merged (indipendently from the case notion) based on having EV1.OUT_COLUMN = EV2.IN_COLUMN. Then, an aggregation is applied on the couple of events (NODE_COLUMN) to obtain the nodes that are connected. The edges between these nodes are aggregated based on some property of the source event (EDGE_COLUMN).

Parameters:
  • log – event log / Pandas dataframe

  • out_column (str) – the source column of the link (default: the case identifier; events of the same case are linked)

  • in_column (str) – the target column of the link (default: the case identifier; events of the same case are linked)

  • node_column_source (str) – the attribute to be used for the node definition of the source event (default: the resource of the log, org:resource)

  • node_column_target (str) – the attribute to be used for the node definition of the target event (default: the resource of the log, org:resource)

  • edge_column (str) – the attribute to be used for the edge definition (default: the activity of the log, concept:name)

  • edge_reference (str) – decide if the edge attribute should be picked from the source event. Values: _out => the source event ; _in => the target event

  • performance (bool) – boolean value that enables the performance calculation on the edges of the network analysis

  • sorting_column (str) – the column that should be used to sort the log before performing the network analysis (default: time:timestamp)

  • timestamp_column (str) – the column that should be used as timestamp for the performance-related analysis (default: time:timestamp)

Return type:

Dict[Tuple[str, str], Dict[str, Any]]

import pm4py

net_ana = pm4py.discover_network_analysis(dataframe, out_column='case:concept:name', in_column='case:concept:name', node_column_source='org:resource', node_column_target='org:resource', edge_column='concept:name')

pm4py.privacy module#

This file is part of PM4Py (More Info: https://pm4py.fit.fraunhofer.de).

PM4Py is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

PM4Py is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with PM4Py. If not, see <https://www.gnu.org/licenses/>.

pm4py.privacy.anonymize_differential_privacy(log: EventLog | DataFrame, epsilon: float = 1.0, k: int = 10, p: int = 20) DataFrame[source]#

Protect event logs with differential privacy. Differential privacy is a guarantee that bounds the impact the data of one individual has on a query result.

Control-flow information is anonymized with SaCoFa. This algorithm inserts noise into a trace-variant count, through the step-wise construction of a prefix tree.

Contextual-information, like timestamps or resources, is anonymized with PRIPEL. This technique enriches a control-flow anonymized event log with contextual information from the original log, while still achieving differential privacy. PRIPEL anonymizes each event’s timestamp and other attributes, that are stored as strings, integers, floats, or booleans.

Please install diffprivlib https://diffprivlib.readthedocs.io/en/latest/ (pip install diffprivlib==0.5.2) to run our algorithm.

SaCoFa is described in: S. A. Fahrenkog-Petersen, M. Kabierski, F. Rösel, H. van der Aa and M. Weidlich, “SaCoFa: Semantics-aware Control-flow Anonymization for Process Mining,” 2021 3rd International Conference on Process Mining (ICPM), 2021, pp. 72-79. https://doi.org/10.48550/arXiv.2109.08501

PRIPEL is described in: Fahrenkrog-Petersen, S.A., van der Aa, H., Weidlich, M. (2020). PRIPEL: Privacy-Preserving Event Log Publishing Including Contextual Information. In: Fahland, D., Ghidini, C., Becker, J., Dumas, M. (eds) Business Process Management. BPM 2020. Lecture Notes in Computer Science, vol 12168. Springer, Cham. https://doi.org/10.1007/978-3-030-58666-9_7

Parameters:
  • log – event log / Pandas dataframe

  • epsilon (float) – the strength of the differential privacy guarantee. The smaller the value of epsilon, the stronger the privacy guarantee that is provided.

  • k (int) – the maximal length of considered traces in the prefix tree. We recommend setting k, that roughly 80% of all traces from the original event log are covered.

  • p (int) – the pruning parameter, which denotes the minimum count a prefix has to have in order to not be discarded. The dependent exponential runtime of the algorithms is mitigated by the pruning parameter.

Return type:

pd.DataFrame

import pm4py

event_log = pm4py.read_xes("running-example.xes")
anonymized_event_log = pm4py.anonymize_differential_privacy(event_log, epsilon=1.0, k=10, p=20)

pm4py.read module#

The pm4py.read module contains all funcationality related to reading files/objects from disk.

pm4py.read.read_xes(file_path: str, variant: str | None = None, return_legacy_log_object: bool = False, encoding: str = 'utf-8', **kwargs) DataFrame | EventLog[source]#

Reads an event log stored in XES format (see xes-standard) Returns a table (pandas.DataFrame) view of the event log.

Parameters:
  • file_path (str) – file path of the event log (.xes file) on disk

  • variant – the variant of the importer to use. “iterparse” => traditional XML parser; “line_by_line” => text-based line-by-line importer ; “chunk_regex” => chunk-of-bytes importer (default); “iterparse20” => XES 2.0 importer

  • return_legacy_log_object (bool) – boolean value enabling returning a log object (default: False)

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

DataFrame

import pm4py

log = pm4py.read_xes("<path_to_xes_file>")
pm4py.read.read_pnml(file_path: str, auto_guess_final_marking: bool = False, encoding: str = 'utf-8') Tuple[PetriNet, Marking, Marking][source]#

Reads a Petri net object from a .pnml file. The Petri net object returned is a triple containing the following objects:

  1. Petrinet Object, encoded as a PetriNet class

  2. Initial Marking

  3. Final Marking

Return type:

Tuple[PetriNet, Marking, Marking]

Parameters:
  • file_path (str) – file path of the Petri net model (.pnml file) on disk

  • encoding (str) – the encoding to be used (default: utf-8)

import pm4py

pn = pm4py.read_pnml("<path_to_pnml_file>")
pm4py.read.read_ptml(file_path: str, encoding: str = 'utf-8') ProcessTree[source]#

Reads a process tree object from a .ptml file

Parameters:
  • file_path (str) – file path of the process tree object on disk

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

ProcessTree

import pm4py

process_tree = pm4py.read_ptml("<path_to_ptml_file>")
pm4py.read.read_dfg(file_path: str, encoding: str = 'utf-8') Tuple[Dict[Tuple[str, str], int], Dict[str, int], Dict[str, int]][source]#

Reads a DFG object from a .dfg file. The DFG object returned is a triple containing the following objects:

  1. DFG Object, encoded as a Dict[Tuple[str,str],int], s.t. DFG[('a','b')]=k implies that activity 'a' is directly followed by activity 'b' a total of k times in the log

  2. Start activity dictionary, encoded as a Dict[str,int], s.t., S['a']=k implies that activity 'a' is starting k traces in the event log

  3. End activity dictionary, encoded as a Dict[str,int], s.t., E['z']=k implies that activity 'z' is ending k traces in the event log.

Return type:

Tuple[Dict[Tuple[str,str],int], Dict[str,int], Dict[str,int]]

Parameters:
  • file_path (str) – file path of the dfg model on disk

  • encoding (str) – the encoding to be used (default: utf-8)

import pm4py

dfg = pm4py.read_dfg("<path_to_dfg_file>")
pm4py.read.read_bpmn(file_path: str, encoding: str = 'utf-8') BPMN[source]#

Reads a BPMN model from a .bpmn file

Parameters:
  • file_path (str) – file path of the bpmn model

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

BPMN

import pm4py

bpmn = pm4py.read_bpmn('<path_to_bpmn_file>')
pm4py.read.read_ocel(file_path: str, objects_path: str | None = None, encoding: str = 'utf-8') OCEL[source]#

Reads an object-centric event log from a file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the object-centric event log

  • objects_path – [Optional] file path from which the objects dataframe should be read

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel("<path_to_ocel_file>")
pm4py.read.read_ocel_csv(file_path: str, objects_path: str | None = None, encoding: str = 'utf-8') OCEL[source]#

Reads an object-centric event log from a CSV file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the object-centric event log (.csv)

  • objects_path – [Optional] file path from which the objects dataframe should be read

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_csv("<path_to_ocel_file.csv>")
pm4py.read.read_ocel_json(file_path: str, encoding: str = 'utf-8') OCEL[source]#

Reads an object-centric event log from a JSON-OCEL file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the object-centric event log (.jsonocel)

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_json("<path_to_ocel_file.jsonocel>")
pm4py.read.read_ocel_xml(file_path: str, encoding: str = 'utf-8') OCEL[source]#

Reads an object-centric event log from a XML-OCEL file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the object-centric event log (.xmlocel)

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_xml("<path_to_ocel_file.xmlocel>")
pm4py.read.read_ocel_sqlite(file_path: str, encoding: str = 'utf-8') OCEL[source]#

Reads an object-centric event log from a SQLite database (see: http://www.ocel-standard.org/). The OCEL object is returned by this method

Parameters:
  • file_path (str) – file path of the SQLite database (.sqlite)

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel_sqlite("<path_to_ocel_file.sqlite>")
pm4py.read.read_ocel2(file_path: str, variant_str: str | None = None, encoding: str = 'utf-8') OCEL[source]#

Reads an OCEL2.0 event log

Parameters:
  • file_path (str) – path to the OCEL2.0 event log

  • variant_str – (optional) specification of the importer variant to be used

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel2("<path_to_ocel_file>")
pm4py.read.read_ocel2_json(file_path: str, variant_str: str | None = None, encoding: str = 'utf-8') OCEL[source]#

Reads an OCEL2.0 event log from a JSON-OCEL(2) file

Parameters:
  • file_path (str) – path to the JSON file

  • variant_str – (optional) specification of the importer variant to be used

  • encoding (str) – the encoding to be used (default: utf-8)

Return type:

OCEL

import pm4py

ocel = pm4py.read_ocel2_json("<path_to_ocel_file.jsonocel>")
pm4py.read.read_ocel2_sqlite(file_path: str, variant_str: str | None = None, encoding: str = 'utf-8') OCEL[source]