pm4py package#
Process mining for Python
Subpackages#
- pm4py.algo package
- Subpackages
- pm4py.algo.analysis package
- pm4py.algo.anonymization package
- pm4py.algo.clustering package
- pm4py.algo.comparison package
- pm4py.algo.conformance package
- pm4py.algo.connectors package
- pm4py.algo.decision_mining package
- pm4py.algo.discovery package
- pm4py.algo.evaluation package
- pm4py.algo.filtering package
- pm4py.algo.label_splitting package
- pm4py.algo.merging package
- pm4py.algo.organizational_mining package
- pm4py.algo.querying package
- pm4py.algo.reduction package
- pm4py.algo.simulation package
- pm4py.algo.transformation package
- pm4py.objects package
- Subpackages
- pm4py.objects.bpmn package
- pm4py.objects.conversion package
- pm4py.objects.dfg package
- pm4py.objects.heuristics_net package
- pm4py.objects.log package
- pm4py.objects.ocel package
- pm4py.objects.org package
- pm4py.objects.petri_net package
- pm4py.objects.powl package
- pm4py.objects.process_tree package
- pm4py.objects.random_variables package
- pm4py.objects.stochastic_petri package
- pm4py.objects.transition_system package
- pm4py.objects.trie package
- pm4py.statistics package
- Subpackages
- pm4py.statistics.attributes package
- pm4py.statistics.concurrent_activities package
- pm4py.statistics.end_activities package
- pm4py.statistics.eventually_follows package
- pm4py.statistics.ocel package
- pm4py.statistics.overlap package
- pm4py.statistics.passed_time package
- pm4py.statistics.rework package
- pm4py.statistics.service_time package
- pm4py.statistics.sojourn_time package
- pm4py.statistics.start_activities package
- pm4py.statistics.traces package
- pm4py.statistics.util package
- pm4py.statistics.variants package
- pm4py.streaming package
- pm4py.util package
- Subpackages
- Submodules
- pm4py.util.business_hours module
- pm4py.util.colors module
- pm4py.util.constants module
- pm4py.util.exec_utils module
- pm4py.util.hie_utils module
- pm4py.util.ml_utils module
- pm4py.util.nx_utils module
- get_default_nx_environment()
- Graph()
- DiGraph()
- MultiGraph()
- MultiDiGraph()
- ancestors()
- descendants()
- connected_components()
- bfs_tree()
- contracted_nodes()
- shortest_path()
- strongly_connected_components()
- has_path()
- is_strongly_connected()
- all_pairs_shortest_path()
- all_pairs_dijkstra()
- find_cliques()
- degree_centrality()
- greedy_modularity_communities()
- maximum_flow_value()
- minimum_weight_full_matching()
- Edmonds()
- neo4j_upload()
- neo4j_download()
- nx_to_ocel()
- nx_to_event_log()
- pm4py.util.pandas_utils module
- get_default_dataframe_environment()
- to_dict_records()
- to_dict_index()
- insert_index()
- insert_case_index()
- insert_ev_in_tr_index()
- format_unique()
- insert_feature_activity_position_in_trace()
- insert_case_arrival_finish_rate()
- insert_case_service_waiting_time()
- check_is_pandas_dataframe()
- instantiate_dataframe()
- instantiate_dataframe_from_dict()
- instantiate_dataframe_from_records()
- get_grouper()
- get_total_seconds()
- convert_to_seconds()
- dataframe_column_string_to_datetime()
- read_csv()
- concat()
- merge()
- check_pandas_dataframe_columns()
- pm4py.util.points_subset module
- pm4py.util.regex module
- pm4py.util.string_distance module
- pm4py.util.typing module
- pm4py.util.variants_util module
- pm4py.util.vis_utils module
- pm4py.util.xes_constants module
- pm4py.visualization package
- Subpackages
- pm4py.visualization.align_table package
- pm4py.visualization.bpmn package
- pm4py.visualization.common package
- Submodules
- pm4py.visualization.common.dot_util module
- pm4py.visualization.common.gview module
- pm4py.visualization.common.html module
- pm4py.visualization.common.save module
- pm4py.visualization.common.svg_pos_parser module
- pm4py.visualization.common.utils module
- pm4py.visualization.common.visualizer module
- pm4py.visualization.decisiontree package
- pm4py.visualization.dfg package
- pm4py.visualization.dotted_chart package
- pm4py.visualization.footprints package
- pm4py.visualization.graphs package
- pm4py.visualization.heuristics_net package
- pm4py.visualization.network_analysis package
- pm4py.visualization.networkx package
- pm4py.visualization.ocel package
- pm4py.visualization.performance_spectrum package
- pm4py.visualization.petri_net package
- pm4py.visualization.powl package
- pm4py.visualization.process_tree package
- pm4py.visualization.sna package
- pm4py.visualization.transition_system package
- pm4py.visualization.trie package
Submodules#
pm4py.analysis module#
- pm4py.analysis.construct_synchronous_product_net(trace: Trace, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking) Tuple[PetriNet, Marking, Marking] [source]#
Constructs the synchronous product net between a trace and a Petri net process model.
- Parameters:
trace (Trace) – trace
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
- Return type:
Tuple[PetriNet, Marking, Marking]
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    log = pm4py.read_xes('log.xes')
    sync_net, sync_im, sync_fm = pm4py.construct_synchronous_product_net(log[0], net, im, fm)
Deprecated since version 2.3.0: this method will be removed in version 3.0.0.
- pm4py.analysis.compute_emd(language1: Dict[List[str], float], language2: Dict[List[str], float]) float [source]#
Computes the earth mover's distance between two stochastic languages (for example, the first extracted from the log, and the second extracted from the process model).
- Parameters:
language1 – (first) stochastic language
language2 – (second) stochastic language
- Return type:
float
    import pm4py

    log = pm4py.read_xes('tests/input_data/running-example.xes')
    language_log = pm4py.get_stochastic_language(log)
    print(language_log)
    net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
    language_model = pm4py.get_stochastic_language(net, im, fm)
    print(language_model)
    emd_distance = pm4py.compute_emd(language_log, language_model)
    print(emd_distance)
- pm4py.analysis.solve_marking_equation(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, cost_function: Dict[Transition, float] = None) float [source]#
Solves the marking equation of a Petri net. The marking equation is solved as an ILP problem. An optional transition-based cost function to minimize can be provided as well.
- Parameters:
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
cost_function – optional transition-based cost function to minimize
- Return type:
float
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    heuristic = pm4py.solve_marking_equation(net, im, fm)
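The optional cost function is a dictionary over the transitions of the net. A minimal sketch of its use, under the illustrative assumption that every transition gets a unit cost:

    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    # illustrative assumption: assign a unit cost to every transition of the net
    cost_function = {t: 1.0 for t in net.transitions}
    heuristic = pm4py.solve_marking_equation(net, im, fm, cost_function)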
- pm4py.analysis.solve_extended_marking_equation(trace: Trace, sync_net: PetriNet, sync_im: Marking, sync_fm: Marking, split_points: List[int] | None = None) float [source]#
Computes a heuristic value (an underestimation of the cost of an alignment) between a trace and a synchronous product net using the extended marking equation with the standard cost function (i.e., sync moves get cost 0, invisible moves get cost 1, other moves on model / moves on log get cost 10000), with an optimal provisioning of the split points.
- Parameters:
trace (Trace) – trace
sync_net (PetriNet) – synchronous product net
sync_im (Marking) – initial marking (of the sync net)
sync_fm (Marking) – final marking (of the sync net)
split_points – if specified, the indexes of the events of the trace to be used as split points. If not specified, the split points are identified automatically.
- Return type:
float
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    log = pm4py.read_xes('log.xes')
    ext_mark_eq_heu = pm4py.solve_extended_marking_equation(log[0], net, im, fm)
Deprecated since version 2.3.0: this method will be removed in version 3.0.0.
- pm4py.analysis.check_soundness(petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, print_diagnostics: bool = False) Tuple[bool, Dict[str, Any]] [source]#
Check if a given Petri net is a sound WF-net. A Petri net is a WF-net iff:
it has a unique source place
it has a unique sink place
every element in the WF-net is on a path from the source to the sink place
- A WF-net is sound iff:
it contains no live-locks
it contains no deadlocks
we are able to always reach the final marking
For a formal definition of a sound WF-net, consider: http://www.padsweb.rwth-aachen.de/wvdaalst/publications/p628.pdf. In the returned object, the first element is a boolean indicating whether the Petri net is a sound workflow net. The second element is a set of diagnostics collected while running WOFLAN (expressed as a dictionary associating the keys [name of the diagnostics] with the corresponding diagnostics).
- Parameters:
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
print_diagnostics (bool) – boolean value that enables the printing of the diagnostics found by WOFLAN
- Return type:
Tuple[bool, Dict[str, Any]]
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    is_sound = pm4py.check_soundness(net, im, fm)
- pm4py.analysis.cluster_log(log: EventLog | EventStream | DataFrame, sklearn_clusterer=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Generator[EventLog, None, None] [source]#
Applies clustering to the provided event log (the method is based on the extraction of profiles for the traces of the event log) using a Scikit-Learn clusterer (default: K-means with two clusters).
- Parameters:
log – log object
sklearn_clusterer – the Scikit-Learn clusterer to be used (default: KMeans(n_clusters=2, random_state=0, n_init="auto"))
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Generator[pd.DataFrame, None, None]
    import pm4py

    for clust_log in pm4py.cluster_log(df):
        print(clust_log)
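A custom Scikit-Learn clusterer can be supplied through sklearn_clusterer. A minimal sketch, assuming df is an event-log dataframe using the default pm4py column names:

    import pm4py
    from sklearn.cluster import KMeans

    # df is assumed to be an event-log dataframe (hypothetical input)
    clusterer = KMeans(n_clusters=3, random_state=0, n_init="auto")
    for clust_log in pm4py.cluster_log(df, sklearn_clusterer=clusterer):
        print(len(clust_log))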
- pm4py.analysis.insert_artificial_start_end(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', artificial_start='▶', artificial_end='■') EventLog | DataFrame [source]#
Inserts the artificial start/end activities in an event log / Pandas dataframe
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
artificial_start (str) – the symbol to be used as artificial start activity
artificial_end (str) – the symbol to be used as artificial end activity
- Return type:
Union[EventLog, pd.DataFrame]
    import pm4py

    dataframe = pm4py.insert_artificial_start_end(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.analysis.insert_case_service_waiting_time(log: EventLog | DataFrame, service_time_column: str = '@@service_time', sojourn_time_column: str = '@@sojourn_time', waiting_time_column: str = '@@waiting_time', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame [source]#
Inserts the service/waiting/sojourn times of the case in the dataframe.
- Parameters:
log – event log / Pandas dataframe
service_time_column (str) – column to be used for the service time
sojourn_time_column (str) – column to be used for the sojourn time
waiting_time_column (str) – column to be used for the waiting time
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
start_timestamp_key (str) – attribute to be used as start timestamp
- Return type:
pd.DataFrame
    import pm4py

    dataframe = pm4py.insert_case_service_waiting_time(dataframe, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
- pm4py.analysis.insert_case_arrival_finish_rate(log: EventLog | DataFrame, arrival_rate_column='@@arrival_rate', finish_rate_column='@@finish_rate', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame [source]#
Inserts the arrival/finish rates of the case in the dataframe. The arrival rate is computed as the difference between the start time of the case and the start time of the previous case to start. The finish rate is computed as the difference between the end time of the case and the end time of the next case to end.
- Parameters:
log – event log / Pandas dataframe
arrival_rate_column (str) – column to be used for the arrival rate
finish_rate_column (str) – column to be used for the finish rate
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
start_timestamp_key (str) – attribute to be used as start timestamp
- Return type:
pd.DataFrame
    import pm4py

    dataframe = pm4py.insert_case_arrival_finish_rate(dataframe, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
- pm4py.analysis.check_is_workflow_net(net: PetriNet) bool [source]#
Checks if the input Petri net satisfies the WF-net conditions:
1. unique source place
2. unique sink place
3. every node is on a path from the source to the sink
- Parameters:
net (PetriNet) – Petri net
- Return type:
bool
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    is_wfnet = pm4py.check_is_workflow_net(net)
- pm4py.analysis.maximal_decomposition(net: PetriNet, im: Marking, fm: Marking) List[Tuple[PetriNet, Marking, Marking]] [source]#
Calculate the maximal decomposition of an accepting Petri net.
- Parameters:
net (PetriNet) – accepting Petri net
im (Marking) – initial marking
fm (Marking) – final marking
- Return type:
List[Tuple[PetriNet, Marking, Marking]]
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    list_nets = pm4py.maximal_decomposition(net, im, fm)
    for anet in list_nets:
        subnet, subim, subfm = anet
        pm4py.view_petri_net(subnet, subim, subfm, format='svg')
- pm4py.analysis.simplicity_petri_net(net: PetriNet, im: Marking, fm: Marking, variant: str | None = 'arc_degree') float [source]#
Computes the simplicity metric for a given Petri net model.
The three available approaches are:
- Arc degree simplicity: described in the paper Vázquez-Barreiros, Borja, Manuel Mucientes, and Manuel Lama. "ProDiGen: Mining complete, precise and minimal structure process models with a genetic algorithm." Information Sciences 294 (2015): 315-333.
- Extended Cardoso metric: described in the paper "Complexity Metrics for Workflow Nets" by Lassen, Kristian Bisgaard, and Wil MP van der Aalst.
- Extended cyclomatic metric: described in the paper "Complexity Metrics for Workflow Nets" by Lassen, Kristian Bisgaard, and Wil MP van der Aalst.
- Parameters:
net (PetriNet) – Petri net
im (Marking) – initial marking
fm (Marking) – final marking
variant (str) – variant of the simplicity metric to be used (default: 'arc_degree')
- Return type:
float
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    simplicity = pm4py.simplicity_petri_net(net, im, fm, variant='arc_degree')
- pm4py.analysis.generate_marking(net: PetriNet, place_or_dct_places: str | Place | Dict[str, int] | Dict[Place, int]) Marking [source]#
Generate a marking for a given Petri net
- Parameters:
net (PetriNet) – Petri net
place_or_dct_places – place, or dictionary of places, to be used in the marking. Possible values: a single Place object; the name of a place; a dictionary associating each place with its number of tokens; a dictionary associating place names with numbers of tokens.
- Return type:
Marking
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    marking = pm4py.generate_marking(net, {'source': 2})
- pm4py.analysis.reduce_petri_net_invisibles(net: PetriNet) PetriNet [source]#
Reduces the number of invisible transitions in the provided Petri net.
- Parameters:
net (PetriNet) – Petri net
- Return type:
PetriNet
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    net = pm4py.reduce_petri_net_invisibles(net)
- pm4py.analysis.reduce_petri_net_implicit_places(net: PetriNet, im: Marking, fm: Marking) Tuple[PetriNet, Marking, Marking] [source]#
Reduces the number of implicit places in the provided Petri net.
- Parameters:
net (PetriNet) – Petri net
im (Marking) – initial marking
fm (Marking) – final marking
- Return type:
Tuple[PetriNet, Marking, Marking]
    import pm4py

    net, im, fm = pm4py.read_pnml('model.pnml')
    net = pm4py.reduce_petri_net_implicit_places(net, im, fm)
- pm4py.analysis.get_enabled_transitions(net: PetriNet, marking: Marking) Set[Transition] [source]#
Gets the transitions enabled in a given marking
- Parameters:
net (PetriNet) – Petri net
marking (Marking) – marking
- Return type:
Set[PetriNet.Transition]
    import pm4py

    net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
    # gets the transitions enabled in the initial marking
    enabled_transitions = pm4py.get_enabled_transitions(net, im)
pm4py.cli module#
pm4py.conformance module#
The pm4py.conformance module contains the conformance checking algorithms implemented in pm4py.
- pm4py.conformance.conformance_diagnostics_token_based_replay(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False, opt_parameters: Dict[Any, Any] | None = None) List[Dict[str, Any]] [source]#
Apply token-based replay for conformance checking analysis. The method returns the full token-based replay diagnostics.
Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.
In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.
The output of the token-based replay, stored in the variable replayed_traces, contains for each trace of the log:
- trace_is_fit: boolean value (True/False) that is true when the trace conforms to the model.
- activated_transitions: list of transitions activated in the model by the token-based replay.
- reached_marking: marking reached at the end of the replay.
- missing_tokens: number of missing tokens.
- consumed_tokens: number of consumed tokens.
- remaining_tokens: number of remaining tokens.
- produced_tokens: number of produced tokens.
- Parameters:
log – event log
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)
opt_parameters – optional parameters of the token-based replay, including:
- reach_mark_through_hidden: boolean value that decides if we shall try to reach the final marking through hidden transitions
- stop_immediately_unfit: boolean value that decides if we shall stop immediately when a non-conformance is detected
- walk_through_hidden_trans: boolean value that decides if we shall walk through hidden transitions in order to enable visible transitions
- places_shortest_path_by_hidden: shortest paths between places by hidden transitions
- is_reduction: expresses if the token-based replay is called in a reduction attempt
- thread_maximum_ex_time: alignment threads maximum allowed execution time
- cleaning_token_flood: decides if a cleaning of the token flood shall be operated
- disable_variants: disable variants grouping
- return_object_names: decides whether names instead of object pointers shall be returned
- Return type:
List[Dict[str, Any]]
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    tbr_diagnostics = pm4py.conformance_diagnostics_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
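The per-trace fields listed above can be read directly from the returned list. A minimal sketch, reusing tbr_diagnostics from the example above:

    # each entry corresponds to one trace of the log
    for trace_diag in tbr_diagnostics:
        # fields documented above: trace_is_fit, missing_tokens, remaining_tokens, ...
        print(trace_diag['trace_is_fit'], trace_diag['missing_tokens'], trace_diag['remaining_tokens'])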
- pm4py.conformance.conformance_diagnostics_alignments(log: EventLog | DataFrame, *args, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', variant_str: str | None = None, return_diagnostics_dataframe: bool = False, **kwargs) List[Dict[str, Any]] [source]#
Apply the alignments algorithm between a log and a process model. The method returns the full alignment diagnostics.
Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or ">>" and the second element is a transition (of the model) or ">>". For each couple, the following classification can be provided:
- Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.
- Move on log: for couples where the second element is ">>", it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.
- Move on model: for couples where the first element is ">>", it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
  - Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.
  - Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.
With each trace, a dictionary containing among the others the following information is associated:
- alignment: contains the alignment (sync moves, moves on log, moves on model)
- cost: contains the cost of the alignment according to the provided cost function
- fitness: is equal to 1 if the trace is perfectly fitting
- Parameters:
log – event log
args – specification of the process model
multi_processing (bool) – boolean value that enables the multiprocessing
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
variant_str – variant specification (for Petri net alignments)
return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)
- Return type:
List[Dict[str, Any]]
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    alignments_diagnostics = pm4py.conformance_diagnostics_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
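Each dictionary in the returned list carries the alignment, cost and fitness fields described above. A minimal sketch, reusing alignments_diagnostics from the example above:

    for trace_align in alignments_diagnostics:
        print(trace_align['fitness'], trace_align['cost'])
        # 'alignment' is the list of (move on log, move on model) couples
        print(trace_align['alignment'])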
- pm4py.conformance.fitness_token_based_replay(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, float] [source]#
Calculates the fitness using token-based replay. The fitness is calculated on a log-based level. The output dictionary contains the following keys:
- perc_fit_traces (the percentage of fit traces, from 0.0 to 100.0)
- average_trace_fitness (between 0.0 and 1.0; computed as the average of the trace fitnesses)
- log_fitness (between 0.0 and 1.0)
- percentage_of_fitting_traces (the percentage of fit traces, from 0.0 to 100.0)
Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.
In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.
The calculation of the replay fitness aims to calculate how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.
For token-based replay, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as indicated in the scientific contribution mentioned above.
- Parameters:
log – event log
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Dict[str, float]
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    fitness_tbr = pm4py.fitness_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.conformance.fitness_alignments(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', variant_str: str | None = None) Dict[str, float] [source]#
Calculates the fitness using alignments. The output dictionary contains the following keys:
- average_trace_fitness (between 0.0 and 1.0; computed as the average of the trace fitnesses)
- log_fitness (between 0.0 and 1.0)
- percentage_of_fitting_traces (the percentage of fit traces, from 0.0 to 100.0)
Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or ">>" and the second element is a transition (of the model) or ">>". For each couple, the following classification can be provided:
- Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.
- Move on log: for couples where the second element is ">>", it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.
- Move on model: for couples where the first element is ">>", it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
  - Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.
  - Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.
The calculation of the replay fitness aims to calculate how much of the behavior in the log is admitted by the process model. We propose two methods to calculate replay fitness, based on token-based replay and alignments respectively.
For alignments, the percentage of traces that are completely fit is returned, along with a fitness value that is calculated as the average of the fitness values of the single traces.
- Parameters:
log – event log
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
multi_processing (bool) – boolean value that enables the multiprocessing
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
variant_str – variant specification
- Return type:
Dict[str, float]
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    fitness_alignments = pm4py.fitness_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
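The returned dictionary can be queried by the keys listed above. For instance, reusing fitness_alignments from the example above:

    print(fitness_alignments['log_fitness'])
    print(fitness_alignments['percentage_of_fitting_traces'])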
- pm4py.conformance.precision_token_based_replay(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float [source]#
Calculates the precision using token-based replay.
Token-based replay matches a trace and a Petri net model, starting from the initial place, in order to discover which transitions are executed and in which places we have remaining or missing tokens for the given process instance. Token-based replay is useful for Conformance Checking: indeed, a trace is fitting according to the model if, during its execution, the transitions can be fired without the need to insert any missing token. If the reaching of the final marking is imposed, then a trace is fitting if it reaches the final marking without any missing or remaining tokens.
In PM4Py there is an implementation of a token replayer that is able to go across hidden transitions (calculating shortest paths between places) and can be used with any Petri net model with unique visible transitions and hidden transitions. When a visible transition needs to be fired and not all places in the preset are provided with the correct number of tokens, starting from the current marking it is checked if for some place there is a sequence of hidden transitions that could be fired in order to enable the visible transition. The hidden transitions are then fired and a marking that permits to enable the visible transition is reached. The approach is described in: Berti, Alessandro, and Wil MP van der Aalst. “Reviving Token-based Replay: Increasing Speed While Improving Diagnostics.” ATAED@ Petri Nets/ACSD. 2019.
The reference paper for the TBR-based precision (ETConformance) is: Muñoz-Gama, Jorge, and Josep Carmona. “A fresh look at precision in process conformance.” International Conference on Business Process Management. Springer, Berlin, Heidelberg, 2010.
In this approach, the different prefixes of the log are replayed (where possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets differ, the lower the precision value; the more similar they are, the higher the precision value.
- Parameters:
log – event log
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
float
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    precision_tbr = pm4py.precision_token_based_replay(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.conformance.precision_alignments(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float [source]#
Calculates the precision of the model w.r.t. the event log using alignments
Alignment-based replay aims to find one of the best alignments between the trace and the model. For each trace, the output of an alignment is a list of couples where the first element is an event (of the trace) or ">>" and the second element is a transition (of the model) or ">>". For each couple, the following classification can be provided:
- Sync move: the classification of the event corresponds to the transition label; in this case, both the trace and the model advance in the same way during the replay.
- Move on log: for couples where the second element is ">>", it corresponds to a replay move in the trace that is not mimicked in the model. This kind of move is unfit and signals a deviation between the trace and the model.
- Move on model: for couples where the first element is ">>", it corresponds to a replay move in the model that is not mimicked in the trace. For moves on model, we can have the following distinction:
  - Moves on model involving hidden transitions: in this case, even if it is not a sync move, the move is fit.
  - Moves on model not involving hidden transitions: in this case, the move is unfit and signals a deviation between the trace and the model.
The reference paper for the alignments-based precision (Align-ETConformance) is: Adriansyah, Arya, et al. “Measuring precision of modeled behavior.” Information systems and e-Business Management 13.1 (2015): 37-67
In this approach, the different prefixes of the log are replayed (where possible) on the model. At the reached marking, the set of transitions that are enabled in the process model is compared with the set of activities that follow the prefix. The more the sets differ, the lower the precision value; the more similar they are, the higher the precision value.
- Parameters:
log – event log
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
multi_processing (bool) – boolean value that enables the multiprocessing
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
float
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    precision_alignments = pm4py.precision_alignments(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.conformance.generalization_tbr(log: EventLog | DataFrame, petri_net: PetriNet, initial_marking: Marking, final_marking: Marking, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') float [source]#
Computes the generalization of the model (against the event log). The approach is described in the paper:
Buijs, Joos CAM, Boudewijn F. van Dongen, and Wil MP van der Aalst. “Quality dimensions in process discovery: The importance of fitness, precision, generalization and simplicity.” International Journal of Cooperative Information Systems 23.01 (2014): 1440001.
- Parameters:
log – event log
petri_net (PetriNet) – Petri net
initial_marking (Marking) – initial marking
final_marking (Marking) – final marking
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
float
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    generalization_tbr = pm4py.generalization_tbr(dataframe, net, im, fm)
- pm4py.conformance.replay_prefix_tbr(prefix: List[str], net: PetriNet, im: Marking, fm: Marking, activity_key: str = 'concept:name') Marking [source]#
Replays a prefix (list of activities) on a given accepting Petri net, using Token-Based Replay.
- Parameters:
prefix – prefix (list of activities) to be replayed
net (PetriNet) – accepting Petri net
im (Marking) – initial marking
fm (Marking) – final marking
activity_key (str) – attribute to be used for the activity
- Return type:
Marking
    import pm4py

    net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
    marking = pm4py.replay_prefix_tbr(['register request', 'check ticket'], net, im, fm)
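The returned marking can be combined with get_enabled_transitions (documented in pm4py.analysis above) to inspect which activities may follow the prefix. A minimal sketch, reusing marking from the example above:

    # transitions enabled after replaying the prefix
    enabled = pm4py.get_enabled_transitions(net, marking)
    for trans in enabled:
        # label may be None for invisible transitions
        print(trans.label)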
- pm4py.conformance.conformance_diagnostics_footprints(*args) List[Dict[str, Any]] | Dict[str, Any] [source]#
Provide conformance checking diagnostics using footprints
- Parameters:
args – provided arguments (the first argument is supposed to be an event log, or the footprints discovered from the event log; the other arguments are supposed to be the process model, or the footprints discovered from the process model).
- Return type:
Union[List[Dict[str, Any]], Dict[str, Any]]
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    footprints_diagnostics = pm4py.conformance_diagnostics_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
Deprecated since version 2.3.0: conformance checking using footprints will no longer be exposed in version 3.0.0.
- pm4py.conformance.fitness_footprints(*args) Dict[str, float] [source]#
Calculates fitness using footprints. The output is a dictionary containing two keys:
- perc_fit_traces => percentage of fit traces (over the log)
- log_fitness => the fitness value over the log
- Parameters:
args – provided arguments (the first argument is supposed to be an event log, or the footprints discovered from the event log; the other arguments are supposed to be the process model, or the footprints discovered from the process model).
- Return type:
Dict[str, float]
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    fitness_fp = pm4py.fitness_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
Deprecated since version 2.3.0: conformance checking using footprints will no longer be exposed in version 3.0.0.
- pm4py.conformance.precision_footprints(*args) float [source]#
Calculates precision using footprints
- Parameters:
args – provided arguments (the first argument is supposed to be an event log, or the footprints discovered from the event log; the other arguments are supposed to be the process model, or the footprints discovered from the process model).
- Return type:
float
    import pm4py

    net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    precision_fp = pm4py.precision_footprints(dataframe, net, im, fm, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
Deprecated since version 2.3.0: conformance checking using footprints will no longer be exposed in version 3.0.0.
- pm4py.conformance.check_is_fitting(*args, activity_key='concept:name') bool [source]#
Checks if a trace object is fit against a process model
- Parameters:
args – arguments (trace object; process model (process tree, petri net, BPMN))
- Return type:
bool
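The original docstring provides no example. A minimal usage sketch, under the assumption that the process model is supplied as an accepting Petri net (net, im, fm):

    import pm4py

    log = pm4py.read_xes('tests/input_data/running-example.xes')
    net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
    # checks whether the first trace of the log fits the model
    is_fit = pm4py.check_is_fitting(log[0], net, im, fm)
    print(is_fit)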
Deprecated since version 2.3.0: this method will be removed in version 3.0.0.
- pm4py.conformance.conformance_temporal_profile(log: EventLog | DataFrame, temporal_profile: Dict[Tuple[str, str], Tuple[float, float]], zeta: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[List[Tuple[float, float, float, float]]] [source]#
Performs conformance checking on the provided log with the provided temporal profile. The result is a list of time-based deviations for every case. For example, if the log on top of which the conformance is applied contains one case: A (timestamp: 2000-01) B (timestamp: 2002-01), then the difference between the timestamps of A and B is two years. If the temporal profile {('A', 'B'): (1.5 months, 0.5 months), ('A', 'C'): (5 months, 0), ('A', 'D'): (2 months, 0)} is specified, and zeta is set to 1, then the aforementioned case would be deviating (considering the couple of activities ('A', 'B')), because 2 years > 1.5 months + 0.5 months.
- Parameters:
log – log object
temporal_profile – temporal profile. E.g., if the log has two cases: A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06); A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03); the temporal profile will contain: {('A', 'B'): (1.5 months, 0.5 months), ('A', 'C'): (5 months, 0), ('A', 'D'): (2 months, 0)}
zeta (float) – number of standard deviations allowed from the average. E.g., zeta=1 allows every timestamp between AVERAGE-STDEV and AVERAGE+STDEV.
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)
- Return type:
List[List[Tuple[float, float, float, float]]]
    import pm4py

    temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    conformance_temporal_profile = pm4py.conformance_temporal_profile(dataframe, temporal_profile, zeta=1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.conformance.conformance_declare(log: EventLog | DataFrame, declare_model: Dict[str, Dict[Any, Dict[str, int]]], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[Dict[str, Any]] [source]#
Applies conformance checking against a DECLARE model.
Reference paper: F. M. Maggi, A. J. Mooij and W. M. P. van der Aalst, “User-guided discovery of declarative process models,” 2011 IEEE Symposium on Computational Intelligence and Data Mining (CIDM), Paris, France, 2011, pp. 192-199, doi: 10.1109/CIDM.2011.5949297.
- Parameters:
log – event log
declare_model – DECLARE model
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)
- Return type:
List[Dict[str, Any]]
    import pm4py

    log = pm4py.read_xes("C:/receipt.xes")
    declare_model = pm4py.discover_declare(log)
    conf_result = pm4py.conformance_declare(log, declare_model)
- pm4py.conformance.conformance_log_skeleton(log: EventLog | DataFrame, log_skeleton: Dict[str, Any], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', return_diagnostics_dataframe: bool = False) List[Set[Any]] [source]#
Performs conformance checking using the log skeleton
Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).
A log skeleton is a declarative model which consists of six different constraints:
- "directly_follows": specifies for some activities some strict bounds on the activities directly-following. For example, 'A should be directly followed by B' and 'B should be directly followed by C'.
- "always_before": specifies that some activities may be executed only if some other activities are executed somewhen before in the history of the case. For example, 'C should always be preceded by A'.
- "always_after": specifies that some activities should always trigger the execution of some other activities in the future history of the case. For example, 'A should always be followed by C'.
- "equivalence": specifies that a given couple of activities should happen with the same number of occurrences inside a case. For example, 'B and C should always happen the same number of times'.
- "never_together": specifies that a given couple of activities should never happen together in the history of the case. For example, 'there should be no case containing both C and D'.
- "activ_occurrences": specifies the allowed number of occurrences per activity. E.g., A is allowed to be executed 1 or 2 times, B is allowed to be executed 1, 2, 3 or 4 times.
- Parameters:
log – log object
log_skeleton – log skeleton object, expressed as dictionaries of the six constraints (never_together, always_before …) along with the discovered rules.
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
return_diagnostics_dataframe (bool) – if possible, returns a dataframe with the diagnostics (instead of the usual output)
- Return type:
List[Set[Any]]
    import pm4py

    log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
    conformance_lsk = pm4py.conformance_log_skeleton(dataframe, log_skeleton, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
pm4py.connectors module#
- pm4py.connectors.extract_log_outlook_mails() DataFrame [source]#
Extracts the history of the conversations from the local instance of Microsoft Outlook running on the current computer.
- CASE ID (case:concept:name) => identifier of the conversation
- ACTIVITY (concept:name) => activity that is performed in the current item (send e-mail, receive e-mail, refuse meeting, ...)
- TIMESTAMP (time:timestamp) => timestamp of creation of the item in Outlook
- RESOURCE (org:resource) => sender of the current item
See also: * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.mailitem?redirectedfrom=MSDN&view=outlook-pia#properties_ * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.olobjectclass?view=outlook-pia
- Return type:
pd.DataFrame
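A minimal usage sketch (this assumes a local Microsoft Outlook installation on a Windows machine):

    from pm4py import connectors

    # requires Microsoft Outlook to be installed and configured on this machine
    dataframe = connectors.extract_log_outlook_mails()
    print(dataframe.head())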
- pm4py.connectors.extract_log_outlook_calendar(email_user: str | None = None, calendar_id: int = 9) DataFrame [source]#
Extracts the history of the calendar events (creation, update, start, end) in a Pandas dataframe from the local Outlook instance running on the current computer.
- CASE ID (case:concept:name) => identifier of the meeting
- ACTIVITY (concept:name) => one between: Meeting Created, Last Change of Meeting, Meeting Started, Meeting Completed
- TIMESTAMP (time:timestamp) => the timestamp of the event
- case:subject => the subject of the meeting
- Parameters:
email_user – (optional) e-mail address from which the (shared) calendar should be extracted
calendar_id (int) – identifier of the calendar for the given user (default: 9)
- Return type:
pd.DataFrame
- pm4py.connectors.extract_log_windows_events() DataFrame [source]#
Extract a process mining dataframe from all the events recorded in the Windows registry.
- CASE ID (case:concept:name) => name of the computer emitting the events
- ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier
- TIMESTAMP (time:timestamp) => timestamp of generation of the event
- RESOURCE (org:resource) => username involved in the event
- Return type:
pd.DataFrame
- pm4py.connectors.extract_log_chrome_history(history_db_path: str | None = None) DataFrame [source]#
Extracts a dataframe containing the navigation history of Google Chrome. Please keep Google Chrome history closed when extracting.
- CASE ID (case:concept:name) => an identifier of the profile that has been extracted
- ACTIVITY (concept:name) => the complete path of the website, minus the GET arguments
- TIMESTAMP (time:timestamp) => the timestamp of visit
- Parameters:
history_db_path – path to the history DB path of Google Chrome (default: position of the Windows folder)
- Return type:
pd.DataFrame
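A minimal usage sketch (Google Chrome should be closed, so that the history database is not locked):

    from pm4py import connectors

    # default: the history database is searched in the standard profile location
    dataframe = connectors.extract_log_chrome_history()
    print(dataframe.head())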
- pm4py.connectors.extract_log_firefox_history(history_db_path: str | None = None) DataFrame [source]#
Extracts a dataframe containing the navigation history of Mozilla Firefox. Please keep Mozilla Firefox history closed when extracting.
- CASE ID (case:concept:name) => an identifier of the profile that has been extracted
- ACTIVITY (concept:name) => the complete path of the website, minus the GET arguments
- TIMESTAMP (time:timestamp) => the timestamp of visit
- Parameters:
history_db_path – path to the history DB path of Mozilla Firefox (default: position of the Windows folder)
- Return type:
pd.DataFrame
- pm4py.connectors.extract_log_github(owner: str = 'pm4py', repo: str = 'pm4py-core', auth_token: str | None = None) DataFrame [source]#
Extracts a dataframe containing the history of the issues of a Github repository. Due to the API rate limit for public/registered users, only a part of the events can be returned.
- Parameters:
owner (str) – owner of the repository (e.g., pm4py)
repo (str) – name of the repository (e.g., pm4py-core)
auth_token – authorization token
- Return type:
pd.DataFrame
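A minimal usage sketch; auth_token is optional and raises the API rate limit when provided:

    from pm4py import connectors

    # auth_token=None uses the unauthenticated (lower) GitHub API rate limit
    dataframe = connectors.extract_log_github(owner='pm4py', repo='pm4py-core', auth_token=None)
    print(dataframe.head())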
- pm4py.connectors.extract_log_camunda_workflow(connection_string: str) DataFrame [source]#
Extracts a dataframe from the Camunda workflow system. Aside from the traditional columns, the processID of the process in Camunda is returned.
- Parameters:
connection_string (str) – ODBC connection string to the Camunda database
- Return type:
pd.DataFrame
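A minimal usage sketch; the ODBC connection string below is a placeholder to be adapted to the actual Camunda database:

    from pm4py import connectors

    # hypothetical ODBC connection string; adjust driver, host and credentials
    connection_string = 'Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=camunda;Uid=user;Pwd=pass;'
    dataframe = connectors.extract_log_camunda_workflow(connection_string)
    print(dataframe.head())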
- pm4py.connectors.extract_log_sap_o2c(connection_string: str, prefix: str = '') DataFrame [source]#
Extracts a dataframe for the SAP O2C process.
- Parameters:
connection_string (str) – ODBC connection string to the SAP database
prefix (str) – prefix for the tables (example: SAPSR3.)
- Return type:
pd.DataFrame
- pm4py.connectors.extract_log_sap_accounting(connection_string: str, prefix: str = '') DataFrame [source]#
Extracts a dataframe for the SAP Accounting process.
- Parameters:
connection_string (str) – ODBC connection string to the SAP database
prefix (str) – prefix for the tables (example: SAPSR3.)
- Return type:
pd.DataFrame
- pm4py.connectors.extract_ocel_outlook_mails() OCEL [source]#
Extracts the history of the conversations from the local instance of Microsoft Outlook running on the current computer as an object-centric event log.
- ACTIVITY (ocel:activity) => activity that is performed in the current item (send e-mail, receive e-mail, refuse meeting, ...)
- TIMESTAMP (ocel:timestamp) => timestamp of creation of the item in Outlook
Object types:
- org:resource => the sender of the mail
- recipients => the list of recipients of the mail
- topic => the topic of the discussion
See also: * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.mailitem?redirectedfrom=MSDN&view=outlook-pia#properties_ * https://learn.microsoft.com/en-us/dotnet/api/microsoft.office.interop.outlook.olobjectclass?view=outlook-pia
- Return type:
OCEL
- pm4py.connectors.extract_ocel_outlook_calendar(email_user: str | None = None, calendar_id: int = 9) OCEL [source]#
Extracts the history of the calendar events (creation, update, start, end) as an object-centric event log from the local Outlook instance running on the current computer.
- ACTIVITY (ocel:activity) => one between: Meeting Created, Last Change of Meeting, Meeting Started, Meeting Completed
- TIMESTAMP (ocel:timestamp) => the timestamp of the event
Object types:
- case:concept:name => identifier of the meeting
- case:subject => the subject of the meeting
- Parameters:
email_user – (optional) e-mail address from which the (shared) calendar should be extracted
calendar_id (int) – identifier of the calendar for the given user (default: 9)
- Return type:
OCEL
- pm4py.connectors.extract_ocel_windows_events() OCEL [source]#
Extract a process mining dataframe from all the events recorded in the Windows registry as an object-centric event log.
- ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier (see https://learn.microsoft.com/en-us/previous-versions/windows/desktop/eventlogprov/win32-ntlogevent)
- TIMESTAMP (time:timestamp) => timestamp of generation of the event
Object types:
- categoryString: translation of the subcategory. The translation is source-specific.
- computerName: name of the computer that generated this event.
- eventIdentifier: identifier of the event. This is specific to the source that generated the event log entry.
- eventType: 1=Error; 2=Warning; 3=Information; 4=Security Audit Success; 5=Security Audit Failure
- sourceName: name of the source (application, service, driver, or subsystem) that generated the entry.
- user: user name of the logged-on user when the event occurred. If the user name cannot be determined, this will be NULL.
- Return type:
OCEL
- pm4py.connectors.extract_ocel_chrome_history(history_db_path: str | None = None) OCEL [source]#
Extracts an object-centric event log containing the navigation history of Google Chrome. Please keep Google Chrome history closed when extracting.
- ACTIVITY (ocel:activity) => the complete path of the website, minus the GET arguments
- TIMESTAMP (ocel:timestamp) => the timestamp of visit
Object types:
- case:concept:name: the profile of Chrome that is used to visit the site
- complete_url: the complete URL of the website
- url_wo_parameters: complete URL minus the part after ?
- domain: the domain of the website that is visited
- Parameters:
history_db_path – path to the history DB path of Google Chrome (default: position of the Windows folder)
- Return type:
OCEL
- pm4py.connectors.extract_ocel_firefox_history(history_db_path: str | None = None) OCEL [source]#
Extracts an object-centric event log containing the navigation history of Mozilla Firefox. Please keep Mozilla Firefox closed while extracting, since the history database may be locked while the browser is open.
ACTIVITY (ocel:activity) => the complete path of the website, minus the GET arguments
TIMESTAMP (ocel:timestamp) => the timestamp of the visit
Object types:
- case:concept:name: the profile of Firefox that is used to visit the site
- complete_url: the complete URL of the website
- url_wo_parameters: the complete URL minus the part after ?
- domain: the domain of the website that is visited
- Parameters:
history_db_path – path to the history database of Mozilla Firefox (default: its standard location on Windows)
- Return type:
OCEL
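Usage mirrors the Chrome connector above (sketch, not part of the original documentation):

import pm4py

ocel = pm4py.connectors.extract_ocel_firefox_history()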
- pm4py.connectors.extract_ocel_github(owner: str = 'pm4py', repo: str = 'pm4py-core', auth_token: str | None = None) OCEL [source]#
Extracts an object-centric event log containing the history of the issues of a GitHub repository. Due to the API rate limits for public/registered users, only a subset of the events may be returned.
ACTIVITY (ocel:activity) => the event (created, commented, closed, subscribed …)
TIMESTAMP (ocel:timestamp) => the timestamp of execution of the event
Object types:
- case:concept:name => the URL of the events related to the issue
- org:resource => the involved resource
- case:repo => the repository in which the issue is created
- Parameters:
owner (str) – owner of the repository (e.g., pm4py)
repo (str) – name of the repository (e.g., pm4py-core)
auth_token – authorization token
- Return type:
OCEL
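A minimal usage sketch (not part of the original documentation); the token is a placeholder, and omitting auth_token falls back to the lower rate limit for unauthenticated users:

import pm4py

ocel = pm4py.connectors.extract_ocel_github(owner='pm4py', repo='pm4py-core')
# authenticated access (placeholder token) raises the rate limit
ocel = pm4py.connectors.extract_ocel_github(owner='pm4py', repo='pm4py-core', auth_token='<PERSONAL_ACCESS_TOKEN>')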
- pm4py.connectors.extract_ocel_camunda_workflow(connection_string: str) OCEL [source]#
Extracts an object-centric event log from the Camunda workflow system.
- Parameters:
connection_string (str) – ODBC connection string to the Camunda database
- Return type:
OCEL
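A minimal usage sketch (not part of the original documentation); the ODBC connection string is a placeholder to be adapted to the actual Camunda database:

import pm4py

# placeholder ODBC connection string
conn_str = 'Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=camunda;Uid=user;Pwd=pass;'
ocel = pm4py.connectors.extract_ocel_camunda_workflow(conn_str)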
- pm4py.connectors.extract_ocel_sap_o2c(connection_string: str, prefix: str = '') OCEL [source]#
Extracts an object-centric event log for the SAP O2C process.
- Parameters:
connection_string (str) – ODBC connection string to the SAP database
prefix (str) – prefix for the tables (example: SAPSR3.)
- Return type:
OCEL
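A minimal usage sketch (not part of the original documentation); connection string and table prefix are placeholders to be adapted to the actual SAP installation:

import pm4py

# placeholder ODBC connection string
conn_str = 'Driver={Oracle};Dbq=SAPDB;Uid=user;Pwd=pass;'
ocel = pm4py.connectors.extract_ocel_sap_o2c(conn_str, prefix='SAPSR3.')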
- pm4py.connectors.extract_ocel_sap_accounting(connection_string: str, prefix: str = '') OCEL [source]#
Extracts an object-centric event log for the SAP Accounting process.
- Parameters:
connection_string (str) – ODBC connection string to the SAP database
prefix (str) – prefix for the tables (example: SAPSR3.)
- Return type:
OCEL
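Usage mirrors the O2C connector above (sketch, not part of the original documentation):

import pm4py

# placeholder ODBC connection string
conn_str = 'Driver={Oracle};Dbq=SAPDB;Uid=user;Pwd=pass;'
ocel = pm4py.connectors.extract_ocel_sap_accounting(conn_str, prefix='SAPSR3.')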
pm4py.convert module#
The pm4py.convert module contains the cross-conversions implemented in pm4py.
- pm4py.convert.convert_to_event_log(obj: DataFrame | EventStream, case_id_key: str = 'case:concept:name', **kwargs) EventLog [source]#
Converts a DataFrame/EventStream object to an event log object
- Parameters:
obj – DataFrame or EventStream object
case_id_key (str) – attribute to be used as case identifier
- Return type:
EventLog
import pandas as pd
import pm4py

dataframe = pd.read_csv("tests/input_data/running-example.csv")
dataframe = pm4py.format_dataframe(dataframe, case_id='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
log = pm4py.convert_to_event_log(dataframe)
- pm4py.convert.convert_to_event_stream(obj: EventLog | DataFrame, case_id_key: str = 'case:concept:name', **kwargs) EventStream [source]#
Converts a log object to an event stream
- Parameters:
obj – log object
case_id_key (str) – attribute to be used as case identifier
- Return type:
EventStream
import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
event_stream = pm4py.convert_to_event_stream(log)
- pm4py.convert.convert_to_dataframe(obj: EventStream | EventLog, **kwargs) DataFrame [source]#
Converts a log object to a dataframe
- Parameters:
obj – log object
- Return type:
pd.DataFrame
import pm4py

log = pm4py.read_xes("tests/input_data/running-example.xes")
dataframe = pm4py.convert_to_dataframe(log)
- pm4py.convert.convert_to_bpmn(*args: Tuple[PetriNet, Marking, Marking] | ProcessTree) BPMN [source]#
Converts an object to a BPMN diagram. As an input, either a Petri net (with corresponding initial and final marking) or a process tree can be provided. A process tree can always be converted into a BPMN model, and thus the quality of the resulting object is guaranteed. For Petri nets, the quality of the conversion largely depends on the net provided (e.g., sound WF-nets are likely to produce reasonable BPMN models).
- Parameters:
args – petri net (with initial and final marking) or process tree
- Return type:
BPMN
import pm4py

# import a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
bpmn_graph = pm4py.convert_to_bpmn(net, im, fm)
- pm4py.convert.convert_to_petri_net(*args: BPMN | ProcessTree | HeuristicsNet | POWL | dict) Tuple[PetriNet, Marking, Marking] [source]#
Converts an input model to an (accepting) Petri net. The input objects can either be a process tree, a BPMN model or a Heuristics net. The output is a triple containing the Petri net and the initial and final markings. The markings are only returned if they can be reasonably derived from the input model.
- Parameters:
args – process tree, Heuristics net, BPMN or POWL model
- Return type:
Tuple[PetriNet, Marking, Marking]
import pm4py

# imports a process tree from a PTML file
process_tree = pm4py.read_ptml("tests/input_data/running-example.ptml")
net, im, fm = pm4py.convert_to_petri_net(process_tree)
- pm4py.convert.convert_to_process_tree(*args: Tuple[PetriNet, Marking, Marking] | BPMN) ProcessTree [source]#
Converts an input model to a process tree. The input models can either be Petri nets (marked) or BPMN models. For both input types, the conversion is not guaranteed to work; hence, invoking the method may raise an Exception.
- Parameters:
args – petri net (along with initial and final marking) or BPMN
- Return type:
ProcessTree
import pm4py

# imports a BPMN file
bpmn_graph = pm4py.read_bpmn("tests/input_data/running-example.bpmn")
# converts the BPMN to a process tree (through intermediate conversion to a Petri net)
process_tree = pm4py.convert_to_process_tree(bpmn_graph)
- pm4py.convert.convert_to_reachability_graph(*args: Tuple[PetriNet, Marking, Marking] | BPMN | ProcessTree) TransitionSystem [source]#
Converts an input model to a reachability graph (transition system). The input models can either be Petri nets (with markings), BPMN models or process trees. The output is the state-space of the model (i.e., the reachability graph), encoded as a TransitionSystem object.
- Parameters:
args – petri net (along with initial and final marking), process tree or BPMN
- Return type:
TransitionSystem
import pm4py

# reads a Petri net from a file
net, im, fm = pm4py.read_pnml("tests/input_data/running-example.pnml")
# converts it to a reachability graph
reach_graph = pm4py.convert_to_reachability_graph(net, im, fm)
- pm4py.convert.convert_log_to_ocel(log: EventLog | EventStream | DataFrame, activity_column: str = 'concept:name', timestamp_column: str = 'time:timestamp', object_types: Collection[str] | None = None, obj_separator: str = ' AND ', additional_event_attributes: Collection[str] | None = None, additional_object_attributes: Dict[str, Collection[str]] | None = None) OCEL [source]#
Converts an event log to an object-centric event log with one or more object types.
- Parameters:
log – log object
activity_column (str) – activity column
timestamp_column (str) – timestamp column
object_types – list of columns to consider as object types
obj_separator (str) – separator between different objects in the same column
additional_event_attributes – additional attributes to be considered as event attributes in the OCEL
additional_object_attributes – additional attributes per object type to be considered as object attributes in the OCEL (dictionary in which object types are associated with their attributes, e.g., {"order": ["quantity", "cost"], "invoice": ["date", "due date"]})
- Return type:
OCEL
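A minimal sketch (not part of the original documentation), assuming a dataframe with the standard pm4py column names and using the case identifier column as the only object type:

import pm4py

ocel = pm4py.convert_log_to_ocel(dataframe, activity_column='concept:name', timestamp_column='time:timestamp', object_types=['case:concept:name'])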
- pm4py.convert.convert_ocel_to_networkx(ocel: OCEL, variant: str = 'ocel_to_nx') DiGraph [source]#
Converts an OCEL to a NetworkX DiGraph object.
- Parameters:
ocel (OCEL) – object-centric event log
variant (str) – variant of the conversion to use: "ocel_to_nx" -> graph containing event and object IDs and two types of relations (REL=related objects, DF=directly-follows); "ocel_features_to_nx" -> graph containing different types of interconnection at the object level
- Return type:
nx.DiGraph
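A minimal sketch (not part of the original documentation), assuming an OCEL object (e.g., read via pm4py.read_ocel) is already available:

import pm4py

nx_digraph = pm4py.convert_ocel_to_networkx(ocel, variant='ocel_to_nx')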
- pm4py.convert.convert_log_to_networkx(log: EventLog | EventStream | DataFrame, include_df: bool = True, case_id_key: str = 'concept:name', other_case_attributes_as_nodes: Collection[str] | None = None, event_attributes_as_nodes: Collection[str] | None = None) DiGraph [source]#
Converts an event log object to a NetworkX DiGraph object. The nodes of the graph are the events, the cases (and possibly the attributes of the log). The edges are:
- Connecting each event to the corresponding case (BELONGS_TO type)
- Connecting every event to the directly-following one (DF type, if enabled)
- Connecting every case/event to the given attribute values (ATTRIBUTE_EDGE type)
- Parameters:
log – log object (EventLog, EventStream, Pandas dataframe)
include_df (bool) – include the directly-follows relation in the graph
case_id_key (str) – specifies which attribute at the case level should be considered the case ID
other_case_attributes_as_nodes – specifies which attributes at the case level should be inserted in the graph as nodes (other than the case ID) (list, default empty)
event_attributes_as_nodes – specifies which attributes at the event level should be inserted in the graph as nodes (list, default empty)
- Return type:
nx.DiGraph
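A minimal sketch (not part of the original documentation) on a log read from a XES file:

import pm4py

log = pm4py.read_xes('tests/input_data/running-example.xes')
nx_digraph = pm4py.convert_log_to_networkx(log)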
- pm4py.convert.convert_log_to_time_intervals(log: EventLog | DataFrame, filter_activity_couple: Tuple[str, str] | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') List[List[Any]] [source]#
Gets a list of intervals from an event log. Each interval contains two temporally consecutive events and measures the time between the two events (complete timestamp of the first against start timestamp of the second).
- Parameters:
log – log object
filter_activity_couple – (optional) filters the intervals to only consider a given couple of activities of the log
activity_key (str) – the attribute to be used as activity
timestamp_key (str) – the attribute to be used as timestamp
case_id_key (str) – the attribute to be used as case identifier
start_timestamp_key (str) – the attribute to be used as start timestamp
- Return type:
List[List[Any]]
import pm4py

log = pm4py.read_xes('tests/input_data/receipt.xes')
time_intervals = pm4py.convert_log_to_time_intervals(log)
print(len(time_intervals))
time_intervals = pm4py.convert_log_to_time_intervals(log, ('Confirmation of receipt', 'T02 Check confirmation of receipt'))
print(len(time_intervals))
- pm4py.convert.convert_petri_net_to_networkx(net: PetriNet, im: Marking, fm: Marking) DiGraph [source]#
Converts a Petri net to a NetworkX DiGraph. Each place and transition corresponds to a node in the graph.
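A minimal sketch (not part of the original documentation), reading a Petri net from a PNML file and converting it:

import pm4py

net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
nx_digraph = pm4py.convert_petri_net_to_networkx(net, im, fm)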
pm4py.discovery module#
The pm4py.discovery module contains the process discovery algorithms implemented in pm4py.
- pm4py.discovery.discover_dfg(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict] [source]#
Discovers a Directly-Follows Graph (DFG) from a log.
This method returns a dictionary with the couples of directly-following activities (in the log) as keys and the frequency of the relation as value.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Tuple[dict, dict, dict]
import pm4py

dfg, start_activities, end_activities = pm4py.discover_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_directly_follows_graph(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict] [source]#
- pm4py.discovery.discover_dfg_typed(log: DataFrame, case_id_key: str = 'case:concept:name', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp') DirectlyFollowsGraph [source]#
Discovers a Directly-Follows Graph (DFG) from a log.
This method returns a typed DFG object, as specified in pm4py.objects.dfg.obj.py (DirectlyFollowsGraph class). The DFG object describes a graph, start activities and end activities. The graph is a collection of triples of the form (a,b,f) representing an arc a->b with frequency f. The start activities are a collection of tuples of the form (a,f) representing that activity a starts f cases. The end activities are a collection of tuples of the form (a,f) representing that activity a ends f cases.
This method replaces pm4py.discover_dfg and pm4py.discover_directly_follows_graph. In a future release, these functions will adopt the same behavior as this function.
- Parameters:
log (DataFrame) – pandas.DataFrame
case_id_key (str) – attribute to be used as case identifier
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
- Return type:
DFG
import pm4py

dfg = pm4py.discover_dfg_typed(log, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_performance_dfg(log: EventLog | DataFrame, business_hours: bool = False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)], workcalendar=None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[dict, dict, dict] [source]#
Discovers a performance directly-follows graph from an event log.
This method returns a dictionary with the couples of directly-following activities (in the log) as keys and the performance of the relation as value.
- Parameters:
log – event log / Pandas dataframe
business_hours (bool) – enables/disables the computation based on business hours (default: False)
business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot, i.e., one tuple, consists of one start and one end time given in seconds since week start, e.g., [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60)], meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Tuple[dict, dict, dict]
import pm4py

performance_dfg, start_activities, end_activities = pm4py.discover_performance_dfg(dataframe, case_id_key='case:concept:name', activity_key='concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_petri_net_alpha(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking] [source]#
Discovers a Petri net using the Alpha Miner.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Tuple[PetriNet, Marking, Marking]
import pm4py

net, im, fm = pm4py.discover_petri_net_alpha(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_petri_net_ilp(log: EventLog | DataFrame, alpha: float = 1.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking] [source]#
Discovers a Petri net using the ILP Miner.
- Parameters:
log – event log / Pandas dataframe
alpha (float) – noise threshold for the sequence encoding graph (1.0=no filtering, 0.0=greatest filtering)
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Tuple[PetriNet, Marking, Marking]
import pm4py

net, im, fm = pm4py.discover_petri_net_ilp(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_petri_net_alpha_plus(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking] [source]#
Discovers a Petri net using the Alpha+ algorithm
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Tuple[PetriNet, Marking, Marking]
import pm4py

net, im, fm = pm4py.discover_petri_net_alpha_plus(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
Deprecated since version 2.3.0: This method will be removed in version 3.0.0.
- pm4py.discovery.discover_petri_net_inductive(log: EventLog | DataFrame | DirectlyFollowsGraph, multi_processing: bool = False, noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', disable_fallthroughs: bool = False) Tuple[PetriNet, Marking, Marking] [source]#
Discovers a Petri net using the inductive miner algorithm.
The basic idea of the Inductive Miner is to detect a 'cut' in the log (e.g., sequential cut, parallel cut, concurrent cut and loop cut) and then to recur on the sublogs found by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.
Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).
- Parameters:
log – event log / Pandas dataframe / typed DFG
noise_threshold (float) – noise threshold (default: 0.0)
multi_processing (bool) – boolean that enables/disables multiprocessing in the inductive miner
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
disable_fallthroughs (bool) – disable the Inductive Miner fall-throughs
- Return type:
Tuple[PetriNet, Marking, Marking]
import pm4py

net, im, fm = pm4py.discover_petri_net_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_petri_net_heuristics(log: EventLog | DataFrame, dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Tuple[PetriNet, Marking, Marking] [source]#
Discovers a Petri net using the Heuristics Miner.
The Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing a way to handle noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is a Heuristics Net, i.e., an object that contains the activities and the relationships between them. The Heuristics Net can then be converted into a Petri net.
- Parameters:
log – event log / Pandas dataframe
dependency_threshold (float) – dependency threshold (default: 0.5)
and_threshold (float) – AND threshold (default: 0.65)
loop_two_threshold (float) – loop two threshold (default: 0.5)
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Tuple[PetriNet, Marking, Marking]
import pm4py

net, im, fm = pm4py.discover_petri_net_heuristics(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_process_tree_inductive(log: EventLog | DataFrame | DirectlyFollowsGraph, noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', disable_fallthroughs: bool = False) ProcessTree [source]#
Discovers a process tree using the inductive miner algorithm
The basic idea of the Inductive Miner is to detect a 'cut' in the log (e.g., sequential cut, parallel cut, concurrent cut and loop cut) and then to recur on the sublogs found by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.
Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).
- Parameters:
log – event log / Pandas dataframe / typed DFG
noise_threshold (float) – noise threshold (default: 0.0)
activity_key (str) – attribute to be used for the activity
multi_processing (bool) – boolean that enables/disables multiprocessing in the inductive miner
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
disable_fallthroughs (bool) – disable the Inductive Miner fall-throughs
- Return type:
ProcessTree
import pm4py

process_tree = pm4py.discover_process_tree_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_heuristics_net(log: EventLog | DataFrame, dependency_threshold: float = 0.5, and_threshold: float = 0.65, loop_two_threshold: float = 0.5, min_act_count: int = 1, min_dfg_occurrences: int = 1, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', decoration: str = 'frequency') HeuristicsNet [source]#
Discovers a Heuristics Net.
The Heuristics Miner is an algorithm that acts on the Directly-Follows Graph, providing a way to handle noise and to find common constructs (dependency between two activities, AND). The output of the Heuristics Miner is a Heuristics Net, i.e., an object that contains the activities and the relationships between them. The Heuristics Net can then be converted into a Petri net.
- Parameters:
log – event log / Pandas dataframe
dependency_threshold (float) – dependency threshold (default: 0.5)
and_threshold (float) – AND threshold (default: 0.65)
loop_two_threshold (float) – loop two threshold (default: 0.5)
min_act_count (int) – minimum number of occurrences per activity in order to be included in the discovery
min_dfg_occurrences (int) – minimum number of occurrences per arc in the DFG in order to be included in the discovery
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
decoration (str) – the decoration that should be used (frequency, performance)
- Return type:
HeuristicsNet
import pm4py

heu_net = pm4py.discover_heuristics_net(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.derive_minimum_self_distance(log: DataFrame | EventLog | EventStream, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, int] [source]#
This algorithm computes the minimum self-distance for each activity observed in an event log. The self distance of a in <a> is infinity, of a in <a,a> is 0, in <a,b,a> is 1, etc. The activity key ‘concept:name’ is used.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Dict[str, int]
import pm4py

msd = pm4py.derive_minimum_self_distance(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_footprints(*args: EventLog | Tuple[PetriNet, Marking, Marking] | ProcessTree) List[Dict[str, Any]] | Dict[str, Any] [source]#
Discovers the footprints out of the provided event log / process model
- Parameters:
args – event log / process model
- Return type:
Union[List[Dict[str, Any]], Dict[str, Any]]
import pm4py

footprints = pm4py.discover_footprints(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_eventually_follows_graph(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], int] [source]#
Gets the eventually follows graph from a log object.
The eventually-follows graph is a dictionary associating each couple of activities eventually following each other with the number of occurrences of this relation.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Dict[Tuple[str, str], int]
import pm4py

efg = pm4py.discover_eventually_follows_graph(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_bpmn_inductive(log: EventLog | DataFrame | DirectlyFollowsGraph, noise_threshold: float = 0.0, multi_processing: bool = False, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', disable_fallthroughs: bool = False) BPMN [source]#
Discovers a BPMN using the Inductive Miner algorithm
The basic idea of the Inductive Miner is to detect a 'cut' in the log (e.g., sequential cut, parallel cut, concurrent cut and loop cut) and then to recur on the sublogs found by applying the cut, until a base case is found. The Directly-Follows variant avoids the recursion on the sublogs and uses the Directly-Follows graph instead.
Inductive Miner models usually make extensive use of hidden transitions, especially for skipping/looping over a portion of the model. Furthermore, each visible transition has a unique label (there are no transitions in the model that share the same label).
- Parameters:
log – event log / Pandas dataframe / typed DFG
noise_threshold (float) – noise threshold (default: 0.0)
multi_processing (bool) – boolean that enables/disables multiprocessing in the inductive miner
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
disable_fallthroughs (bool) – disable the Inductive Miner fall-throughs
- Return type:
BPMN
import pm4py

bpmn_graph = pm4py.discover_bpmn_inductive(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_transition_system(log: EventLog | DataFrame, direction: str = 'forward', window: int = 2, view: str = 'sequence', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') TransitionSystem [source]#
Discovers a transition system as described in the process mining book “Process Mining: Data Science in Action”
- Parameters:
log – event log / Pandas dataframe
direction (str) – direction in which the transition system is built (forward, backward)
window (int) – window (2, 3, …)
view (str) – view to use in the construction of the states (sequence, set, multiset)
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
TransitionSystem
import pm4py

transition_system = pm4py.discover_transition_system(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_prefix_tree(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Trie [source]#
Discovers a prefix tree from the provided log object.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Trie
import pm4py

prefix_tree = pm4py.discover_prefix_tree(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_temporal_profile(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[Tuple[str, str], Tuple[float, float]] [source]#
Discovers a temporal profile from a log object.
Implements the approach described in: Stertz, Florian, Jürgen Mangler, and Stefanie Rinderle-Ma. “Temporal Conformance Checking at Runtime based on Time-infused Process Models.” arXiv preprint arXiv:2008.07262 (2020).
The output is a dictionary containing, for every couple of activities eventually following each other in at least one case of the log, the average and the standard deviation of the difference of their timestamps.
E.g. if the log has two cases:
A (timestamp: 1980-01) B (timestamp: 1980-03) C (timestamp: 1980-06)
A (timestamp: 1990-01) B (timestamp: 1990-02) D (timestamp: 1990-03)
The returned dictionary will contain: {(‘A’, ‘B’): (1.5 months, 0.5 months), (‘A’, ‘C’): (5 months, 0), (‘A’, ‘D’): (2 months, 0)}
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Dict[Tuple[str, str], Tuple[float, float]]
import pm4py

temporal_profile = pm4py.discover_temporal_profile(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_log_skeleton(log: EventLog | DataFrame, noise_threshold: float = 0.0, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Any] [source]#
Discovers a log skeleton from an event log.
A log skeleton is a declarative model which consists of six different constraints:
- "directly_follows": specifies for some activities some strict bounds on the directly-following activities. For example, 'A should be directly followed by B' and 'B should be directly followed by C'.
- "always_before": specifies that some activities may be executed only if some other activities have been executed earlier in the history of the case. For example, 'C should always be preceded by A'.
- "always_after": specifies that some activities should always trigger the execution of some other activities in the future history of the case. For example, 'A should always be followed by C'.
- "equivalence": specifies that a given couple of activities should happen with the same number of occurrences inside a case. For example, 'B and C should always happen the same number of times'.
- "never_together": specifies that a given couple of activities should never happen together in the history of the case. For example, 'there should be no case containing both C and D'.
- "activ_occurrences": specifies the allowed number of occurrences per activity. E.g., A is allowed to be executed 1 or 2 times; B is allowed to be executed 1, 2, 3 or 4 times.
Reference paper: Verbeek, H. M. W., and R. Medeiros de Carvalho. “Log skeletons: A classification approach to process discovery.” arXiv preprint arXiv:1806.08247 (2018).
- Parameters:
log – event log / Pandas dataframe
noise_threshold (float) – noise threshold, acting as described in the paper
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Dict[str, Any]
import pm4py

log_skeleton = pm4py.discover_log_skeleton(dataframe, noise_threshold=0.1, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.discovery.discover_declare(log: EventLog | DataFrame, allowed_templates: Set[str] | None = None, considered_activities: Set[str] | None = None, min_support_ratio: float | None = None, min_confidence_ratio: float | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') Dict[str, Dict[Any, Dict[str, int]]] [source]#
Discovers a DECLARE model from an event log.
Reference paper: F. M. Maggi, A. J. Mooij and W. M. P. van der Aalst, “User-guided discovery of declarative process models,” 2011 IEEE Symposium on Computational Intelligence and Data Mining (CIDM), Paris, France, 2011, pp. 192-199, doi: 10.1109/CIDM.2011.5949297.
- Parameters:
log – event log / Pandas dataframe
allowed_templates – (optional) collection of templates to consider for the discovery
considered_activities – (optional) collection of activities to consider for the discovery
min_support_ratio – (optional, decided automatically otherwise) minimum percentage of cases (over the entire set of cases of the log) for which the discovered rules apply
min_confidence_ratio – (optional, decided automatically otherwise) minimum percentage of cases (over the rule’s support) for which the discovered rules are valid
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Dict[str, Any]
import pm4py

declare_model = pm4py.discover_declare(log)
- pm4py.discovery.discover_powl(log: EventLog | DataFrame, variant=POWLDiscoveryVariant.MAXIMAL, filtering_weight_factor: float = 0.0, order_graph_filtering_threshold: float = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') POWL [source]#
Discovers a POWL model from an event log.
Reference paper: Kourani, Humam, and Sebastiaan J. van Zelst. “POWL: partially ordered workflow language.” International Conference on Business Process Management. Cham: Springer Nature Switzerland, 2023.
- Parameters:
log – event log / Pandas dataframe
variant – variant of the algorithm
filtering_weight_factor (float) – accepts values 0 <= x < 1
order_graph_filtering_threshold (float) – accepts values 0.5 < x <= 1
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
POWL
import pm4py

log = pm4py.read_xes('tests/input_data/receipt.xes')
powl_model = pm4py.discover_powl(log, activity_key='concept:name')
print(powl_model)
- pm4py.discovery.discover_batches(log: EventLog | DataFrame, merge_distance: int = 900, min_batch_size: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') List[Tuple[Tuple[str, str], int, Dict[str, Any]]] [source]#
Discover batches from the provided log object
We say that an activity is executed in batches by a given resource when the resource executes the same activity several times in a short period of time.
Identifying such activities may reveal points of the process that can be automated, since the activity of the person may be repetitive.
The following categories of batches are detected:
- Simultaneous (all the events in the batch have identical start and end timestamps)
- Batching at start (all the events in the batch have identical start timestamps)
- Batching at end (all the events in the batch have identical end timestamps)
- Sequential batching (for all the consecutive events, the end of the first is equal to the start of the second)
- Concurrent batching (for all the consecutive events that are not sequentially matched)
The approach has been described in the following paper: Martin, N., Swennen, M., Depaire, B., Jans, M., Caris, A., & Vanhoof, K. (2015, December). Batch Processing: Definition and Event Log Identification. In SIMPDA (pp. 137-140).
The output is a (sorted) list of tuples. Each tuple contains:
- Index 0: the activity-resource couple for which at least one batch has been detected
- Index 1: the number of batches for the given activity-resource couple
- Index 2: a list containing all the batches. Each batch is described by:
  - the start timestamp of the batch
  - the complete timestamp of the batch
  - the list of events that are executed in the batch
- Parameters:
log – event log / Pandas dataframe
merge_distance (int) – the maximum time distance between non-overlapping intervals in order for them to be considered belonging to the same batch (default: 15*60, i.e., 15 minutes)
min_batch_size (int) – the minimum number of events for a batch to be considered (default: 2)
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
resource_key (str) – attribute to be used as resource
- Return type:
List[Tuple[Tuple[str, str], int, Dict[str, Any]]]
import pm4py

batches = pm4py.discover_batches(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp', resource_key='org:resource')
pm4py.filtering module#
The pm4py.filtering module contains the filtering features offered in pm4py.
- pm4py.filtering.filter_log_relative_occurrence_event_attribute(log: EventLog | DataFrame, min_relative_stake: float, attribute_key: str = 'concept:name', level='cases', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the event log, keeping only the events having an attribute value which occurs:
- in at least the specified (min_relative_stake) percentage of events, when level="events"
- in at least the specified (min_relative_stake) percentage of cases, when level="cases"
- Parameters:
log – event log / Pandas dataframe
min_relative_stake (float) – minimum percentage of cases (expressed as a number between 0 and 1) in which the attribute should occur
attribute_key (str) – the attribute to filter
level (str) – the level of the filter (if level="events", then events; if level="cases", then cases)
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_log_relative_occurrence_event_attribute(dataframe, 0.5, level='cases', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_start_activities(log: EventLog | DataFrame, activities: Set[str] | List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filter cases having a start activity in the provided list
- Parameters:
log – event log / Pandas dataframe
activities – collection of start activities
retain (bool) – if True, we retain the traces containing the given start activities; if False, we drop the traces
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_start_activities(dataframe, ['Act. A'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_end_activities(log: EventLog | DataFrame, activities: Set[str] | List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filter cases having an end activity in the provided list
- Parameters:
log – event log / Pandas dataframe
activities – collection of end activities
retain (bool) – if True, we retain the traces containing the given end activities; if False, we drop the traces
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_end_activities(dataframe, ['Act. Z'], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_event_attribute_values(log: EventLog | DataFrame, attribute_key: str, values: Set[str] | List[str], level: str = 'case', retain: bool = True, case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filter a log object on the values of some event attribute
- Parameters:
log – event log / Pandas dataframe
attribute_key (str) – attribute to filter
values – admitted (or forbidden) values
level (str) – specifies how the filter should be applied ('case' filters the cases where at least one occurrence happens; 'event' filters the events, eventually trimming the cases)
retain (bool) – specifies if the values should be kept or removed
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_event_attribute_values(dataframe, 'concept:name', ['Act. A', 'Act. Z'], case_id_key='case:concept:name')
- pm4py.filtering.filter_trace_attribute_values(log: EventLog | DataFrame, attribute_key: str, values: Set[str] | List[str], retain: bool = True, case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filter a log on the values of a trace attribute
- Parameters:
log – event log / Pandas dataframe
attribute_key (str) – attribute to filter
values – collection of values to filter
retain (bool) – boolean value (keep/discard matching traces)
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_trace_attribute_values(dataframe, 'case:creator', ['Mike'], case_id_key='case:concept:name')
- pm4py.filtering.filter_variants(log: EventLog | DataFrame, variants: Set[str] | List[str] | List[Tuple[str]], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filter a log on a specified set of variants
- Parameters:
log – event log / Pandas dataframe
variants – collection of variants to filter; A variant should be specified as a list of tuples of activity names, e.g., [(‘a’, ‘b’, ‘c’)]
retain (bool) – boolean; if True, all traces conforming to the specified variants are retained; if False, all those traces are removed
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_variants(dataframe, [('Act. A', 'Act. B', 'Act. Z'), ('Act. A', 'Act. C', 'Act. Z')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_directly_follows_relation(log: EventLog | DataFrame, relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Retain traces that contain any of the specified 'directly follows' relations. For example, if relations == [('a','b'),('a','c')] and the log is [<a,b,c>,<a,c,b>,<a,d,b>], the resulting log will contain traces describing [<a,b,c>,<a,c,b>].
- Parameters:
log – event log / Pandas dataframe
relations – list of activity name pairs, which are allowed/forbidden paths
retain (bool) – parameter that says whether the paths should be kept/removed
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_directly_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_eventually_follows_relation(log: EventLog | DataFrame, relations: List[str], retain: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Retain traces that contain any of the specified 'eventually follows' relations. For example, if relations == [('a','b'),('a','c')] and the log is [<a,b,c>,<a,c,b>,<a,d,b>], the resulting log will contain traces describing [<a,b,c>,<a,c,b>,<a,d,b>].
- Parameters:
log – event log / Pandas dataframe
relations – list of activity name pairs, which are allowed/forbidden paths
retain (bool) – parameter that says whether the paths should be kept/removed
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_eventually_follows_relation(dataframe, [('A','B'),('A','C')], activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_time_range(log: EventLog | DataFrame, dt1: str, dt2: str, mode='events', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filter a log on a time interval
- Parameters:
log – event log / Pandas dataframe
dt1 (str) – left extreme of the interval
dt2 (str) – right extreme of the interval
mode (str) – modality of filtering (events, traces_contained, traces_intersecting). events: any event that fits the time frame is retained; traces_contained: any trace completely contained in the time frame is retained; traces_intersecting: any trace intersecting with the time frame is retained
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe1 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_contained', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe2 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='traces_intersecting', case_id_key='case:concept:name', timestamp_key='time:timestamp')
filtered_dataframe3 = pm4py.filter_time_range(dataframe, '2010-01-01 00:00:00', '2011-01-01 00:00:00', mode='events', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_between(log: EventLog | DataFrame, act1: str | List[str], act2: str | List[str], activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Finds all the sub-cases leading from an event with activity “act1” to an event with activity “act2” in the log, and returns a log containing only them.
Example:
Log:
A B C D E F
A B E F C
A B F C B C B E F C
act1 = B
act2 = C
Returned sub-cases:
B C (from the first case)
B E F C (from the second case)
B F C (from the third case)
B C (from the third case)
B E F C (from the third case)
- Parameters:
log – event log / Pandas dataframe
act1 – source activity (or collection of activities)
act2 – target activity (or collection of activities)
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_between(dataframe, 'A', 'D', activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.filtering.filter_case_size(log: EventLog | DataFrame, min_size: int, max_size: int, case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the event log, keeping the cases having a length (number of events) included between min_size and max_size
- Parameters:
log – event log / Pandas dataframe
min_size (int) – minimum allowed number of events
max_size (int) – maximum allowed number of events
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_case_size(dataframe, 5, 10, case_id_key='case:concept:name')
- pm4py.filtering.filter_case_performance(log: EventLog | DataFrame, min_performance: float, max_performance: float, timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the event log, keeping the cases having a duration (the timestamp of the last event minus the timestamp of the first event) included between min_performance and max_performance
- Parameters:
log – event log / Pandas dataframe
min_performance (float) – minimum allowed case duration
max_performance (float) – maximum allowed case duration
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_case_performance(dataframe, 3600.0, 86400.0, timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_activities_rework(log: EventLog | DataFrame, activity: str, min_occurrences: int = 2, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the event log, keeping the cases where the specified activity occurs at least min_occurrences times.
- Parameters:
log – event log / Pandas dataframe
activity (str) – activity
min_occurrences (int) – minimum desired number of occurrences
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_activities_rework(dataframe, 'Approve Order', 2, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_paths_performance(log: EventLog | DataFrame, path: Tuple[str, str], min_performance: float, max_performance: float, keep=True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the event log, either:
- (keep=True) keeping the cases having the specified path (tuple of 2 activities) with a duration included between min_performance and max_performance
- (keep=False) discarding the cases having the specified path with a duration included between min_performance and max_performance
- Parameters:
log – event log / Pandas dataframe
path – tuple of two activities (source_activity, target_activity)
min_performance (float) – minimum allowed performance (of the path)
max_performance (float) – maximum allowed performance (of the path)
keep (bool) – keep/discard the cases having the specified path with a duration included between min_performance and max_performance
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_paths_performance(dataframe, ('A', 'D'), 3600.0, 86400.0, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_variants_top_k(log: EventLog | DataFrame, k: int, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Keeps the top-k variants of the log
- Parameters:
log – event log / Pandas dataframe
k (int) – number of variants that should be kept
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_variants_top_k(dataframe, 5, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_variants_by_coverage_percentage(log: EventLog | DataFrame, min_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the variants of the log by a coverage percentage (e.g., if min_coverage_percentage=0.4 and we have a log with 1000 cases, of which 500 belong to variant 1, 400 to variant 2, and 100 to variant 3, the filter keeps only the traces of variant 1 and variant 2).
- Parameters:
log – event log / Pandas dataframe
min_coverage_percentage (float) – minimum allowed percentage of coverage
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_variants_by_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_variants_by_maximum_coverage_percentage(log: EventLog | DataFrame, max_coverage_percentage: float, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the variants of the log by a maximum coverage percentage (e.g., if max_coverage_percentage=0.4 and we have a log with 1000 cases, of which 500 belong to variant 1, 400 to variant 2, and 100 to variant 3, the filter keeps only the traces of variant 2 and variant 3).
- Parameters:
log – event log / Pandas dataframe
max_coverage_percentage (float) – maximum allowed percentage of coverage
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py

filtered_dataframe = pm4py.filter_variants_by_maximum_coverage_percentage(dataframe, 0.1, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_prefixes(log: EventLog | DataFrame, activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the log, keeping the prefixes to a given activity. E.g., for a log with traces:
A,B,C,D
A,B,Z,A,B,C,D
A,B,C,D,C,E,C,F
The prefixes to “C” are respectively:
A,B
A,B,Z,A,B
A,B
- Parameters:
log – event log / Pandas dataframe
activity (str) – target activity of the filter
strict (bool) – applies the filter strictly (cuts the occurrences of the selected activity)
first_or_last (str) – decides if the first or last occurrence of an activity should be selected as baseline for the filter
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py filtered_dataframe = pm4py.filter_prefixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_suffixes(log: EventLog | DataFrame, activity: str, strict=True, first_or_last='first', activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters the log, keeping the suffixes from a given activity. E.g., for a log with traces:
A,B,C,D
A,B,Z,A,B,C,D
A,B,C,D,C,E,C,F
The suffixes from “C” are respectively:
D
D
D,C,E,C,F
- Parameters:
log – event log / Pandas dataframe
activity (str) – target activity of the filter
strict (bool) – applies the filter strictly (cuts the occurrences of the selected activity)
first_or_last (str) – decides if the first or last occurrence of an activity should be selected as baseline for the filter
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py
filtered_dataframe = pm4py.filter_suffixes(dataframe, 'Act. C', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_ocel_event_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL [source]#
Filters the object-centric event log on the provided event attribute values
- Parameters:
ocel (OCEL) – object-centric event log
attribute_key (str) – attribute at the event level
attribute_values – collection of attribute values
positive (bool) – decides if the values should be kept (positive=True) or removed (positive=False)
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_event_attribute(ocel, 'ocel:activity', ['A', 'B', 'D'])
- pm4py.filtering.filter_ocel_object_attribute(ocel: OCEL, attribute_key: str, attribute_values: Collection[Any], positive: bool = True) OCEL [source]#
Filters the object-centric event log on the provided object attribute values
- Parameters:
ocel (OCEL) – object-centric event log
attribute_key (str) – attribute at the object level
attribute_values – collection of attribute values
positive (bool) – decides if the values should be kept (positive=True) or removed (positive=False)
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_object_attribute(ocel, 'ocel:type', ['order'])
- pm4py.filtering.filter_ocel_object_types_allowed_activities(ocel: OCEL, correspondence_dict: Dict[str, Collection[str]]) OCEL [source]#
Filters an object-centric event log keeping only the specified object types with the specified activity set (filters out the rest).
- Parameters:
ocel (OCEL) – object-centric event log
correspondence_dict – dictionary containing, for every object type of interest, a collection of allowed activities. Example: {“order”: [“Create Order”], “element”: [“Create Order”, “Create Delivery”]}
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_object_types_allowed_activities(ocel, {'order': ['create order', 'pay order']})
- pm4py.filtering.filter_ocel_object_per_type_count(ocel: OCEL, min_num_obj_type: Dict[str, int]) OCEL [source]#
Filters the events of the object-centric log that are related to at least the specified number of objects per type.
E.g. pm4py.filter_ocel_object_per_type_count(ocel, {“order”: 1, “element”: 2})
Would keep the following events:
ocel:eid  ocel:timestamp  ocel:activity  ocel:type:element   ocel:type:order
e1        1980-01-01      Create Order   [i4, i1, i3, i2]    [o1]
e11       1981-01-01      Create Order   [i6, i5]            [o2]
e14       1981-01-04      Create Order   [i8, i7]            [o3]
- Parameters:
ocel (OCEL) – object-centric event log
min_num_obj_type – minimum number of objects per type
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_object_per_type_count(ocel, {'order': 1, 'element': 2})
- pm4py.filtering.filter_ocel_start_events_per_object_type(ocel: OCEL, object_type: str) OCEL [source]#
Filters the events in which a new object of the given object type is spawned. (E.g., an event with activity “Create Order” might spawn new orders).
- Parameters:
ocel (OCEL) – object-centric event log
object_type (str) – object type to consider
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_start_events_per_object_type(ocel, 'delivery')
- pm4py.filtering.filter_ocel_end_events_per_object_type(ocel: OCEL, object_type: str) OCEL [source]#
Filters the events in which an object for the given object type terminates its lifecycle. (E.g. an event with activity “Pay Order” might terminate an order).
- Parameters:
ocel (OCEL) – object-centric event log
object_type (str) – object type to consider
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_end_events_per_object_type(ocel, 'delivery')
- pm4py.filtering.filter_ocel_events_timestamp(ocel: OCEL, min_timest: datetime | str, max_timest: datetime | str, timestamp_key: str = 'ocel:timestamp') OCEL [source]#
Filters the object-centric event log keeping events in the provided timestamp range
- Parameters:
ocel (OCEL) – object-centric event log
min_timest – left extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)
max_timest – right extreme of the allowed timestamp interval (provided in the format: YYYY-mm-dd HH:MM:SS)
timestamp_key (str) – the attribute to use as timestamp (default: ocel:timestamp)
- Return type:
OCEL
import pm4py
filtered_ocel = pm4py.filter_ocel_events_timestamp(ocel, '1990-01-01 00:00:00', '2010-01-01 00:00:00')
- pm4py.filtering.filter_four_eyes_principle(log: EventLog | DataFrame, activity1: str, activity2: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') EventLog | DataFrame [source]#
Filters the cases of the log that violate the four-eyes principle on the provided activities.
- Parameters:
log – event log
activity1 (str) – first activity
activity2 (str) – second activity
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
resource_key (str) – attribute to be used as resource
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py
filtered_dataframe = pm4py.filter_four_eyes_principle(dataframe, 'Act. A', 'Act. B', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_activity_done_different_resources(log: EventLog | DataFrame, activity: str, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', resource_key: str = 'org:resource') EventLog | DataFrame [source]#
Filters the cases where an activity is repeated by different resources.
- Parameters:
log – event log
activity (str) – activity to consider
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
resource_key (str) – attribute to be used as resource
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py
filtered_dataframe = pm4py.filter_activity_done_different_resources(dataframe, 'Act. A', activity_key='concept:name', resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.filtering.filter_trace_segments(log: EventLog | DataFrame, admitted_traces: List[List[str]], positive: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Filters an event log on a set of traces. A trace is a sequence of activities and “...”, in which:
- a “...” before an activity tells that other activities can precede the given activity
- a “...” after an activity tells that other activities can follow the given activity
For example:
- pm4py.filter_trace_segments(log, [["A", "B"]]) <- filters only the cases of the event log having exactly the process variant A,B
- pm4py.filter_trace_segments(log, [["...", "A", "B"]]) <- filters only the cases of the event log ending with the activities A,B
- pm4py.filter_trace_segments(log, [["A", "B", "..."]]) <- filters only the cases of the event log starting with the activities A,B
- pm4py.filter_trace_segments(log, [["...", "A", "B", "C", "..."], ["...", "D", "E", "F", "..."]]) <- filters only the cases of the event log in which at any point there is A followed by B followed by C, and in which at any other point there is D followed by E followed by F
- Parameters:
log – event log / Pandas dataframe
admitted_traces – collection of traces admitted from the filter (with the aforementioned criteria)
positive (bool) – (boolean) indicates if the filter should keep/discard the cases satisfying the filter
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
filtered_log = pm4py.filter_trace_segments(log, [["...", "check ticket", "decide", "reinitiate request", "..."]])
print(filtered_log)
- pm4py.filtering.filter_ocel_object_types(ocel: OCEL, obj_types: Collection[str], positive: bool = True, level: int = 1) OCEL [source]#
Filters the object types of an object-centric event log.
- Parameters:
ocel (OCEL) – object-centric event log
obj_types – object types to keep/remove
positive (bool) – boolean value (True=keep, False=remove)
level (int) – recursively expand the set of object identifiers until the specified level
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_object_types(ocel, ['order'])
- pm4py.filtering.filter_ocel_objects(ocel: OCEL, object_identifiers: Collection[str], positive: bool = True, level: int = 1) OCEL [source]#
Filters the object identifiers of an object-centric event log.
- Parameters:
ocel (OCEL) – object-centric event log
object_identifiers – object identifiers to keep/remove
positive (bool) – boolean value (True=keep, False=remove)
level (int) – recursively expand the set of object identifiers until the specified level
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_objects(ocel, ['o1'], level=1)
- pm4py.filtering.filter_ocel_events(ocel: OCEL, event_identifiers: Collection[str], positive: bool = True) OCEL [source]#
Filters the event identifiers of an object-centric event log.
- Parameters:
ocel (OCEL) – object-centric event log
event_identifiers – event identifiers to keep/remove
positive (bool) – boolean value (True=keep, False=remove)
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_events(ocel, ['e1'])
- pm4py.filtering.filter_ocel_cc_object(ocel: OCEL, object_id: str, conn_comp: List[List[str]] | None = None, return_conn_comp: bool = False) OCEL | Tuple[OCEL, List[List[str]]] [source]#
Returns the connected component of the object-centric event log to which the object with the provided identifier belongs.
- Parameters:
ocel (OCEL) – object-centric event log
object_id (str) – object identifier
conn_comp – (optional) connected components of the objects of the OCEL
return_conn_comp (bool) – if True, returns the computed connected components of the OCEL
- Return type:
Union[OCEL, Tuple[OCEL, List[List[str]]]]
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_object(ocel, 'order1')
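A minimal sketch of the return_conn_comp option, which additionally returns the computed connected components for later reuse:
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
# the second return value holds the connected components of the objects
filtered_ocel, conn_comp = pm4py.filter_ocel_cc_object(ocel, 'order1', return_conn_comp=True)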
- pm4py.filtering.filter_ocel_cc_length(ocel: OCEL, min_cc_length: int, max_cc_length: int) OCEL [source]#
Keeps only the objects in an OCEL belonging to a connected component with a length falling in a specified range
Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.
- Parameters:
ocel (OCEL) – object-centric event log
min_cc_length (int) – minimum allowed length for the connected component
max_cc_length (int) – maximum allowed length for the connected component
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_length(ocel, 2, 10)
- pm4py.filtering.filter_ocel_cc_otype(ocel: OCEL, otype: str, positive: bool = True) OCEL [source]#
Filters the objects belonging to the connected components having at least one object of the provided object type.
Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.
- Parameters:
ocel (OCEL) – object-centric event log
otype (str) – object type
positive (bool) – boolean that keeps or discards the objects of these components
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_otype(ocel, 'order')
- pm4py.filtering.filter_ocel_cc_activity(ocel: OCEL, activity: str) OCEL [source]#
Filters the objects belonging to the connected components having at least one event with the provided activity.
Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.
- Parameters:
ocel (OCEL) – object-centric event log
activity (str) – activity
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
filtered_ocel = pm4py.filter_ocel_cc_activity(ocel, 'Create Order')
pm4py.hof module#
- pm4py.hof.filter_log(f: Callable[[Any], bool], log: EventLog) EventLog | EventStream [source]#
Filters the log according to a given (lambda) function.
- Parameters:
f – function that specifies the filter criterion, may be a lambda
log (EventLog) – event log; either EventLog or EventStream object
- Return type:
Union[log_inst.EventLog, log_inst.EventStream]
Deprecated since version 2.3.0: This will be removed in 3.0.0. The EventLog class will be removed in a future release.
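A minimal sketch, assuming a legacy EventLog object, keeping only the traces with more than three events:
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes", return_legacy_log_object=True)
# keep the traces for which the lambda evaluates to True
filtered_log = pm4py.hof.filter_log(lambda trace: len(trace) > 3, log)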
- pm4py.hof.filter_trace(f: Callable[[Any], bool], trace: Trace) Trace [source]#
Filters the trace according to a given (lambda) function.
- Parameters:
f – function that specifies the filter criterion, may be a lambda
trace (Trace) – trace; PM4Py trace object
- Return type:
log_inst.Trace
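A minimal sketch, assuming a legacy EventLog object, keeping only the events of the first trace whose activity is not “check ticket”:
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes", return_legacy_log_object=True)
# keep the events for which the lambda evaluates to True
filtered_trace = pm4py.hof.filter_trace(lambda event: event["concept:name"] != "check ticket", log[0])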
- pm4py.hof.sort_log(log: EventLog, key, reverse: bool = False) EventLog | EventStream [source]#
Sorts the event log according to a given key.
- Parameters:
log (EventLog) – event log object; either EventLog or EventStream
key – sorting key
reverse (bool) – indicates whether sorting should be reversed or not
- Return type:
Union[log_inst.EventLog, log_inst.EventStream]
Deprecated since version 2.3.0: This will be removed in 3.0.0. The EventLog class will be removed in a future release.
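A minimal sketch, assuming a legacy EventLog object, sorting the traces by their number of events in descending order:
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes", return_legacy_log_object=True)
# longest traces first
sorted_log = pm4py.hof.sort_log(log, key=lambda trace: len(trace), reverse=True)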
- pm4py.hof.sort_trace(trace: Trace, key, reverse: bool = False) Trace [source]#
Sorts the events in a trace according to a given key.
- Parameters:
trace (Trace) – input trace
key – sorting key
reverse (bool) – indicates whether sorting should be reversed (default False)
- Return type:
log_inst.Trace
Deprecated since version 2.3.0: This will be removed in 3.0.0. The EventLog class will be removed in a future release.
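A minimal sketch, assuming a legacy EventLog object, sorting the events of the first trace by timestamp:
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes", return_legacy_log_object=True)
# order the events of the trace chronologically
sorted_trace = pm4py.hof.sort_trace(log[0], key=lambda event: event["time:timestamp"])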
pm4py.llm module#
- pm4py.llm.openai_query(prompt: str, api_key: str | None = None, openai_model: str | None = None, api_url: str | None = None, **kwargs) str [source]#
Executes the provided prompt, obtaining the answer from the OpenAI APIs.
- Parameters:
prompt (str) – prompt that should be executed
api_key – OpenAI API key
openai_model – OpenAI model to be used (default: gpt-3.5-turbo)
api_url – OpenAI API URL
- Return type:
str
import pm4py
resp = pm4py.llm.openai_query('what is the result of 3+3?', api_key="sk-382393", openai_model="gpt-3.5-turbo")
print(resp)
- pm4py.llm.abstract_dfg(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, include_performance: bool = True, relative_frequency: bool = False, response_header: bool = True, primary_performance_aggregation: str = 'mean', secondary_performance_aggregation: str | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str [source]#
Obtains the DFG abstraction of a traditional event log
- Parameters:
log_obj – log object
max_len (int) – maximum length of the (string) abstraction
include_performance (bool) – (boolean) includes the performance of the paths in the abstraction
relative_frequency (bool) – (boolean) uses the relative instead of the absolute frequency of the paths
response_header (bool) – includes a short header before the paths, pointing to the description of the abstraction
primary_performance_aggregation (str) – primary aggregation to be used for the arc’s performance (default: mean, other options: median, min, max, sum, stdev)
secondary_performance_aggregation – (optional) secondary aggregation to be used for the arc’s performance (default None, other options: mean, median, min, max, sum, stdev)
activity_key (str) – the column to be used as activity
timestamp_key (str) – the column to be used as timestamp
case_id_key (str) – the column to be used as case identifier
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_dfg(log))
- pm4py.llm.abstract_variants(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, include_performance: bool = True, relative_frequency: bool = False, response_header: bool = True, primary_performance_aggregation: str = 'mean', secondary_performance_aggregation: str | None = None, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str [source]#
Obtains the variants abstraction of a traditional event log
- Parameters:
log_obj – log object
max_len (int) – maximum length of the (string) abstraction
include_performance (bool) – (boolean) includes the performance of the variants in the abstraction
relative_frequency (bool) – (boolean) uses the relative instead of the absolute frequency of the variants
response_header (bool) – includes a short header before the variants, pointing to the description of the abstraction
primary_performance_aggregation (str) – primary aggregation to be used for the arc’s performance (default: mean, other options: median, min, max, sum, stdev)
secondary_performance_aggregation – (optional) secondary aggregation to be used for the arc’s performance (default None, other options: mean, median, min, max, sum, stdev)
activity_key (str) – the column to be used as activity
timestamp_key (str) – the column to be used as timestamp
case_id_key (str) – the column to be used as case identifier
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_variants(log))
- pm4py.llm.abstract_ocel(ocel: OCEL, include_timestamps: bool = True) str [source]#
Obtains the abstraction of an object-centric event log, including the list of events and the objects of the OCEL
- Parameters:
ocel (OCEL) – object-centric event log
include_timestamps (bool) – (boolean) includes the timestamp information in the abstraction
- Return type:
str
import pm4py
ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
print(pm4py.llm.abstract_ocel(ocel))
- pm4py.llm.abstract_ocel_ocdfg(ocel: OCEL, include_header: bool = True, include_timestamps: bool = True, max_len: int = 10000) str [source]#
Obtains the abstraction of an object-centric event log, representing in text the object-centric directly-follows graph
- Parameters:
ocel (OCEL) – object-centric event log
include_header (bool) – (boolean) includes the header in the abstraction
include_timestamps (bool) – (boolean) includes the timestamp information in the abstraction
max_len (int) – maximum length of the abstraction
- Return type:
str
import pm4py
ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
print(pm4py.llm.abstract_ocel_ocdfg(ocel))
- pm4py.llm.abstract_ocel_features(ocel: OCEL, obj_type: str, include_header: bool = True, max_len: int = 10000, debug: bool = False, enable_object_lifecycle_paths: bool = True) str [source]#
Obtains the abstraction of an object-centric event log, representing in text the features and their values.
- Parameters:
ocel (OCEL) – object-centric event log
obj_type (str) – the object type that should be considered in the feature extraction
include_header (bool) – (boolean) includes the header in the abstraction
max_len (int) – maximum length of the abstraction
debug (bool) – enables debugging mode (telling at which point of the feature extraction you are)
enable_object_lifecycle_paths (bool) – enables the “lifecycle paths” feature
- Return type:
str
import pm4py
ocel = pm4py.read_ocel("tests/input_data/ocel/example_log.jsonocel")
print(pm4py.llm.abstract_ocel_features(ocel, 'order'))
- pm4py.llm.abstract_event_stream(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, response_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str [source]#
Obtains the event stream abstraction of a traditional event log
- Parameters:
log_obj – log object
max_len (int) – maximum length of the (string) abstraction
response_header (bool) – includes a short header before the event stream, pointing to the description of the abstraction
activity_key (str) – the column to be used as activity
timestamp_key (str) – the column to be used as timestamp
case_id_key (str) – the column to be used as case identifier
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_event_stream(log))
- pm4py.llm.abstract_petri_net(net: PetriNet, im: Marking, fm: Marking, response_header: bool = True) str [source]#
Obtains an abstraction of a Petri net
- Parameters:
net (PetriNet) – Petri net
im (Marking) – initial marking
fm (Marking) – final marking
response_header (bool) – (boolean) includes the header of the response
- Return type:
str
import pm4py
net, im, fm = pm4py.read_pnml('tests/input_data/running-example.pnml')
print(pm4py.llm.abstract_petri_net(net, im, fm))
- pm4py.llm.abstract_log_attributes(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str [source]#
Abstracts the attributes of a log (reporting their name, their type, and the top values)
- Parameters:
log_obj – log object
max_len (int) – maximum length of the (string) abstraction
activity_key (str) – the column to be used as activity
timestamp_key (str) – the column to be used as timestamp
case_id_key (str) – the column to be used as case identifier
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_log_attributes(log))
- pm4py.llm.abstract_log_features(log_obj: DataFrame | EventLog | EventStream, max_len: int = 10000, include_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') str [source]#
Abstracts the machine learning features obtained from a log (reporting the top features until the desired length is obtained)
- Parameters:
log_obj – log object
max_len (int) – maximum length of the (string) abstraction
include_header (bool) – (boolean) includes the header in the abstraction
activity_key (str) – the column to be used as activity
timestamp_key (str) – the column to be used as timestamp
case_id_key (str) – the column to be used as case identifier
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes")
print(pm4py.llm.abstract_log_features(log))
- pm4py.llm.abstract_temporal_profile(temporal_profile: Dict[Tuple[str, str], Tuple[float, float]], include_header: bool = True) str [source]#
Abstracts a temporal profile model to a string.
- Parameters:
temporal_profile – temporal profile model
include_header (bool) – includes a header in the response, describing the temporal profile
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
temporal_profile = pm4py.discover_temporal_profile(log)
text_abstr = pm4py.llm.abstract_temporal_profile(temporal_profile, include_header=True)
print(text_abstr)
- pm4py.llm.abstract_case(case: Trace, include_case_attributes: bool = True, include_event_attributes: bool = True, include_timestamp: bool = True, include_header: bool = True, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp') str [source]#
Textually abstracts a case
- Parameters:
case (Trace) – case object
include_case_attributes (bool) – (boolean) include or not the attributes at the case level
include_event_attributes (bool) – (boolean) include or not the attributes at the event level
include_timestamp (bool) – (boolean) include or not the event timestamp in the abstraction
include_header (bool) – (boolean) includes the header of the response
activity_key (str) – the column to be used as activity
timestamp_key (str) – the column to be used as timestamp
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
print(pm4py.llm.abstract_case(log[0]))
- pm4py.llm.abstract_declare(declare_model, include_header: bool = True) str [source]#
Textually abstracts a DECLARE model
- Parameters:
declare_model – DECLARE model
include_header (bool) – (boolean) includes the header of the response
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
declare_model = pm4py.discover_declare(log)
print(pm4py.llm.abstract_declare(declare_model))
- pm4py.llm.abstract_log_skeleton(log_skeleton, include_header: bool = True) str [source]#
Textually abstracts a log skeleton process model
- Parameters:
log_skeleton – log skeleton
include_header (bool) – (boolean) includes the header of the response
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/roadtraffic100traces.xes", return_legacy_log_object=True)
log_ske = pm4py.discover_log_skeleton(log)
print(pm4py.llm.abstract_log_skeleton(log_ske))
- pm4py.llm.explain_visualization(vis_saver, *args, connector=<function openai_query>, **kwargs) str [source]#
Explains a process mining visualization using LLMs: the visualization is first saved to a .png image, and the .png file is then provided to the Large Language Model, along with (possibly) a textual description of the visualization.
- Parameters:
vis_saver – the visualizer (saving to disk) to be used
args – the mandatory arguments that should be provided to the visualization
connector – the connector method to the large language model
kwargs – optional parameters of the visualization or the connector (for example, the annotation of the visualization, or the API key)
- Return type:
str
import pm4py
log = pm4py.read_xes("tests/input_data/running-example.xes")
descr = pm4py.llm.explain_visualization(pm4py.save_vis_dotted_chart, log, api_key="sk-5HN", show_legend=False)
print(descr)
pm4py.meta module#
Process mining for Python
pm4py.ml module#
The pm4py.ml module contains the machine learning features offered in pm4py
- pm4py.ml.split_train_test(log: EventLog | DataFrame, train_percentage: float = 0.8, case_id_key='case:concept:name') Tuple[EventLog, EventLog] | Tuple[DataFrame, DataFrame] [source]#
Split an event log in a training log and a test log (for machine learning purposes). Returns the training and the test event log.
- Parameters:
log – event log / Pandas dataframe
train_percentage (float) – fraction of traces to be included in the training log (from 0.0 to 1.0)
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[Tuple[EventLog, EventLog], Tuple[pd.DataFrame, pd.DataFrame]]
import pm4py
train_df, test_df = pm4py.split_train_test(dataframe, train_percentage=0.75)
- pm4py.ml.get_prefixes_from_log(log: EventLog | DataFrame, length: int, case_id_key: str = 'case:concept:name') EventLog | DataFrame [source]#
Gets the prefixes of a log of a given length. The returned log object contains the prefixes:
- if a trace has lower or identical length, it is included as-is
- if a trace has greater length, it is cut
- Parameters:
log – event log / Pandas dataframe
length (int) – length
case_id_key (str) – attribute to be used as case identifier
- Return type:
Union[EventLog, pd.DataFrame]
import pm4py
trimmed_df = pm4py.get_prefixes_from_log(dataframe, length=5, case_id_key='case:concept:name')
- pm4py.ml.extract_outcome_enriched_dataframe(log: EventLog | DataFrame, activity_key: str = 'concept:name', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name', start_timestamp_key: str = 'time:timestamp') DataFrame [source]#
Inserts additional columns in the dataframe which are computed on the overall case, so they model the outcome of the case.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
start_timestamp_key (str) – attribute to be used as start timestamp
- Return type:
pd.DataFrame
import pm4py
enriched_df = pm4py.extract_outcome_enriched_dataframe(log, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name', start_timestamp_key='time:timestamp')
- pm4py.ml.extract_features_dataframe(log: EventLog | DataFrame, str_tr_attr=None, num_tr_attr=None, str_ev_attr=None, num_ev_attr=None, str_evsucc_attr=None, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key=None, resource_key='org:resource', include_case_id: bool = False, **kwargs) DataFrame [source]#
Extracts a dataframe containing the features of each case of the provided log object
- Parameters:
log – log object (event log / Pandas dataframe)
str_tr_attr – (if provided) string attributes at the case level which should be extracted as features
num_tr_attr – (if provided) numeric attributes at the case level which should be extracted as features
str_ev_attr – (if provided) string attributes at the event level which should be extracted as features (one-hot encoding)
num_ev_attr – (if provided) numeric attributes at the event level which should be extracted as features (last value per attribute in a case)
activity_key (str) – the attribute to be used as activity
timestamp_key (str) – the attribute to be used as timestamp
case_id_key – (if provided, otherwise default) the attribute to be used as case identifier
resource_key (str) – the attribute to be used as resource
include_case_id (bool) – includes the case identifier column in the features table
- Return type:
pd.DataFrame
import pm4py
features_df = pm4py.extract_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.ml.extract_ocel_features(ocel: OCEL, obj_type: str, enable_object_lifecycle_paths: bool = True, enable_object_work_in_progress: bool = False, object_str_attributes: Collection[str] | None = None, object_num_attributes: Collection[str] | None = None, include_obj_id: bool = False, debug: bool = False) DataFrame [source]#
Extracts from an object-centric event log a set of features (returned as dataframe) computed on the OCEL for the objects of a given object type.
Implements the approach described in: Berti, A., Herforth, J., Qafari, M.S. et al. Graph-based feature extraction on object-centric event logs. Int J Data Sci Anal (2023). https://doi.org/10.1007/s41060-023-00428-2
- Parameters:
ocel (OCEL) – object-centric event log
obj_type (str) – object type that should be considered
enable_object_lifecycle_paths (bool) – enables the “lifecycle paths” feature
enable_object_work_in_progress (bool) – enables the “work in progress” feature (which has a high computational cost)
object_str_attributes – string attributes at the object level to one-hot encode during the feature extraction
object_num_attributes – numeric attributes at the object level to include during the feature extraction
include_obj_id (bool) – includes the object identifier as column of the “features” dataframe
debug (bool) – enables debugging mode (telling at which point of the feature extraction you are)
- Return type:
pd.DataFrame
import pm4py
ocel = pm4py.read_ocel('log.jsonocel')
fea_df = pm4py.extract_ocel_features(ocel, "item")
- pm4py.ml.extract_temporal_features_dataframe(log: EventLog | DataFrame, grouper_freq='W', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key=None, start_timestamp_key='time:timestamp', resource_key='org:resource') DataFrame [source]#
Extracts a dataframe containing the temporal features of the provided log object
Implements the approach described in the paper: Pourbafrani, Mahsa, Sebastiaan J. van Zelst, and Wil MP van der Aalst. “Supporting automatic system dynamics model generation for simulation in the context of process mining.” International Conference on Business Information Systems. Springer, Cham, 2020.
- Parameters:
log – log object (event log / Pandas dataframe)
grouper_freq (str) – the grouping frequency (D, W, M, Y) to use
activity_key (str) – the attribute to be used as activity
timestamp_key (str) – the attribute to be used as timestamp
case_id_key – (if provided, otherwise default) the attribute to be used as case identifier
resource_key (str) – the attribute to be used as resource
start_timestamp_key (str) – the attribute to be used as start timestamp
- Return type:
pd.DataFrame
import pm4py
temporal_features_df = pm4py.extract_temporal_features_dataframe(dataframe, activity_key='concept:name', case_id_key='case:concept:name', timestamp_key='time:timestamp')
- pm4py.ml.extract_target_vector(log: EventLog | DataFrame, variant: str, activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name') Tuple[Any, List[str]] [source]#
Extracts from a log object the target vector for a specific ML use case (next activity, next time, remaining time)
- Parameters:
log – log object (event log / Pandas dataframe)
variant (str) – variant of the algorithm to be used: next_activity, next_time, remaining_time
activity_key (str) – the attribute to be used as activity
timestamp_key (str) – the attribute to be used as timestamp
case_id_key (str) – the attribute to be used as case identifier
- Return type:
Tuple[Any, List[str]]
import pm4py
vector_next_act, class_next_act = pm4py.extract_target_vector(log, 'next_activity', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
vector_next_time, class_next_time = pm4py.extract_target_vector(log, 'next_time', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
vector_rem_time, class_rem_time = pm4py.extract_target_vector(log, 'remaining_time', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
pm4py.ocel module#
The pm4py.ocel module contains the object-centric process mining features offered in pm4py
- pm4py.ocel.ocel_get_object_types(ocel: OCEL) List[str] [source]#
Gets the list of object types contained in the object-centric event log (e.g., [“order”, “item”, “delivery”]).
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
List[str]
import pm4py
object_types = pm4py.ocel_get_object_types(ocel)
- pm4py.ocel.ocel_get_attribute_names(ocel: OCEL) List[str] [source]#
Gets the list of attributes at the event and the object level of an object-centric event log (e.g. [“cost”, “amount”, “name”])
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
List[str]
import pm4py
attribute_names = pm4py.ocel_get_attribute_names(ocel)
- pm4py.ocel.ocel_flattening(ocel: OCEL, object_type: str) DataFrame [source]#
Flattens the object-centric event log to a traditional event log with the choice of an object type. In the flattened log, the objects of a given object type are the cases, and each case contains the set of events related to the object. The flattened log follows the XES notations for case identifier, activity, and timestamp. In particular:
- “case:concept:name” is the column used for the case ID.
- “concept:name” is the column used for the activity.
- “time:timestamp” is the column used for the timestamp.
- Parameters:
ocel (OCEL) – object-centric event log
object_type (str) – object type
- Return type:
pd.DataFrame
import pm4py
event_log = pm4py.ocel_flattening(ocel, 'items')
- pm4py.ocel.ocel_object_type_activities(ocel: OCEL) Dict[str, Collection[str]] [source]#
Gets the set of activities performed for each object type
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
Dict[str, Collection[str]]
import pm4py
ot_activities = pm4py.ocel_object_type_activities(ocel)
- pm4py.ocel.ocel_objects_ot_count(ocel: OCEL) Dict[str, Dict[str, int]] [source]#
Counts for each event the number of related objects per type
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
Dict[str, Dict[str, int]]
import pm4py
objects_ot_count = pm4py.ocel_objects_ot_count(ocel)
- pm4py.ocel.ocel_temporal_summary(ocel: OCEL) DataFrame [source]#
Returns the “temporal summary” from an object-centric event log. The temporal summary aggregates all the events performed in the same timestamp, and reports the list of activities and the involved objects.
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
pd.DataFrame
import pm4py
temporal_summary = pm4py.ocel_temporal_summary(ocel)
- pm4py.ocel.ocel_objects_summary(ocel: OCEL) DataFrame [source]#
Gets the objects summary of an object-centric event log
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
pd.DataFrame
import pm4py
objects_summary = pm4py.ocel_objects_summary(ocel)
- pm4py.ocel.ocel_objects_interactions_summary(ocel: OCEL) DataFrame [source]#
Gets the objects interactions summary of an object-centric event log. The objects interactions summary has a row for every combination (event, related object, other related object). Properties such as the activity of the event, and the object types of the two related objects, are included.
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
pd.DataFrame
import pm4py
interactions_summary = pm4py.ocel_objects_interactions_summary(ocel)
- pm4py.ocel.discover_ocdfg(ocel: OCEL, business_hours=False, business_hour_slots=[(25200, 61200), (111600, 147600), (198000, 234000), (284400, 320400), (370800, 406800)]) Dict[str, Any] [source]#
Discovers an OC-DFG from an object-centric event log.
Object-centric directly-follows multigraphs are a composition of directly-follows graphs for the single object type, which can be annotated with different metrics considering the entities of an object-centric event log (i.e., events, unique objects, total objects).
Reference paper: Berti, Alessandro, and Wil van der Aalst. “Extracting multiple viewpoint models from relational databases.” Data-Driven Process Discovery and Analysis. Springer, Cham, 2018. 24-51.
- Parameters:
ocel (OCEL) – object-centric event log
business_hours (bool) – boolean value that enables the usage of the business hours
business_hour_slots – work schedule of the company, provided as a list of tuples where each tuple represents one time slot of business hours. One slot, i.e. one tuple, consists of one start and one end time given in seconds since week start, e.g. [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60)], meaning that business hours are Mondays 07:00 - 17:00 and Tuesdays 07:00 - 12:00 and 13:00 - 17:00
- Return type:
Dict[str, Any]
import pm4py
ocdfg = pm4py.discover_ocdfg(ocel)
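A minimal sketch of the business hours configuration, using the example schedule described above (Mondays 07:00 - 17:00, Tuesdays 07:00 - 12:00 and 13:00 - 17:00):
import pm4py
# time slots expressed in seconds since week start
slots = [(7 * 60 * 60, 17 * 60 * 60), ((24 + 7) * 60 * 60, (24 + 12) * 60 * 60), ((24 + 13) * 60 * 60, (24 + 17) * 60 * 60)]
ocdfg = pm4py.discover_ocdfg(ocel, business_hours=True, business_hour_slots=slots)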
- pm4py.ocel.discover_oc_petri_net(ocel: OCEL, inductive_miner_variant: str = 'im', diagnostics_with_tbr: bool = False) Dict[str, Any] [source]#
Discovers an object-centric Petri net from the provided object-centric event log.
Reference paper: van der Aalst, Wil MP, and Alessandro Berti. “Discovering object-centric Petri nets.” Fundamenta informaticae 175.1-4 (2020): 1-40.
- Parameters:
ocel (OCEL) – object-centric event log
inductive_miner_variant (str) – specify the variant of the inductive miner to be used (“im” for traditional; “imd” for the faster inductive miner directly-follows)
diagnostics_with_tbr (bool) – (boolean) enables the computation of some diagnostics using token-based replay
- Return type:
Dict[str, Any]
import pm4py
ocpn = pm4py.discover_oc_petri_net(ocel)
- pm4py.ocel.discover_objects_graph(ocel: OCEL, graph_type: str = 'object_interaction') Set[Tuple[str, str]] [source]#
Discovers an object graph from the provided object-centric event log
- Parameters:
ocel (OCEL) – object-centric event log
graph_type (str) – type of graph to consider (object_interaction, object_descendants, object_inheritance, object_cobirth, object_codeath)
- Return type:
Set[Tuple[str, str]]
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
obj_graph = pm4py.discover_objects_graph(ocel, graph_type='object_interaction')
- pm4py.ocel.ocel_o2o_enrichment(ocel: OCEL, included_graphs: Collection[str] | None = None) OCEL [source]#
Inserts the information inferred from the graph computations (pm4py.discover_objects_graph) in the list of O2O relations of the OCEL.
- Parameters:
ocel (OCEL) – object-centric event log
included_graphs – types of graphs to include, provided as list/set of strings (object_interaction_graph, object_descendants_graph, object_inheritance_graph, object_cobirth_graph, object_codeath_graph)
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_o2o_enrichment(ocel)
print(ocel.o2o)
- pm4py.ocel.ocel_e2o_lifecycle_enrichment(ocel: OCEL) OCEL [source]#
Inserts lifecycle-based information (when an object is created/terminated or other types of relations) in the list of E2O relations of the OCEL
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_e2o_lifecycle_enrichment(ocel)
print(ocel.relations)
- pm4py.ocel.sample_ocel_objects(ocel: OCEL, num_objects: int) OCEL [source]#
Given an object-centric event log, returns a sampled event log with a randomly chosen subset of the objects. Only the events related to at least one of these objects are kept. As a note, the relationships between the different objects are likely to be broken by this sampling.
- Parameters:
ocel (OCEL) – object-centric event log
num_objects (int) – number of objects of the object-centric event log
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
sampled_ocel = pm4py.sample_ocel_objects(ocel, 50)  # keeps only 50 random objects
- pm4py.ocel.sample_ocel_connected_components(ocel: OCEL, connected_components: int = 1, max_num_events_per_cc: int = 9223372036854775807, max_num_objects_per_cc: int = 9223372036854775807, max_num_e2o_relations_per_cc: int = 9223372036854775807) OCEL [source]#
Given an object-centric event log, returns a sampled event log with a subset of the executions. The number of connected components to consider needs to be specified by the user.
Paper: Adams, Jan Niklas, et al. “Defining cases and variants for object-centric event data.” 2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.
- Parameters:
ocel (OCEL) – object-centric event log
connected_components (int) – number of connected components to pick from the OCEL
max_num_events_per_cc (int) – maximum number of events allowed per connected component (default: sys.maxsize)
max_num_objects_per_cc (int) – maximum number of objects allowed per connected component (default: sys.maxsize)
max_num_e2o_relations_per_cc (int) – maximum number of event-to-object relationships allowed per connected component (default: sys.maxsize)
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
sampled_ocel = pm4py.sample_ocel_connected_components(ocel, 5)  # keeps only 5 connected components
- pm4py.ocel.ocel_drop_duplicates(ocel: OCEL) OCEL [source]#
Drops relations between events and objects happening at the same time, with the same activity, to the same object identifier. This cleans the OCEL of duplicate events.
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_drop_duplicates(ocel)
- pm4py.ocel.ocel_merge_duplicates(ocel: OCEL, have_common_object: bool | None = False) OCEL [source]#
Merge events in the OCEL that happen with the same activity at the same timestamp
- Parameters:
ocel (OCEL) – object-centric event log
have_common_object – impose the additional merge condition that the two events should have at least one object in common
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_merge_duplicates(ocel)
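A minimal sketch of the additional merge condition, requiring the merged events to share at least one object:
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
# only merge same-activity, same-timestamp events that share an object
ocel = pm4py.ocel_merge_duplicates(ocel, have_common_object=True)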
- pm4py.ocel.ocel_sort_by_additional_column(ocel: OCEL, additional_column: str, primary_column: str = 'ocel:timestamp') OCEL [source]#
Sorts the OCEL not only based on the timestamp column and the index, but using an additional sorting column that further determines the order of the events happening at the same timestamp.
- Parameters:
ocel (OCEL) – object-centric event log
additional_column (str) – additional column to use for the sorting
primary_column (str) – primary column to be used for the sorting (default: ocel:timestamp)
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_sort_by_additional_column(ocel, 'ordering')
- pm4py.ocel.ocel_add_index_based_timedelta(ocel: OCEL) OCEL [source]#
Adds a small time-delta to the timestamp column based on the current index of the event. This ensures the correct ordering of the events in any object-centric process mining solution.
- Parameters:
ocel (OCEL) – object-centric event log
- Return type:
OCEL
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
ocel = pm4py.ocel_add_index_based_timedelta(ocel)
- pm4py.ocel.cluster_equivalent_ocel(ocel: OCEL, object_type: str, max_objs: int = 9223372036854775807) Dict[str, Collection[OCEL]] [source]#
Perform a clustering of the object-centric event log, based on the ‘executions’ of a single object type. Equivalent ‘executions’ are grouped in the output dictionary.
- Parameters:
ocel (OCEL) – object-centric event log
object_type (str) – reference object type
max_objs (int) – maximum number of objects (of the given object type)
- Return type:
Dict[str, Collection[OCEL]]
import pm4py
ocel = pm4py.read_ocel('trial.ocel')
clusters = pm4py.cluster_equivalent_ocel(ocel, "order")
pm4py.org module#
The pm4py.org module contains the organizational analysis techniques offered in pm4py
- pm4py.org.discover_handover_of_work_network(log: EventLog | DataFrame, beta=0, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA [source]#
Calculates the handover of work network of the event log. The handover of work network is essentially the DFG of the event log, however, using the resource as a node of the graph, instead of the activity. As such, to use this, resource information should be present in the event log.
- Parameters:
log – event log / Pandas dataframe
beta (int) – beta parameter for Handover metric
resource_key (str) – attribute to be used for the resource
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
SNA
import pm4py
metric = pm4py.discover_handover_of_work_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.org.discover_working_together_network(log: EventLog | DataFrame, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA [source]#
Calculates the working together network of the process. Two resources (nodes) are connected in the graph if they collaborate on an instance of the process.
- Parameters:
log – event log / Pandas dataframe
resource_key (str) – attribute to be used for the resource
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
SNA
import pm4py
metric = pm4py.discover_working_together_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.org.discover_activity_based_resource_similarity(log: EventLog | DataFrame, activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA [source]#
Calculates similarity between the resources in the event log, based on their activity profiles.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
resource_key (str) – attribute to be used for the resource
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
SNA
import pm4py
act_res_sim = pm4py.discover_activity_based_resource_similarity(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.org.discover_subcontracting_network(log: EventLog | DataFrame, n=2, resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') SNA [source]#
Calculates the subcontracting network of the process.
- Parameters:
log – event log / Pandas dataframe
n (int) – n parameter for Subcontracting metric
resource_key (str) – attribute to be used for the resource
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
SNA
import pm4py
metric = pm4py.discover_subcontracting_network(dataframe, resource_key='org:resource', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.org.discover_organizational_roles(log: EventLog | DataFrame, activity_key: str = 'concept:name', resource_key: str = 'org:resource', timestamp_key: str = 'time:timestamp', case_id_key: str = 'case:concept:name') List[Role] [source]#
Mines the organizational roles
A role is a set of activities in the log that are executed by a similar (multi)set of resources. Hence, a role is a specific function within the organization. Grouping the activities into roles can help in simplifying the organizational analysis.
Reference paper: Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. “Business models enhancement through discovery of roles.” 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.
- Parameters:
log – event log / Pandas dataframe
activity_key (str) – attribute to be used for the activity
resource_key (str) – attribute to be used for the resource
timestamp_key (str) – attribute to be used for the timestamp
case_id_key (str) – attribute to be used as case identifier
- Return type:
List[Role]
import pm4py
roles = pm4py.discover_organizational_roles(dataframe, resource_key='org:resource', activity_key='concept:name', timestamp_key='time:timestamp', case_id_key='case:concept:name')
- pm4py.org.discover_network_analysis(log: DataFrame | EventLog | EventStream, out_column: str, in_column: str, node_column_source: str, node_column_target: str, edge_column: str, edge_reference: str = '_out', performance: bool = False, sorting_column: str = 'time:timestamp', timestamp_column: str = 'time:timestamp') Dict[Tuple[str, str], Dict[str, Any]] [source]#
Performs a network analysis of the log based on the provided parameters.
The classical social network analysis methods are based on the order of the events inside a case. For example, the Handover of Work metric considers the directly-follows relationships between resources during the work of a case. An edge is added between two resources if such a relationship occurs.
Real-life scenarios may be more complicated. First, it is difficult to collect events inside the same case without having convergence/divergence issues (see the first section of the OCEL part). Second, the type of relationship may also be important. Consider for example the relationship between two resources: this may be more effective if the executed activity is liked by the resources, rather than disliked.
The network analysis introduced here generalizes some existing social network analysis metrics, becoming independent of the choice of a case notion and permitting the construction of a multi-graph instead of a simple graph.
With this, we assume events to be linked by signals. An event emits a signal (contained as one attribute of the event) that is assumed to be received by other events (also as an attribute of these events) that follow the first event in the log. So, we assume there is an OUT attribute (of the event) that is identical to the IN attribute (of the other events).
When we collect this information, we can build the network analysis graph:
- The source node of the relation is given by an aggregation over a node_column_source attribute.
- The target node of the relation is given by an aggregation over a node_column_target attribute.
- The type of edge is given by an aggregation over an edge_column attribute.
- The network analysis graph can either be annotated with frequency or performance information.
The output is a multigraph. Two events EV1 and EV2 of the log are merged (independently from the case notion) based on having EV1.OUT_COLUMN = EV2.IN_COLUMN. Then, an aggregation is applied on the couple of events (NODE_COLUMN) to obtain the nodes that are connected. The edges between these nodes are aggregated based on some property of the source event (EDGE_COLUMN).
- Parameters:
log – event log / Pandas dataframe
out_column (str) – the source column of the link (default: the case identifier; events of the same case are linked)
in_column (str) – the target column of the link (default: the case identifier; events of the same case are linked)
node_column_source (str) – the attribute to be used for the node definition of the source event (default: the resource of the log, org:resource)
node_column_target (str) – the attribute to be used for the node definition of the target event (default: the resource of the log, org:resource)
edge_column (str) – the attribute to be used for the edge definition (default: the activity of the log, concept:name)
edge_reference (str) – decide if the edge attribute should be picked from the source event. Values: _out => the source event; _in => the target event
performance (bool) – boolean value that enables the performance calculation on the edges of the network analysis
sorting_column (str) – the column that should be used to sort the log before performing the network analysis (default: time:timestamp)
timestamp_column (str) – the column that should be used as timestamp for the performance-related analysis (default: time:timestamp)
- Return type:
Dict[Tuple[str, str], Dict[str, Any]]
import pm4py
net_ana = pm4py.discover_network_analysis(dataframe, out_column='case:concept:name', in_column='case:concept:name', node_column_source='org:resource', node_column_target='org:resource', edge_column='concept:name')
pm4py.privacy module#
- pm4py.privacy.anonymize_differential_privacy(log: EventLog | DataFrame, epsilon: float = 1.0, k: int = 10, p: int = 20) DataFrame [source]#
Protect event logs with differential privacy. Differential privacy is a guarantee that bounds the impact the data of one individual has on a query result.
Control-flow information is anonymized with SaCoFa. This algorithm inserts noise into a trace-variant count, through the step-wise construction of a prefix tree.
Contextual information, like timestamps or resources, is anonymized with PRIPEL. This technique enriches a control-flow-anonymized event log with contextual information from the original log, while still achieving differential privacy. PRIPEL anonymizes each event's timestamp and the other attributes that are stored as strings, integers, floats, or booleans.
Please install diffprivlib https://diffprivlib.readthedocs.io/en/latest/ (pip install diffprivlib==0.5.2) to run our algorithm.
SaCoFa is described in: S. A. Fahrenkrog-Petersen, M. Kabierski, F. Rösel, H. van der Aa and M. Weidlich, “SaCoFa: Semantics-aware Control-flow Anonymization for Process Mining,” 2021 3rd International Conference on Process Mining (ICPM), 2021, pp. 72-79. https://doi.org/10.48550/arXiv.2109.08501
PRIPEL is described in: Fahrenkrog-Petersen, S.A., van der Aa, H., Weidlich, M. (2020). PRIPEL: Privacy-Preserving Event Log Publishing Including Contextual Information. In: Fahland, D., Ghidini, C., Becker, J., Dumas, M. (eds) Business Process Management. BPM 2020. Lecture Notes in Computer Science, vol 12168. Springer, Cham. https://doi.org/10.1007/978-3-030-58666-9_7
- Parameters:
log – event log / Pandas dataframe
epsilon (float) – the strength of the differential privacy guarantee. The smaller the value of epsilon, the stronger the privacy guarantee that is provided.
k (int) – the maximal length of considered traces in the prefix tree. We recommend setting k so that roughly 80% of all traces from the original event log are covered.
p (int) – the pruning parameter, which denotes the minimum count a prefix has to have in order not to be discarded. The pruning parameter mitigates the exponential runtime of the algorithm.
- Return type:
pd.DataFrame
import pm4py

event_log = pm4py.read_xes("running-example.xes")
anonymized_event_log = pm4py.anonymize_differential_privacy(event_log, epsilon=1.0, k=10, p=20)
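As documented above, smaller epsilon values provide a stronger privacy guarantee at the cost of more noise in the anonymized log. A minimal sketch of this trade-off, with purely illustrative epsilon values:

import pm4py

event_log = pm4py.read_xes("running-example.xes")

# epsilon=0.1 -> strong guarantee, heavily perturbed result (illustrative value)
strongly_anonymized = pm4py.anonymize_differential_privacy(event_log, epsilon=0.1, k=10, p=20)

# epsilon=10.0 -> weak guarantee, result stays closer to the original (illustrative value)
weakly_anonymized = pm4py.anonymize_differential_privacy(event_log, epsilon=10.0, k=10, p=20)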
pm4py.read module#
The pm4py.read module contains all functionality related to reading files/objects from disk.
- pm4py.read.read_xes(file_path: str, variant: str | None = None, return_legacy_log_object: bool = False, encoding: str = 'utf-8', **kwargs) DataFrame | EventLog [source]#
Reads an event log stored in XES format (see the XES standard). Returns a table (pandas.DataFrame) view of the event log.
- Parameters:
file_path (str) – file path of the event log (.xes file) on disk
variant – the variant of the importer to use. “iterparse” => traditional XML parser; “line_by_line” => text-based line-by-line importer; “chunk_regex” => chunk-of-bytes importer (default); “iterparse20” => XES 2.0 importer
return_legacy_log_object (bool) – boolean value enabling the return of a legacy EventLog object (default: False)
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
DataFrame
import pm4py

log = pm4py.read_xes("<path_to_xes_file>")
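A short sketch of the documented keyword arguments (the paths are placeholders):

import pm4py

# "iterparse" selects the traditional XML parser instead of the default chunk_regex importer
log_df = pm4py.read_xes("<path_to_xes_file>", variant="iterparse")

# return_legacy_log_object=True yields a legacy EventLog object instead of a pandas.DataFrame
legacy_log = pm4py.read_xes("<path_to_xes_file>", return_legacy_log_object=True)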
- pm4py.read.read_pnml(file_path: str, auto_guess_final_marking: bool = False, encoding: str = 'utf-8') Tuple[PetriNet, Marking, Marking] [source]#
Reads a Petri net object from a .pnml file. The Petri net object returned is a triple containing the following objects:
- the Petri net object, encoded as a PetriNet class
- the initial Marking
- the final Marking
- Return type:
Tuple[PetriNet, Marking, Marking]
- Parameters:
file_path (str) – file path of the Petri net model (.pnml file) on disk
encoding (str) – the encoding to be used (default: utf-8)
import pm4py

pn = pm4py.read_pnml("<path_to_pnml_file>")
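Since the return value is a (PetriNet, Marking, Marking) triple, the three components are commonly unpacked into separate variables; a minimal sketch (the path is a placeholder):

import pm4py

# unpack the net together with its initial and final marking
net, initial_marking, final_marking = pm4py.read_pnml("<path_to_pnml_file>")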
- pm4py.read.read_ptml(file_path: str, encoding: str = 'utf-8') ProcessTree [source]#
Reads a process tree object from a .ptml file.
- Parameters:
file_path (str) – file path of the process tree object on disk
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
ProcessTree
import pm4py

process_tree = pm4py.read_ptml("<path_to_ptml_file>")
- pm4py.read.read_dfg(file_path: str, encoding: str = 'utf-8') Tuple[Dict[Tuple[str, str], int], Dict[str, int], Dict[str, int]] [source]#
Reads a DFG object from a .dfg file. The DFG object returned is a triple containing the following objects:
- the DFG object, encoded as a Dict[Tuple[str,str],int], s.t. DFG[('a','b')]=k implies that activity 'a' is directly followed by activity 'b' a total of k times in the log
- the start activity dictionary, encoded as a Dict[str,int], s.t. S['a']=k implies that activity 'a' is starting k traces in the event log
- the end activity dictionary, encoded as a Dict[str,int], s.t. E['z']=k implies that activity 'z' is ending k traces in the event log
- Return type:
Tuple[Dict[Tuple[str,str],int], Dict[str,int], Dict[str,int]]
- Parameters:
file_path (str) – file path of the dfg model on disk
encoding (str) – the encoding to be used (default: utf-8)
import pm4py

dfg = pm4py.read_dfg("<path_to_dfg_file>")
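Since read_dfg returns a triple, the components can be unpacked and inspected with plain dictionary iteration; a minimal sketch (the path is a placeholder, the print format is illustrative):

import pm4py

dfg, start_activities, end_activities = pm4py.read_dfg("<path_to_dfg_file>")

# dfg[('a', 'b')] = k means activity 'a' is directly followed by 'b' k times
for (source_activity, target_activity), count in dfg.items():
    print(source_activity, '->', target_activity, ':', count)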
- pm4py.read.read_bpmn(file_path: str, encoding: str = 'utf-8') BPMN [source]#
Reads a BPMN model from a .bpmn file.
- Parameters:
file_path (str) – file path of the bpmn model
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
BPMN
import pm4py

bpmn = pm4py.read_bpmn('<path_to_bpmn_file>')
- pm4py.read.read_ocel(file_path: str, objects_path: str | None = None, encoding: str = 'utf-8') OCEL [source]#
Reads an object-centric event log from a file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method.
- Parameters:
file_path (str) – file path of the object-centric event log
objects_path – [Optional] file path from which the objects dataframe should be read
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel("<path_to_ocel_file>")
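A sketch of the optional objects_path argument, which loads the objects dataframe from a separate file (both paths are placeholders):

import pm4py

# the objects dataframe is read from a second file alongside the event log
ocel = pm4py.read_ocel("<path_to_ocel_file>", objects_path="<path_to_objects_file>")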
- pm4py.read.read_ocel_csv(file_path: str, objects_path: str | None = None, encoding: str = 'utf-8') OCEL [source]#
Reads an object-centric event log from a CSV file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method.
- Parameters:
file_path (str) – file path of the object-centric event log (.csv)
objects_path – [Optional] file path from which the objects dataframe should be read
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel_csv("<path_to_ocel_file.csv>")
- pm4py.read.read_ocel_json(file_path: str, encoding: str = 'utf-8') OCEL [source]#
Reads an object-centric event log from a JSON-OCEL file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method.
- Parameters:
file_path (str) – file path of the object-centric event log (.jsonocel)
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel_json("<path_to_ocel_file.jsonocel>")
- pm4py.read.read_ocel_xml(file_path: str, encoding: str = 'utf-8') OCEL [source]#
Reads an object-centric event log from an XML-OCEL file (see: http://www.ocel-standard.org/). The OCEL object is returned by this method.
- Parameters:
file_path (str) – file path of the object-centric event log (.xmlocel)
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel_xml("<path_to_ocel_file.xmlocel>")
- pm4py.read.read_ocel_sqlite(file_path: str, encoding: str = 'utf-8') OCEL [source]#
Reads an object-centric event log from a SQLite database (see: http://www.ocel-standard.org/). The OCEL object is returned by this method.
- Parameters:
file_path (str) – file path of the SQLite database (.sqlite)
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel_sqlite("<path_to_ocel_file.sqlite>")
- pm4py.read.read_ocel2(file_path: str, variant_str: str | None = None, encoding: str = 'utf-8') OCEL [source]#
Reads an OCEL2.0 event log.
- Parameters:
file_path (str) – path to the OCEL2.0 event log
variant_str – (optional) specification of the importer variant to be used
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel2("<path_to_ocel_file>")
- pm4py.read.read_ocel2_json(file_path: str, variant_str: str | None = None, encoding: str = 'utf-8') OCEL [source]#
Reads an OCEL2.0 event log from a JSON-OCEL(2) file.
- Parameters:
file_path (str) – path to the JSON file
variant_str – (optional) specification of the importer variant to be used
encoding (str) – the encoding to be used (default: utf-8)
- Return type:
OCEL
import pm4py

ocel = pm4py.read_ocel2_json("<path_to_ocel_file.jsonocel>")