Source code for tests.other_tests

import os
import unittest
import importlib.util
from pm4py.algo.discovery.log_skeleton import algorithm as lsk_alg
from pm4py.algo.conformance.log_skeleton import algorithm as lsk_conf_alg
from pm4py.objects.process_tree.importer import importer as ptree_importer
from pm4py.objects.process_tree.exporter import exporter as ptree_exporter
from pm4py.algo.discovery.performance_spectrum.variants import log as log_pspectrum, dataframe as df_pspectrum
from pm4py.objects.dfg.importer import importer as dfg_importer
from pm4py.objects.dfg.exporter import exporter as dfg_exporter
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.statistics.start_activities.log import get as start_activities
from pm4py.statistics.end_activities.log import get as end_activities
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.statistics.variants.log import get as variants_get
from pm4py.algo.simulation.playout.petri_net import algorithm
from pm4py.objects.conversion.log import converter
from pm4py.objects.log.util import dataframe_utils
from pm4py.util import constants, pandas_utils
from pm4py.objects.conversion.process_tree import converter as process_tree_converter


[docs] class OtherPartsTests(unittest.TestCase):
[docs] def test_emd_1(self): if importlib.util.find_spec("pyemd"): from pm4py.algo.evaluation.earth_mover_distance import algorithm as earth_mover_distance M = {("a", "b", "d", "e"): 0.49, ("a", "d", "b", "e"): 0.49, ("a", "c", "d", "e"): 0.01, ("a", "d", "c", "e"): 0.01} L1 = {("a", "b", "d", "e"): 0.49, ("a", "d", "b", "e"): 0.49, ("a", "c", "d", "e"): 0.01, ("a", "d", "c", "e"): 0.01} earth_mover_distance.apply(M, L1)
[docs] def test_emd_2(self): if importlib.util.find_spec("pyemd"): from pm4py.algo.evaluation.earth_mover_distance import algorithm as earth_mover_distance log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) lang_log = variants_get.get_language(log) process_tree = inductive_miner.apply(log) net1, im1, fm1 = process_tree_converter.apply(process_tree) lang_model1 = variants_get.get_language( algorithm.apply(net1, im1, fm1, variant=algorithm.Variants.STOCHASTIC_PLAYOUT, parameters={algorithm.Variants.STOCHASTIC_PLAYOUT.value.Parameters.LOG: log})) emd = earth_mover_distance.apply(lang_model1, lang_log)
[docs] def test_importing_dfg(self): dfg, sa, ea = dfg_importer.apply(os.path.join("input_data", "running-example.dfg"))
[docs] def test_exporting_dfg(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) dfg = dfg_discovery.apply(log) dfg_exporter.apply(dfg, os.path.join("test_output_data", "running-example.dfg")) dfg, sa, ea = dfg_importer.apply(os.path.join("test_output_data", "running-example.dfg")) os.remove(os.path.join("test_output_data", "running-example.dfg"))
[docs] def test_exporting_dfg_with_sa_ea(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) dfg = dfg_discovery.apply(log) sa = start_activities.get_start_activities(log) ea = end_activities.get_end_activities(log) dfg_exporter.apply(dfg, os.path.join("test_output_data", "running-example.dfg"), parameters={dfg_exporter.Variants.CLASSIC.value.Parameters.START_ACTIVITIES: sa, dfg_exporter.Variants.CLASSIC.value.Parameters.END_ACTIVITIES: ea}) dfg, sa, ea = dfg_importer.apply(os.path.join("test_output_data", "running-example.dfg")) os.remove(os.path.join("test_output_data", "running-example.dfg"))
[docs] def test_log_skeleton(self): log = xes_importer.apply(os.path.join("input_data", "receipt.xes")) skeleton = lsk_alg.apply(log) conf_res = lsk_conf_alg.apply(log, skeleton)
[docs] def test_performance_spectrum_log(self): log = xes_importer.apply(os.path.join("input_data", "receipt.xes")) pspectr = log_pspectrum.apply(log, ["T02 Check confirmation of receipt", "T03 Adjust confirmation of receipt"], 1000, {})
[docs] def test_performance_spectrum_df(self): df = pandas_utils.read_csv(os.path.join("input_data", "receipt.csv")) df = dataframe_utils.convert_timestamp_columns_in_df(df, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT) pspectr = df_pspectrum.apply(df, ["T02 Check confirmation of receipt", "T03 Adjust confirmation of receipt"], 1000, {})
[docs] def test_alignment(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) from pm4py.algo.discovery.alpha import algorithm as alpha_miner net, im, fm = alpha_miner.apply(log) from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments aligned_traces = alignments.apply(log, net, im, fm, variant=alignments.Variants.VERSION_STATE_EQUATION_A_STAR) aligned_traces = alignments.apply(log, net, im, fm, variant=alignments.Variants.VERSION_DIJKSTRA_NO_HEURISTICS)
[docs] def test_import_export_ptml(self): tree = ptree_importer.apply(os.path.join("input_data", "running-example.ptml")) ptree_exporter.apply(tree, os.path.join("test_output_data", "running-example2.ptml")) os.remove(os.path.join("test_output_data", "running-example2.ptml"))
[docs] def test_footprints_net(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) from pm4py.algo.discovery.alpha import algorithm as alpha_miner net, im, fm = alpha_miner.apply(log) from pm4py.algo.discovery.footprints import algorithm as footprints_discovery fp_entire_log = footprints_discovery.apply(log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG) fp_trace_trace = footprints_discovery.apply(log) fp_net = footprints_discovery.apply(net, im) from pm4py.algo.conformance.footprints import algorithm as footprints_conformance conf1 = footprints_conformance.apply(fp_entire_log, fp_net) conf2 = footprints_conformance.apply(fp_trace_trace, fp_net) conf3 = footprints_conformance.apply(fp_entire_log, fp_net, variant=footprints_conformance.Variants.LOG_EXTENSIVE) conf4 = footprints_conformance.apply(fp_trace_trace, fp_net, variant=footprints_conformance.Variants.TRACE_EXTENSIVE)
[docs] def test_footprints_tree(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) from pm4py.algo.discovery.inductive import algorithm as inductive_miner tree = inductive_miner.apply(log) from pm4py.algo.discovery.footprints import algorithm as footprints_discovery fp_entire_log = footprints_discovery.apply(log, variant=footprints_discovery.Variants.ENTIRE_EVENT_LOG) fp_trace_trace = footprints_discovery.apply(log) fp_tree = footprints_discovery.apply(tree) from pm4py.algo.conformance.footprints import algorithm as footprints_conformance conf1 = footprints_conformance.apply(fp_entire_log, fp_tree) conf2 = footprints_conformance.apply(fp_trace_trace, fp_tree) conf3 = footprints_conformance.apply(fp_entire_log, fp_tree, variant=footprints_conformance.Variants.LOG_EXTENSIVE) conf4 = footprints_conformance.apply(fp_trace_trace, fp_tree, variant=footprints_conformance.Variants.TRACE_EXTENSIVE)
[docs] def test_footprints_tree_df(self): df = pandas_utils.read_csv(os.path.join("input_data", "running-example.csv")) df = dataframe_utils.convert_timestamp_columns_in_df(df, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT) from pm4py.algo.discovery.inductive import algorithm as inductive_miner log = converter.apply(df, variant=converter.Variants.TO_EVENT_LOG) tree = inductive_miner.apply(log) from pm4py.algo.discovery.footprints import algorithm as footprints_discovery fp_df = footprints_discovery.apply(df) fp_tree = footprints_discovery.apply(tree) from pm4py.algo.conformance.footprints import algorithm as footprints_conformance conf = footprints_conformance.apply(fp_df, fp_tree)
[docs] def test_conversion_pn_to_pt(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) from pm4py.algo.discovery.alpha import algorithm as alpha_miner net, im, fm = alpha_miner.apply(log) from pm4py.objects.conversion.wf_net import converter as wf_net_converter tree = wf_net_converter.apply(net, im, fm, variant=wf_net_converter.Variants.TO_PROCESS_TREE)
[docs] def test_playout_tree_basic(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) from pm4py.algo.discovery.inductive import algorithm as inductive_miner tree = inductive_miner.apply(log) from pm4py.algo.simulation.playout.process_tree import algorithm as tree_playout new_log = tree_playout.apply(tree)
[docs] def test_playout_tree_extensive(self): log = xes_importer.apply(os.path.join("input_data", "running-example.xes")) from pm4py.algo.discovery.inductive import algorithm as inductive_miner tree = inductive_miner.apply(log) from pm4py.algo.simulation.playout.process_tree import algorithm as tree_playout new_log = tree_playout.apply(tree, variant=tree_playout.Variants.EXTENSIVE)
[docs] def test_service_time_xes(self): log = xes_importer.apply(os.path.join("input_data", "interval_event_log.xes")) from pm4py.statistics.service_time.log import get soj_time = get.apply(log, parameters={get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
[docs] def test_service_time_pandas(self): dataframe = pandas_utils.read_csv(os.path.join("input_data", "interval_event_log.csv")) from pm4py.objects.log.util import dataframe_utils dataframe = dataframe_utils.convert_timestamp_columns_in_df(dataframe, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT) from pm4py.statistics.service_time.pandas import get soj_time = get.apply(dataframe, parameters={get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
[docs] def test_concurrent_activities_xes(self): log = xes_importer.apply(os.path.join("input_data", "interval_event_log.xes")) from pm4py.statistics.concurrent_activities.log import get conc_act = get.apply(log, parameters={get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
[docs] def test_concurrent_activities_pandas(self): dataframe = pandas_utils.read_csv(os.path.join("input_data", "interval_event_log.csv")) from pm4py.objects.log.util import dataframe_utils dataframe = dataframe_utils.convert_timestamp_columns_in_df(dataframe, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT) from pm4py.statistics.concurrent_activities.pandas import get conc_act = get.apply(dataframe, parameters={get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
[docs] def test_efg_xes(self): log = xes_importer.apply(os.path.join("input_data", "interval_event_log.xes")) from pm4py.statistics.eventually_follows.log import get efg = get.apply(log, parameters={get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
[docs] def test_efg_pandas(self): dataframe = pandas_utils.read_csv(os.path.join("input_data", "interval_event_log.csv")) from pm4py.objects.log.util import dataframe_utils dataframe = dataframe_utils.convert_timestamp_columns_in_df(dataframe, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT) from pm4py.statistics.eventually_follows.pandas import get efg = get.apply(dataframe, parameters={get.Parameters.START_TIMESTAMP_KEY: "start_timestamp"})
[docs] def test_dfg_playout(self): import pm4py from pm4py.algo.simulation.playout.dfg import algorithm as dfg_playout log = pm4py.read_xes(os.path.join("input_data", "running-example.xes")) dfg, sa, ea = pm4py.discover_dfg(log) dfg_playout.apply(dfg, sa, ea)
[docs] def test_dfg_align(self): import pm4py from pm4py.algo.filtering.dfg import dfg_filtering from pm4py.algo.conformance.alignments.dfg import algorithm as dfg_alignment log = pm4py.read_xes(os.path.join("input_data", "running-example.xes")) dfg, sa, ea = pm4py.discover_dfg(log) act_count = pm4py.get_event_attribute_values(log, "concept:name") dfg, sa, ea, act_count = dfg_filtering.filter_dfg_on_activities_percentage(dfg, sa, ea, act_count, 0.5) dfg, sa, ea, act_count = dfg_filtering.filter_dfg_on_paths_percentage(dfg, sa, ea, act_count, 0.5) aligned_traces = dfg_alignment.apply(log, dfg, sa, ea)
[docs] def test_insert_idx_in_trace(self): df = pandas_utils.read_csv(os.path.join("input_data", "running-example.csv")) df = pandas_utils.insert_ev_in_tr_index(df)
[docs] def test_automatic_feature_extraction(self): df = pandas_utils.read_csv(os.path.join("input_data", "receipt.csv")) fea_df = dataframe_utils.automatic_feature_extraction_df(df)
[docs] def test_log_to_trie(self): import pm4py from pm4py.algo.transformation.log_to_trie import algorithm as log_to_trie log = pm4py.read_xes(os.path.join("input_data", "running-example.xes")) trie = log_to_trie.apply(log)
[docs] def test_minimum_self_distance(self): import pm4py from pm4py.algo.discovery.minimum_self_distance import algorithm as minimum_self_distance log = pm4py.read_xes(os.path.join("input_data", "running-example.xes")) msd = minimum_self_distance.apply(log)
[docs] def test_projection_univariate_log(self): import pm4py from pm4py.util.compression import util as compression_util log = pm4py.read_xes(os.path.join("input_data", "receipt.xes")) cl = compression_util.project_univariate(log, "concept:name") # just verify that the set is non-empty self.assertTrue(compression_util.get_start_activities(cl)) self.assertTrue(compression_util.get_end_activities(cl)) self.assertTrue(compression_util.get_alphabet(cl)) self.assertTrue(compression_util.discover_dfg(cl)) self.assertTrue(compression_util.get_variants(cl))
[docs] def test_projection_univariate_df(self): from pm4py.util.compression import util as compression_util dataframe = pandas_utils.read_csv(os.path.join("input_data", "receipt.csv")) dataframe = dataframe_utils.convert_timestamp_columns_in_df(dataframe, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT, timest_columns=["time:timestamp"]) cl = compression_util.project_univariate(dataframe, "concept:name") # just verify that the set is non-empty self.assertTrue(compression_util.get_start_activities(cl)) self.assertTrue(compression_util.get_end_activities(cl)) self.assertTrue(compression_util.get_alphabet(cl)) self.assertTrue(compression_util.discover_dfg(cl)) self.assertTrue(compression_util.get_variants(cl))
[docs] def test_compression_univariate_log(self): import pm4py from pm4py.util.compression import util as compression_util log = pm4py.read_xes(os.path.join("input_data", "receipt.xes")) cl, lookup = compression_util.compress_univariate(log, "concept:name") # just verify that the set is non-empty self.assertTrue(compression_util.get_start_activities(cl)) self.assertTrue(compression_util.get_end_activities(cl)) self.assertTrue(compression_util.get_alphabet(cl)) self.assertTrue(compression_util.discover_dfg(cl)) self.assertTrue(compression_util.get_variants(cl))
[docs] def test_compression_univariate_df(self): from pm4py.util.compression import util as compression_util dataframe = pandas_utils.read_csv(os.path.join("input_data", "receipt.csv")) dataframe = dataframe_utils.convert_timestamp_columns_in_df(dataframe, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT, timest_columns=["time:timestamp"]) cl, lookup = compression_util.compress_univariate(dataframe, "concept:name") # just verify that the set is non-empty self.assertTrue(compression_util.get_start_activities(cl)) self.assertTrue(compression_util.get_end_activities(cl)) self.assertTrue(compression_util.get_alphabet(cl)) self.assertTrue(compression_util.discover_dfg(cl)) self.assertTrue(compression_util.get_variants(cl))
[docs] def test_compression_multivariate_log(self): import pm4py from pm4py.util.compression import util as compression_util log = pm4py.read_xes(os.path.join("input_data", "receipt.xes")) cl, lookup = compression_util.compress_multivariate(log, ["concept:name", "org:resource"]) # just verify that the set is non-empty self.assertTrue(compression_util.get_start_activities(cl)) self.assertTrue(compression_util.get_end_activities(cl)) self.assertTrue(compression_util.get_alphabet(cl)) self.assertTrue(compression_util.discover_dfg(cl)) self.assertTrue(compression_util.get_variants(cl))
[docs] def test_compression_multivariate_df(self): from pm4py.util.compression import util as compression_util dataframe = pandas_utils.read_csv(os.path.join("input_data", "receipt.csv")) dataframe = dataframe_utils.convert_timestamp_columns_in_df(dataframe, timest_format=constants.DEFAULT_TIMESTAMP_PARSE_FORMAT, timest_columns=["time:timestamp"]) cl, lookup = compression_util.compress_multivariate(dataframe, ["concept:name", "org:resource"]) # just verify that the set is non-empty self.assertTrue(compression_util.get_start_activities(cl)) self.assertTrue(compression_util.get_end_activities(cl)) self.assertTrue(compression_util.get_alphabet(cl)) self.assertTrue(compression_util.discover_dfg(cl)) self.assertTrue(compression_util.get_variants(cl))
[docs] def test_log_to_target_rem_time(self): import pm4py from pm4py.algo.transformation.log_to_target import algorithm as log_to_target log = pm4py.read_xes("input_data/running-example.xes") rem_time_target, classes = log_to_target.apply(log, variant=log_to_target.Variants.REMAINING_TIME)
[docs] def test_log_to_target_next_time(self): import pm4py from pm4py.algo.transformation.log_to_target import algorithm as log_to_target log = pm4py.read_xes("input_data/running-example.xes") next_time_target, classes = log_to_target.apply(log, variant=log_to_target.Variants.NEXT_TIME)
[docs] def test_log_to_target_next_activity(self): import pm4py from pm4py.algo.transformation.log_to_target import algorithm as log_to_target log = pm4py.read_xes("input_data/running-example.xes") next_activity_target, next_activities = log_to_target.apply(log, variant=log_to_target.Variants.NEXT_ACTIVITY)
[docs] def test_ocel_split_cc_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.split_ocel import algorithm as split_ocel res = split_ocel.apply(ocel, variant=split_ocel.Variants.CONNECTED_COMPONENTS)
[docs] def test_ocel_split_ancestors_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.split_ocel import algorithm as split_ocel res = split_ocel.apply(ocel, parameters={"object_type": "order"}, variant=split_ocel.Variants.ANCESTORS_DESCENDANTS)
[docs] def test_ocel_object_features_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.features.objects import algorithm as ocel_fea res = ocel_fea.apply(ocel)
[docs] def test_ocel_event_features_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.features.events import algorithm as ocel_fea res = ocel_fea.apply(ocel)
[docs] def test_ocel_event_object_features_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.features.events_objects import algorithm as ocel_fea res = ocel_fea.apply(ocel)
[docs] def test_ocel_interaction_graph_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.graphs import object_interaction_graph object_interaction_graph.apply(ocel)
[docs] def test_ocel_descendants_graph_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.graphs import object_descendants_graph object_descendants_graph.apply(ocel)
[docs] def test_ocel_inheritance_graph_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.graphs import object_inheritance_graph object_inheritance_graph.apply(ocel)
[docs] def test_ocel_cobirth_graph_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.graphs import object_cobirth_graph object_cobirth_graph.apply(ocel)
[docs] def test_ocel_codeath_graph_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.graphs import object_codeath_graph object_codeath_graph.apply(ocel)
[docs] def test_ocel_description_non_simpl_interface(self): import pm4py ocel = pm4py.read_ocel("input_data/ocel/example_log.jsonocel") from pm4py.algo.transformation.ocel.description.variants import variant1 variant1.apply(ocel)
if __name__ == "__main__": unittest.main()