PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
__doc__ = """
The ``pm4py.filtering`` module contains the filtering features offered in ``pm4py``.
from typing import Union, Set, List, Tuple, Collection, Any, Dict, Optional
from collections import Counter
import pandas as pd
from pm4py.objects.log.obj import EventLog
from pm4py.util import constants, xes_constants, pandas_utils, nx_utils
import warnings
from pm4py.util.pandas_utils import check_is_pandas_dataframe, check_pandas_dataframe_columns
from pm4py.utils import get_properties, __event_log_deprecation_warning
from pm4py.objects.ocel.obj import OCEL
import datetime
def filter_log_relative_occurrence_event_attribute(
    log: Union[EventLog, pd.DataFrame],
    min_relative_stake: float,
    attribute_key: str = xes_constants.DEFAULT_NAME_KEY,
    level: str = "cases",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the event log, keeping only the events having an attribute value which occurs:

    - in at least the specified (min_relative_stake) percentage of events when level="events",
    - in at least the specified (min_relative_stake) percentage of cases when level="cases".

    Any ``level`` different from ``"cases"`` disables the once-per-case counting.

    :param log: Event log or Pandas DataFrame.
    :param min_relative_stake: Minimum percentage (expressed as a number between 0 and 1) in which the attribute value should occur.
    :param attribute_key: The attribute to filter.
    :param level: The level of the filter (if level="events", then events; if level="cases", then cases).
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_log_relative_occurrence_event_attribute(
            dataframe,
            0.5,
            attribute_key='concept:name',
            level='cases',
            timestamp_key='time:timestamp',
            case_id_key='case:concept:name'
        )
    """
    parameters = get_properties(log, timestamp_key=timestamp_key, case_id_key=case_id_key, activity_key=attribute_key)

    # Pick the backend-specific implementation first, then configure it once.
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.attributes import attributes_filter
        relative_filter = attributes_filter.filter_df_relative_occurrence_event_attribute
    else:
        from pm4py.algo.filtering.log.attributes import attributes_filter
        relative_filter = attributes_filter.filter_log_relative_occurrence_event_attribute

    count_once_per_case = level == "cases"
    parameters[attributes_filter.Parameters.ATTRIBUTE_KEY] = attribute_key
    parameters[attributes_filter.Parameters.KEEP_ONCE_PER_CASE] = count_once_per_case
    return relative_filter(log, min_relative_stake, parameters=parameters)
def filter_start_activities(
    log: Union[EventLog, pd.DataFrame],
    activities: Union[Set[str], List[str]],
    retain: bool = True,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the cases according to whether their start activity is in the provided collection.

    :param log: Event log or Pandas DataFrame.
    :param activities: Collection of start activities.
    :param retain: If True, retains the traces starting with one of the given activities; if False, drops them.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_start_activities(
            dataframe,
            ['Act. A'],
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.start_activities import start_activities_filter as backend
    else:
        from pm4py.algo.filtering.log.start_activities import start_activities_filter as backend

    # POSITIVE selects between keeping and dropping the matching cases.
    parameters[backend.Parameters.POSITIVE] = retain
    return backend.apply(log, activities, parameters=parameters)
def filter_end_activities(
    log: Union[EventLog, pd.DataFrame],
    activities: Union[Set[str], List[str]],
    retain: bool = True,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the cases according to whether their end activity is in the provided collection.

    :param log: Event log or Pandas DataFrame.
    :param activities: Collection of end activities.
    :param retain: If True, retains the traces ending with one of the given activities; if False, drops them.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_end_activities(
            dataframe,
            ['Act. Z'],
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.end_activities import end_activities_filter as backend
    else:
        from pm4py.algo.filtering.log.end_activities import end_activities_filter as backend

    # POSITIVE selects between keeping and dropping the matching cases.
    parameters[backend.Parameters.POSITIVE] = retain
    return backend.apply(log, activities, parameters=parameters)
def filter_event_attribute_values(
    log: Union[EventLog, pd.DataFrame],
    attribute_key: str,
    values: Union[Set[str], List[str]],
    level: str = "case",
    retain: bool = True,
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters a log object based on the values of a specified event attribute.

    :param log: Event log or Pandas DataFrame.
    :param attribute_key: Attribute to filter.
    :param values: Admitted or forbidden values.
    :param level: Specifies how the filter should be applied ('case' filters the cases where at least one occurrence happens; 'event' filters the events, potentially trimming the cases).
    :param retain: Specifies if the values should be kept or removed.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.
    :raises ValueError: If ``level`` is neither ``'case'`` nor ``'event'``.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_event_attribute_values(
            dataframe,
            'concept:name',
            ['Act. A', 'Act. Z'],
            level='case',
            retain=True,
            case_id_key='case:concept:name'
        )
    """
    # Reject unknown levels up front: without this check the function fell
    # through every branch and silently returned None (easy to hit by passing
    # "cases"/"events", which other filters in this module accept).
    if level not in ("event", "case"):
        raise ValueError(
            f"Unsupported level {level!r}: expected 'case' or 'event'."
        )

    parameters = get_properties(log, case_id_key=case_id_key)
    parameters[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = attribute_key

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.attributes import attributes_filter
    else:
        from pm4py.algo.filtering.log.attributes import attributes_filter

    parameters[attributes_filter.Parameters.POSITIVE] = retain
    if level == "event":
        return attributes_filter.apply_events(log, values, parameters=parameters)
    return attributes_filter.apply(log, values, parameters=parameters)
def filter_trace_attribute_values(
    log: Union[EventLog, pd.DataFrame],
    attribute_key: str,
    values: Union[Set[str], List[str]],
    retain: bool = True,
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters a log based on the values of a specified trace attribute.

    :param log: Event log or Pandas DataFrame.
    :param attribute_key: Attribute to filter.
    :param values: Collection of values to filter.
    :param retain: Boolean value indicating whether to keep or discard matching traces.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_trace_attribute_values(
            dataframe,
            'case:creator',
            ['Mike'],
            case_id_key='case:concept:name'
        )
    """
    parameters = get_properties(log, case_id_key=case_id_key)
    parameters[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = attribute_key

    # The two backends expose the filter under different entry points.
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.attributes import attributes_filter
        trace_filter = attributes_filter.apply
    else:
        from pm4py.algo.filtering.log.attributes import attributes_filter
        trace_filter = attributes_filter.apply_trace_attribute

    parameters[attributes_filter.Parameters.POSITIVE] = retain
    return trace_filter(log, values, parameters=parameters)
def filter_variants(
    log: Union[EventLog, pd.DataFrame],
    variants: Union[Set[str], List[str], List[Tuple[str, ...]]],
    retain: bool = True,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters a log based on a specified set of variants.

    :param log: Event log or Pandas DataFrame.
    :param variants: Collection of variants to filter. A variant should be specified as a tuple of activity names, e.g., [('a', 'b', 'c')].
    :param retain: Boolean indicating whether to retain (if True) or remove (if False) traces conforming to the specified variants.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_variants(
            dataframe,
            [('Act. A', 'Act. B', 'Act. Z'), ('Act. A', 'Act. C', 'Act. Z')],
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.variants import variants_filter
    else:
        from pm4py.algo.filtering.log.variants import variants_filter

    # POSITIVE selects between keeping and dropping the matching variants.
    parameters[variants_filter.Parameters.POSITIVE] = retain
    return variants_filter.apply(log, variants, parameters=parameters)
def filter_directly_follows_relation(
    log: Union[EventLog, pd.DataFrame],
    relations: List[Tuple[str, str]],
    retain: bool = True,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Retains traces that contain any of the specified 'directly follows' relations.

    For example, if relations == [('a','b'),('a','c')] and log [<a,b,c>,<a,c,b>,<a,d,b>],
    the resulting log will contain traces describing [<a,b,c>,<a,c,b>].

    :param log: Event log or Pandas DataFrame.
    :param relations: List of activity name pairs, representing allowed or forbidden paths.
    :param retain: Boolean indicating whether the paths should be kept (if True) or removed (if False).
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_directly_follows_relation(
            dataframe,
            [('A', 'B'), ('A', 'C')],
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.pandas.paths import paths_filter
    else:
        from pm4py.algo.filtering.log.paths import paths_filter

    # POSITIVE selects between keeping and dropping the matching paths.
    parameters[paths_filter.Parameters.POSITIVE] = retain
    return paths_filter.apply(log, relations, parameters=parameters)
def filter_eventually_follows_relation(
    log: Union[EventLog, pd.DataFrame],
    relations: List[Tuple[str, str]],
    retain: bool = True,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Retains traces that contain any of the specified 'eventually follows' relations.

    For example, if relations == [('a','b'),('a','c')] and log [<a,b,c>,<a,c,b>,<a,d,b>],
    the resulting log will contain traces describing [<a,b,c>,<a,c,b>,<a,d,b>].

    With ``retain=True`` the result is the union of the cases returned by the
    checker for each relation. With ``retain=False`` the checker is run in
    negative mode and only the cases returned for every relation are kept.

    :param log: Event log or Pandas DataFrame.
    :param relations: List of activity name pairs, representing allowed or forbidden paths.
    :param retain: Boolean indicating whether the paths should be kept (if True) or removed (if False).
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_eventually_follows_relation(
            dataframe,
            [('A', 'B'), ('A', 'C')],
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.pandas.ltl import ltl_checker
        parameters[ltl_checker.Parameters.POSITIVE] = retain

        # A union starts from nothing; an intersection starts from every case.
        cases = set() if retain else set(log[case_id_key].to_numpy().tolist())
        for path in relations:
            filt_log = ltl_checker.eventually_follows(log, path, parameters=parameters)
            this_traces = set(filt_log[case_id_key].to_numpy().tolist())
            if retain:
                cases |= this_traces
            else:
                cases &= this_traces
        return log[log[case_id_key].isin(cases)]

    from pm4py.algo.filtering.log.ltl import ltl_checker
    parameters[ltl_checker.Parameters.POSITIVE] = retain

    # Traces are tracked by object identity, so the checker is expected to
    # return the very same trace objects that are contained in ``log``.
    cases = set() if retain else {id(trace) for trace in log}
    for path in relations:
        filt_log = ltl_checker.eventually_follows(log, path, parameters=parameters)
        this_traces = {id(trace) for trace in filt_log}
        if retain:
            cases |= this_traces
        else:
            cases &= this_traces

    # NOTE(review): the constructor arguments below copy the log-level
    # metadata of the input log; confirm against the EventLog signature.
    filtered_log = EventLog(
        attributes=log.attributes,
        extensions=log.extensions,
        omni_present=log.omni_present,
        classifiers=log.classifiers,
        properties=log.properties
    )
    for trace in log:
        if id(trace) in cases:
            filtered_log.append(trace)
    return filtered_log
def filter_time_range(
    log: Union[EventLog, pd.DataFrame],
    dt1: str,
    dt2: str,
    mode: str = "events",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters a log based on a time interval.

    If ``mode`` is not one of the supported values, a warning is emitted and
    the original log is returned unchanged.

    :param log: Event log or Pandas DataFrame.
    :param dt1: Left extreme of the interval.
    :param dt2: Right extreme of the interval.
    :param mode: Modality of filtering ('events', 'traces_contained', 'traces_intersecting').
        - 'events': Any event that fits the time frame is retained.
        - 'traces_contained': Any trace completely contained in the timeframe is retained.
        - 'traces_intersecting': Any trace intersecting with the timeframe is retained.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe1 = pm4py.filter_time_range(
            dataframe,
            '2010-01-01 00:00:00',
            '2011-01-01 00:00:00',
            mode='traces_contained',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
        filtered_dataframe2 = pm4py.filter_time_range(
            dataframe,
            '2010-01-01 00:00:00',
            '2011-01-01 00:00:00',
            mode='traces_intersecting',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
        filtered_dataframe3 = pm4py.filter_time_range(
            dataframe,
            '2010-01-01 00:00:00',
            '2011-01-01 00:00:00',
            mode='events',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    properties = get_properties(log, timestamp_key=timestamp_key, case_id_key=case_id_key)

    # Both backends expose the same three entry points, so only the module differs.
    if check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.pandas.timestamp import timestamp_filter
    else:
        from pm4py.algo.filtering.log.timestamp import timestamp_filter

    if mode == "events":
        return timestamp_filter.apply_events(log, dt1, dt2, parameters=properties)
    if mode == "traces_contained":
        return timestamp_filter.filter_traces_contained(log, dt1, dt2, parameters=properties)
    if mode == "traces_intersecting":
        return timestamp_filter.filter_traces_intersecting(log, dt1, dt2, parameters=properties)

    warnings.warn(f"Mode provided: {mode} is not recognized; original log returned!")
    return log
def filter_between(
    log: Union[EventLog, pd.DataFrame],
    act1: Union[str, List[str]],
    act2: Union[str, List[str]],
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Finds all the sub-cases leading from an event with activity "act1" to an event with activity "act2" in the log,
    and returns a log containing only them.

    Example:

    Log
    A B C D E F
    A B E F C
    A B F C B C B E F C

    act1 = B
    act2 = C

    Returned sub-cases:
    B C (from the first case)
    B E F C (from the second case)
    B F C (from the third case)
    B C (from the third case)
    B E F C (from the third case)

    :param log: Event log or Pandas DataFrame.
    :param act1: Source activity or collection of activities.
    :param act2: Target activity or collection of activities.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_between(
            dataframe,
            'A',
            'D',
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.between import between_filter as backend
    else:
        from pm4py.algo.filtering.log.between import between_filter as backend

    return backend.apply(log, act1, act2, parameters=parameters)
def filter_case_size(
    log: Union[EventLog, pd.DataFrame],
    min_size: int,
    max_size: int,
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the event log, keeping cases that have a length (number of events) between min_size and max_size.

    :param log: Event log or Pandas DataFrame.
    :param min_size: Minimum allowed number of events.
    :param max_size: Maximum allowed number of events.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_case_size(
            dataframe,
            5,
            10,
            case_id_key='case:concept:name'
        )
    """
    parameters = get_properties(log, case_id_key=case_id_key)

    if not check_is_pandas_dataframe(log):
        from pm4py.algo.filtering.log.cases import case_filter
        return case_filter.filter_on_case_size(log, min_size, max_size)

    check_pandas_dataframe_columns(log, case_id_key=case_id_key)
    from pm4py.algo.filtering.pandas.cases import case_filter

    # The DataFrame variant takes the case column explicitly instead of a parameters dict.
    case_column = parameters.get(constants.PARAMETER_CONSTANT_CASEID_KEY, constants.CASE_CONCEPT_NAME)
    return case_filter.filter_on_case_size(log, case_column, min_size, max_size)
def filter_activities_rework(
    log: Union[EventLog, pd.DataFrame],
    activity: str,
    min_occurrences: int = 2,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the event log, keeping cases where the specified activity occurs at least min_occurrences times.

    :param log: Event log or Pandas DataFrame.
    :param activity: Activity to consider.
    :param min_occurrences: Minimum desired number of occurrences.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_activities_rework(
            dataframe,
            'Approve Order',
            2,
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
    parameters["min_occurrences"] = min_occurrences

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.rework import rework_filter as backend
    else:
        from pm4py.algo.filtering.log.rework import rework_filter as backend

    return backend.apply(log, activity, parameters=parameters)
def filter_variants_top_k(
    log: Union[EventLog, pd.DataFrame],
    k: int,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Keeps the top-k variants of the log.

    :param log: Event log or Pandas DataFrame.
    :param k: Number of variants to keep.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_variants_top_k(
            dataframe,
            5,
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.variants import variants_filter as backend
    else:
        from pm4py.algo.filtering.log.variants import variants_filter as backend

    return backend.filter_variants_top_k(log, k, parameters=parameters)
def filter_variants_by_coverage_percentage(
    log: Union[EventLog, pd.DataFrame],
    min_coverage_percentage: float,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the variants of the log based on a coverage percentage.

    For example, if min_coverage_percentage=0.4 and the log has 1000 cases with:

    - 500 cases of variant 1,
    - 400 cases of variant 2,
    - 100 cases of variant 3,

    the filter keeps only the traces of variant 1 and variant 2.

    :param log: Event log or Pandas DataFrame.
    :param min_coverage_percentage: Minimum allowed percentage of coverage.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_variants_by_coverage_percentage(
            dataframe,
            0.1,
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.variants import variants_filter as backend
    else:
        from pm4py.algo.filtering.log.variants import variants_filter as backend

    return backend.filter_variants_by_coverage_percentage(log, min_coverage_percentage, parameters=parameters)
def filter_prefixes(
    log: Union[EventLog, pd.DataFrame],
    activity: str,
    strict: bool = True,
    first_or_last: str = "first",
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the log, keeping the prefixes leading up to a given activity.

    For example, for a log with traces:

    - A,B,C,D
    - A,B,Z,A,B,C,D
    - A,B,C,D,C,E,C,F

    The prefixes to "C" are respectively:

    - A,B
    - A,B,Z,A,B
    - A,B

    :param log: Event log or Pandas DataFrame.
    :param activity: Target activity for the filter.
    :param strict: Applies the filter strictly, cutting the occurrences of the selected activity.
    :param first_or_last: Decides if the first or last occurrence of an activity should be selected as the baseline for the filter.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_prefixes(
            dataframe,
            'Act. C',
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
    parameters.update({"strict": strict, "first_or_last": first_or_last})

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.prefixes import prefix_filter as backend
    else:
        from pm4py.algo.filtering.log.prefixes import prefix_filter as backend

    return backend.apply(log, activity, parameters=parameters)
def filter_suffixes(
    log: Union[EventLog, pd.DataFrame],
    activity: str,
    strict: bool = True,
    first_or_last: str = "first",
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the log, keeping the suffixes starting from a given activity.

    For example, for a log with traces:

    - A,B,C,D
    - A,B,Z,A,B,C,D
    - A,B,C,D,C,E,C,F

    The suffixes from "C" are respectively:

    - D
    - D
    - D,C,E,C,F

    :param log: Event log or Pandas DataFrame.
    :param activity: Target activity for the filter.
    :param strict: Applies the filter strictly, cutting the occurrences of the selected activity.
    :param first_or_last: Decides if the first or last occurrence of an activity should be selected as the baseline for the filter.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_suffixes(
            dataframe,
            'Act. C',
            activity_key='concept:name',
            case_id_key='case:concept:name',
            timestamp_key='time:timestamp'
        )
    """
    parameters = get_properties(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
    parameters.update({"strict": strict, "first_or_last": first_or_last})

    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(log, activity_key=activity_key, timestamp_key=timestamp_key, case_id_key=case_id_key)
        from pm4py.algo.filtering.pandas.suffixes import suffix_filter as backend
    else:
        from pm4py.algo.filtering.log.suffixes import suffix_filter as backend

    return backend.apply(log, activity, parameters=parameters)
def filter_ocel_event_attribute(
    ocel: OCEL,
    attribute_key: str,
    attribute_values: Collection[Any],
    positive: bool = True
) -> OCEL:
    """
    Filters the object-centric event log based on the provided event attribute values.

    :param ocel: Object-centric event log.
    :param attribute_key: Attribute at the event level to filter.
    :param attribute_values: Collection of attribute values to keep or remove.
    :param positive: Determines whether the values should be kept (True) or removed (False).
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_event_attribute(
            ocel,
            'ocel:activity',
            ['A', 'B', 'D']
        )
    """
    from pm4py.algo.filtering.ocel import event_attributes

    filter_parameters = {
        event_attributes.Parameters.ATTRIBUTE_KEY: attribute_key,
        event_attributes.Parameters.POSITIVE: positive,
    }
    return event_attributes.apply(ocel, attribute_values, parameters=filter_parameters)
def filter_ocel_object_attribute(
    ocel: OCEL,
    attribute_key: str,
    attribute_values: Collection[Any],
    positive: bool = True
) -> OCEL:
    """
    Filters the object-centric event log based on the provided object attribute values.

    :param ocel: Object-centric event log.
    :param attribute_key: Attribute at the object level to filter.
    :param attribute_values: Collection of attribute values to keep or remove.
    :param positive: Determines whether the values should be kept (True) or removed (False).
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_object_attribute(
            ocel,
            'ocel:type',
            ['order']
        )
    """
    from pm4py.algo.filtering.ocel import object_attributes

    filter_parameters = {
        object_attributes.Parameters.ATTRIBUTE_KEY: attribute_key,
        object_attributes.Parameters.POSITIVE: positive,
    }
    return object_attributes.apply(ocel, attribute_values, parameters=filter_parameters)
def filter_ocel_object_types_allowed_activities(
    ocel: OCEL,
    correspondence_dict: Dict[str, Collection[str]]
) -> OCEL:
    """
    Filters an object-centric event log, keeping only the specified object types with the specified set of allowed activities.

    :param ocel: Object-centric event log.
    :param correspondence_dict: Dictionary containing, for every object type of interest, a collection of allowed activities.
        Example: {"order": ["Create Order"], "element": ["Create Order", "Create Delivery"]}.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_object_types_allowed_activities(
            ocel,
            {'order': ['create order', 'pay order'], 'item': ['create item', 'deliver item']}
        )
    """
    from pm4py.algo.filtering.ocel import activity_type_matching as matching_filter

    filtered_ocel = matching_filter.apply(ocel, correspondence_dict)
    return filtered_ocel
def filter_ocel_object_per_type_count(
    ocel: OCEL,
    min_num_obj_type: Dict[str, int]
) -> OCEL:
    """
    Filters the events of the object-centric logs that are related to at least the specified number of objects per type.

    Example:

    pm4py.filter_ocel_object_per_type_count(ocel, {"order": 1, "element": 2})

    Would keep the following events:

      ocel:eid ocel:timestamp ocel:activity ocel:type:element ocel:type:order
    0       e1     1980-01-01  Create Order  [i4, i1, i3, i2]            [o1]
    1      e11     1981-01-01  Create Order          [i6, i5]            [o2]
    2      e14     1981-01-04  Create Order          [i8, i7]            [o3]

    :param ocel: Object-centric event log.
    :param min_num_obj_type: Minimum number of objects per type.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_object_per_type_count(
            ocel,
            {'order': 1, 'element': 2}
        )
    """
    from pm4py.algo.filtering.ocel import objects_ot_count as count_filter

    filtered_ocel = count_filter.apply(ocel, min_num_obj_type)
    return filtered_ocel
def filter_ocel_start_events_per_object_type(
    ocel: OCEL,
    object_type: str
) -> OCEL:
    """
    Filters the events in which a new object of the given object type is spawned.

    For example, an event with activity "Create Order" might spawn new orders.

    :param ocel: Object-centric event log.
    :param object_type: Object type to consider.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_start_events_per_object_type(
            ocel,
            'delivery'
        )
    """
    from pm4py.algo.filtering.ocel import ot_endpoints as endpoint_filter

    filtered_ocel = endpoint_filter.filter_start_events_per_object_type(ocel, object_type)
    return filtered_ocel
def filter_ocel_end_events_per_object_type(
    ocel: OCEL,
    object_type: str
) -> OCEL:
    """
    Keeps the events in which an object of the given object type terminates its lifecycle.
    For example, an event with activity "Pay Order" might terminate an order.

    :param ocel: Object-centric event log.
    :param object_type: Object type to consider.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_end_events_per_object_type(
            ocel,
            'delivery'
        )
    """
    # Start- and end-event filters share the same backend module.
    from pm4py.algo.filtering.ocel import ot_endpoints as endpoints_filter

    filtered_ocel = endpoints_filter.filter_end_events_per_object_type(ocel, object_type)
    return filtered_ocel
def filter_ocel_events_timestamp(
    ocel: OCEL,
    min_timest: Union[datetime.datetime, str],
    max_timest: Union[datetime.datetime, str],
    timestamp_key: str = "ocel:timestamp"
) -> OCEL:
    """
    Filters the object-centric event log, keeping events within the provided timestamp range.

    :param ocel: Object-centric event log.
    :param min_timest: Left extreme of the allowed timestamp interval (format: YYYY-mm-dd HH:MM:SS).
    :param max_timest: Right extreme of the allowed timestamp interval (format: YYYY-mm-dd HH:MM:SS).
    :param timestamp_key: The attribute to use as timestamp (default: ocel:timestamp).
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_events_timestamp(
            ocel,
            '1990-01-01 00:00:00',
            '2010-01-01 00:00:00'
        )
    """
    from pm4py.algo.filtering.ocel import event_attributes

    # The log and both interval bounds must be forwarded explicitly; the
    # timestamp attribute travels through the parameters dictionary.
    return event_attributes.apply_timestamp(
        ocel,
        min_timest,
        max_timest,
        parameters={"pm4py:param:timestamp_key": timestamp_key}
    )
def filter_four_eyes_principle(
    log: Union[EventLog, pd.DataFrame],
    activity1: str,
    activity2: str,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name",
    resource_key: str = "org:resource",
    keep_violations: bool = False
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters out the cases of the log that violate the four-eyes principle on the provided activities.

    :param log: Event log or Pandas DataFrame.
    :param activity1: First activity.
    :param activity2: Second activity.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :param resource_key: Attribute to be used as resource.
    :param keep_violations: Boolean indicating whether to discard (if False) or retain (if True) the violations.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_four_eyes_principle(
            dataframe,
            'Act. A',
            'Act. B',
            activity_key='concept:name',
            resource_key='org:resource',
            timestamp_key='time:timestamp',
            case_id_key='case:concept:name'
        )
    """
    params = get_properties(
        log,
        activity_key=activity_key,
        timestamp_key=timestamp_key,
        case_id_key=case_id_key,
        resource_key=resource_key,
    )
    # The backend's "positive" flag is the inverse of keep_violations.
    params["positive"] = not keep_violations

    # Select the backend that matches the log representation, then call it once.
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(
            log,
            activity_key=activity_key,
            timestamp_key=timestamp_key,
            case_id_key=case_id_key,
        )
        from pm4py.algo.filtering.pandas.ltl import ltl_checker
    else:
        from pm4py.algo.filtering.log.ltl import ltl_checker

    return ltl_checker.four_eyes_principle(log, activity1, activity2, parameters=params)
def filter_activity_done_different_resources(
    log: Union[EventLog, pd.DataFrame],
    activity: str,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name",
    resource_key: str = "org:resource",
    keep_violations: bool = True
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters the cases where an activity is performed by different resources multiple times.

    :param log: Event log or Pandas DataFrame.
    :param activity: Activity to consider.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :param resource_key: Attribute to be used as resource.
    :param keep_violations: Boolean indicating whether to discard (if False) or retain (if True) the violations.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        filtered_dataframe = pm4py.filter_activity_done_different_resources(
            dataframe,
            'Act. A',
            activity_key='concept:name',
            resource_key='org:resource',
            timestamp_key='time:timestamp',
            case_id_key='case:concept:name'
        )
    """
    params = get_properties(
        log,
        activity_key=activity_key,
        timestamp_key=timestamp_key,
        case_id_key=case_id_key,
        resource_key=resource_key,
    )
    # Here keep_violations is forwarded unchanged as the backend's "positive" flag.
    params["positive"] = keep_violations

    is_dataframe = check_is_pandas_dataframe(log)
    if is_dataframe:
        check_pandas_dataframe_columns(
            log,
            activity_key=activity_key,
            timestamp_key=timestamp_key,
            case_id_key=case_id_key,
        )
        from pm4py.algo.filtering.pandas.ltl import ltl_checker
    else:
        from pm4py.algo.filtering.log.ltl import ltl_checker

    return ltl_checker.attr_value_different_persons(log, activity, parameters=params)
def filter_trace_segments(
    log: Union[EventLog, pd.DataFrame],
    admitted_traces: List[List[str]],
    positive: bool = True,
    activity_key: str = "concept:name",
    timestamp_key: str = "time:timestamp",
    case_id_key: str = "case:concept:name"
) -> Union[EventLog, pd.DataFrame]:
    """
    Filters an event log based on a set of trace segments. A trace segment is a sequence of
    activities and "..." placeholders, in which:

    - "..." before an activity indicates that other activities can precede the given activity.
    - "..." after an activity indicates that other activities can follow the given activity.

    Examples:

    - pm4py.filter_trace_segments(log, [["A", "B"]]) retains only cases with the exact process variant A,B.
    - pm4py.filter_trace_segments(log, [["...", "A", "B"]]) retains only cases ending with activities A,B.
    - pm4py.filter_trace_segments(log, [["A", "B", "..."]]) retains only cases starting with activities A,B.
    - pm4py.filter_trace_segments(log, [["...", "A", "B", "C", "..."], ["...", "D", "E", "F", "..."]]) retains cases where:
        - At any point, there is A followed by B followed by C,
        - And at any other point, there is D followed by E followed by F.

    :param log: Event log or Pandas DataFrame.
    :param admitted_traces: Collection of trace segments to admit based on the criteria above.
    :param positive: Boolean indicating whether to keep (if True) or discard (if False) the cases satisfying the filter.
    :param activity_key: Attribute to be used for the activity.
    :param timestamp_key: Attribute to be used for the timestamp.
    :param case_id_key: Attribute to be used as case identifier.
    :return: Filtered event log or Pandas DataFrame.

    .. code-block:: python3

        import pm4py

        log = pm4py.read_xes("tests/input_data/running-example.xes")

        filtered_log = pm4py.filter_trace_segments(
            log,
            [["...", "check ticket", "decide", "reinitiate request", "..."]]
        )
    """
    filter_params = get_properties(
        log,
        activity_key=activity_key,
        timestamp_key=timestamp_key,
        case_id_key=case_id_key,
    )
    filter_params["positive"] = positive

    # Choose the DataFrame or EventLog implementation, then apply it once.
    if check_is_pandas_dataframe(log):
        check_pandas_dataframe_columns(
            log,
            activity_key=activity_key,
            timestamp_key=timestamp_key,
            case_id_key=case_id_key,
        )
        from pm4py.algo.filtering.pandas.traces import trace_filter
    else:
        from pm4py.algo.filtering.log.traces import trace_filter

    return trace_filter.apply(log, admitted_traces, parameters=filter_params)
def filter_ocel_object_types(
    ocel: OCEL,
    obj_types: Collection[str],
    positive: bool = True,
    level: int = 1
) -> OCEL:
    """
    Filters the object types of an object-centric event log.

    With ``level == 1`` the objects table is filtered directly on the object type.
    For any other level, the identifiers of the objects having one of the given
    types are collected and the work is delegated to ``filter_ocel_objects``,
    which performs the level-based expansion.

    :param ocel: Object-centric event log.
    :param obj_types: Object types to keep or remove.
    :param positive: Boolean indicating whether to keep (True) or remove (False) the specified object types.
    :param level: Recursively expands the set of object identifiers until the specified level.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_object_types(
            ocel,
            ['order']
        )
    """
    # Boolean mask over the objects table: True for objects of a requested type.
    has_requested_type = ocel.objects[ocel.object_type_column].isin(obj_types)

    if level == 1:
        from copy import copy
        from pm4py.objects.ocel.util import filtering_utils

        # Work on a copy so the caller's OCEL keeps its original objects table.
        filtered_ocel = copy(ocel)
        # Exactly one of the two selections applies: keep the matching objects
        # or keep everything else.
        selection = has_requested_type if positive else ~has_requested_type
        filtered_ocel.objects = filtered_ocel.objects[selection]
        return filtering_utils.propagate_object_filtering(filtered_ocel)

    object_ids = pandas_utils.format_unique(
        ocel.objects[has_requested_type][ocel.object_id_column].unique()
    )
    return filter_ocel_objects(ocel, object_ids, level=level, positive=positive)
def filter_ocel_objects(
    ocel: OCEL,
    object_identifiers: Collection[str],
    positive: bool = True,
    level: int = 1
) -> OCEL:
    """
    Filters the object identifiers of an object-centric event log.

    When ``level`` is greater than 1, the given set of identifiers is expanded
    ``level - 1`` times with the objects that share at least one event with an
    already selected object. The (possibly expanded) set is then kept or removed
    according to ``positive``.

    Identifiers that do not occur in the log are tolerated: they simply have no
    neighbours and match no object.

    :param ocel: Object-centric event log.
    :param object_identifiers: Object identifiers to keep or remove.
    :param positive: Boolean indicating whether to keep (True) or remove (False) the specified object identifiers.
    :param level: Recursively expands the set of object identifiers until the specified level.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_objects(
            ocel,
            ['o1', 'i1']
        )
    """
    from copy import copy
    from pm4py.objects.ocel.util import filtering_utils

    # Always work on a private set so the caller's collection is never mutated.
    selected = set(object_identifiers)

    if level > 1:
        objects_per_event = (
            ocel.relations.groupby(ocel.event_id_column)[ocel.object_id_column].agg(list).to_dict()
        )

        # Undirected co-occurrence graph: two objects are neighbours when they
        # are related to the same event.
        neighbours = {}
        for related in objects_per_event.values():
            distinct = set(related)
            for obj in distinct:
                neighbours.setdefault(obj, set()).update(distinct - {obj})

        # Expand one hop per level. Only the newly reached objects need to be
        # visited at the next hop, since older ones were already expanded.
        frontier = set(selected)
        remaining = level
        while remaining > 1 and frontier:
            reached = set()
            for obj in frontier:
                reached.update(neighbours.get(obj, ()))
            frontier = reached - selected
            selected |= frontier
            remaining -= 1

    # Work on a copy so the caller's OCEL keeps its original objects table.
    filtered_ocel = copy(ocel)
    is_selected = filtered_ocel.objects[ocel.object_id_column].isin(selected)
    # Exactly one of the two selections applies: keep the selected objects or
    # keep everything else.
    filtered_ocel.objects = filtered_ocel.objects[is_selected if positive else ~is_selected]
    return filtering_utils.propagate_object_filtering(filtered_ocel)
def filter_ocel_events(
    ocel: OCEL,
    event_identifiers: Collection[str],
    positive: bool = True
) -> OCEL:
    """
    Filters the event identifiers of an object-centric event log.

    :param ocel: Object-centric event log.
    :param event_identifiers: Event identifiers to keep or remove.
    :param positive: Boolean indicating whether to keep (True) or remove (False) the specified event identifiers.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_events(
            ocel,
            ['e1']
        )
    """
    from copy import copy
    from pm4py.objects.ocel.util import filtering_utils

    # Work on a copy so the caller's OCEL keeps its original events table.
    filtered_ocel = copy(ocel)
    is_listed = filtered_ocel.events[ocel.event_id_column].isin(event_identifiers)
    # Exactly one of the two selections applies: keep the listed events or keep
    # everything else.
    filtered_ocel.events = filtered_ocel.events[is_listed if positive else ~is_listed]
    return filtering_utils.propagate_event_filtering(filtered_ocel)
def filter_ocel_activities_connected_object_type(ocel: OCEL, object_type: str) -> OCEL:
    """
    Filter an OCEL on the set of activities executed on objects of the given object type.

    The activities are read from the relations whose object type equals ``object_type``;
    every relation having one of these activities is then kept.

    :param ocel: object-centric event log
    :param object_type: object type
    :rtype: ``OCEL``

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel2("tests/input_data/ocel/ocel20_example.xmlocel")
        filtered_ocel = pm4py.filter_ocel_activities_connected_object_type(ocel, "Purchase Order")
    """
    from copy import copy
    from pm4py.objects.ocel.util import filtering_utils

    # Activities that occur on at least one object of the requested type.
    of_requested_type = ocel.relations[ocel.object_type_column] == object_type
    connected_activities = ocel.relations[of_requested_type][ocel.event_activity].unique()

    # Work on a copy so the caller's OCEL keeps its original relations table.
    filtered_ocel = copy(ocel)
    has_connected_activity = filtered_ocel.relations[ocel.event_activity].isin(connected_activities)
    filtered_ocel.relations = filtered_ocel.relations[has_connected_activity]

    return filtering_utils.propagate_relations_filtering(filtered_ocel)
def filter_ocel_cc_object(
    ocel: OCEL,
    object_id: str,
    conn_comp: Optional[List[List[str]]] = None,
    return_conn_comp: bool = False
) -> Union[OCEL, Tuple[OCEL, List[List[str]]]]:
    """
    Returns the connected component of the object-centric event log to which the specified object belongs.

    If the object is found in no connected component, the log is filtered on that
    single object.

    :param ocel: Object-centric event log.
    :param object_id: Object identifier.
    :param conn_comp: (Optional) Precomputed connected components of the OCEL objects.
    :param return_conn_comp: If True, returns the filtered OCEL along with the computed connected components.
    :return: Filtered OCEL, optionally with the list of connected components.

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_cc_object(
            ocel,
            'order1'
        )
    """
    if conn_comp is None:
        from pm4py.algo.transformation.ocel.graphs import object_interaction_graph

        interaction_graph = nx_utils.Graph()
        for edge in object_interaction_graph.apply(ocel):
            interaction_graph.add_edge(edge[0], edge[1])
        conn_comp = list(nx_utils.connected_components(interaction_graph))

    # First component containing the object; fall back to the object alone.
    component = next((cc for cc in conn_comp if object_id in cc), [object_id])
    filtered_ocel = filter_ocel_objects(ocel, component)

    if return_conn_comp:
        return filtered_ocel, conn_comp
    return filtered_ocel
def filter_ocel_cc_length(
    ocel: OCEL,
    min_cc_length: int,
    max_cc_length: int
) -> OCEL:
    """
    Keeps only the objects in an OCEL belonging to a connected component with a length
    falling within the specified range.

    Objects that interact with no other object form a connected component of
    length 1 and are kept when 1 falls within the specified range.

    Paper:
    Adams, Jan Niklas, et al. "Defining cases and variants for object-centric event data."
    2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

    :param ocel: Object-centric event log.
    :param min_cc_length: Minimum allowed length for the connected component.
    :param max_cc_length: Maximum allowed length for the connected component.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        filtered_ocel = pm4py.filter_ocel_cc_length(
            ocel,
            2,
            10
        )
    """
    from pm4py.algo.transformation.ocel.graphs import object_interaction_graph

    g = nx_utils.Graph()
    for edge in object_interaction_graph.apply(ocel):
        g.add_edge(edge[0], edge[1])
    components = list(nx_utils.connected_components(g))

    objs = [
        obj
        for component in components
        if min_cc_length <= len(component) <= max_cc_length
        for obj in component
    ]

    # The graph is built from interaction edges only, so objects without any
    # interaction never show up in `components`. Each of them is a component
    # of length 1 and has to be added explicitly when that length is admitted.
    if min_cc_length <= 1 <= max_cc_length:
        linked = set().union(*components)
        objs.extend(
            obj
            for obj in ocel.objects[ocel.object_id_column].to_numpy().tolist()
            if obj not in linked
        )

    return filter_ocel_objects(ocel, objs)
def filter_ocel_cc_otype(
    ocel: OCEL,
    otype: str,
    positive: bool = True
) -> OCEL:
    """
    Filters the objects belonging to connected components that have at least one object of the specified type.

    With ``positive=True`` the objects of these components are kept; with
    ``positive=False`` they are discarded and all the other objects are kept.
    An object of the specified type that interacts with no other object counts
    as a component of its own.

    Paper:
    Adams, Jan Niklas, et al. "Defining cases and variants for object-centric event data."
    2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

    :param ocel: Object-centric event log.
    :param otype: Object type to consider.
    :param positive: Boolean indicating whether to keep (True) or discard (False) the objects in these components.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_cc_otype(
            ocel,
            'order'
        )
    """
    from pm4py.algo.transformation.ocel.graphs import object_interaction_graph

    # Objects of the requested type. They are the seeds that mark a component
    # as matching, independently of the value of `positive`.
    seeds = set(
        ocel.objects[ocel.objects[ocel.object_type_column] == otype][ocel.object_id_column]
    )

    g = nx_utils.Graph()
    for edge in object_interaction_graph.apply(ocel):
        g.add_edge(edge[0], edge[1])

    # Start from the seeds themselves, so that seeds without any interaction
    # (absent from the edge-based graph) are still part of the match.
    matched = set(seeds)
    for component in nx_utils.connected_components(g):
        if not seeds.isdisjoint(component):
            matched.update(component)

    # `positive` decides whether the matched objects are kept or removed.
    return filter_ocel_objects(ocel, matched, positive=positive)
def filter_ocel_cc_activity(
    ocel: OCEL,
    activity: str
) -> OCEL:
    """
    Filters the objects belonging to connected components that include at least one event with the specified activity.

    An object related to such an event that interacts with no other object counts
    as a component of its own and is kept as well.

    Paper:
    Adams, Jan Niklas, et al. "Defining cases and variants for object-centric event data."
    2022 4th International Conference on Process Mining (ICPM). IEEE, 2022.

    :param ocel: Object-centric event log.
    :param activity: Activity to consider.
    :return: Filtered OCEL.

    .. code-block:: python3

        import pm4py

        ocel = pm4py.read_ocel('log.jsonocel')
        filtered_ocel = pm4py.filter_ocel_cc_activity(
            ocel,
            'Create Order'
        )
    """
    from pm4py.algo.transformation.ocel.graphs import object_interaction_graph

    # Events having the requested activity ...
    evs = ocel.events[ocel.events[ocel.event_activity] == activity][ocel.event_id_column].to_numpy().tolist()
    # ... and the objects related to at least one of them.
    seeds = set(
        pandas_utils.format_unique(
            ocel.relations[ocel.relations[ocel.event_id_column].isin(evs)][ocel.object_id_column].unique()
        )
    )

    g = nx_utils.Graph()
    for edge in object_interaction_graph.apply(ocel):
        g.add_edge(edge[0], edge[1])

    # Start from the seeds themselves, so that seeds without any interaction
    # (absent from the edge-based graph) are still kept.
    objs = set(seeds)
    for component in nx_utils.connected_components(g):
        if not seeds.isdisjoint(component):
            objs.update(component)

    return filter_ocel_objects(ocel, objs)