Source code for pm4py.algo.clustering.trace_attribute_driven.merge_log.merge_log

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
import os
from functools import reduce

from scipy.cluster.hierarchy import fcluster

from pm4py.algo.clustering.trace_attribute_driven.util import filter_subsets
from pm4py.objects.log.importer.xes import importer as xes_importer
from pm4py.objects.log.obj import EventLog
from pm4py.util import constants


def merge_log(path, cate, iter):
    """
    Import a grid of XES logs from a directory and merge them into one log.

    The files are expected to be named ``log_1_<i>_<j>.xes`` with ``i`` in
    ``1..cate`` and ``j`` in ``1..iter``. Every trace of file ``(i, j)`` gets
    its ``concept:name`` and ``index`` attributes overwritten with the string
    form of ``iter * (i - 1) + j``.

    Parameters
    -------------
    path
        Directory containing the XES files
    cate
        Number of values taken by the first file index ``i``
    iter
        Number of values taken by the second file index ``j``
        (the name shadows the builtin, but is kept for keyword callers)

    Returns
    -------------
    loglist
        List of the imported logs, ordered by ``i`` and then by ``j``
    mergedlog
        ``EventLog`` holding the (same) trace objects of all imported logs,
        in the order of ``loglist``
    """
    loglist = []
    for i in range(1, cate + 1):
        for j in range(1, iter + 1):
            file_name = "log_1_" + str(i) + "_" + str(j) + ".xes"
            # os.path.join picks the platform separator; the previously
            # hard-coded "\\" only produced a valid path on Windows
            log = xes_importer.apply(os.path.join(path, file_name))
            label = str(iter * (i - 1) + j)
            for trace in log:
                trace.attributes["concept:name"] = label
                trace.attributes["index"] = label
            loglist.append(log)

    mergedlog = EventLog()
    for log in loglist:
        for trace in log:
            mergedlog.append(trace)

    return loglist, mergedlog
def update_merge(loglist):
    """
    Concatenate the traces of several logs into a fresh ``EventLog``.

    Parameters
    -------------
    loglist
        List of logs (each one iterable over its traces)

    Returns
    -------------
    mergedlog
        New ``EventLog`` containing every trace of every log, in list order.
        The trace objects themselves are appended as they are, not copied.
    """
    combined = EventLog()
    for sublog in loglist:
        for trace in sublog:
            combined.append(trace)
    return combined
# Filter the log by a single attribute value
def log2sublog(log, string, KEY):
    """
    Filter a log on a single value of the attribute ``KEY``.

    The value is wrapped in a one-element list and handed to
    ``filter_subsets.apply_trace_attributes`` with ``"positive": True``
    (presumably meaning that matching traces are kept; confirm against
    ``filter_subsets``).

    Parameters
    -------------
    log
        Log to filter
    string
        Single attribute value to filter on
    KEY
        Attribute key, passed as ``PARAMETER_CONSTANT_ATTRIBUTE_KEY``

    Returns
    -------------
    tracefilter_log
        Result of ``filter_subsets.apply_trace_attributes``
    """
    filter_parameters = {
        constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY: KEY,
        "positive": True,
    }
    wanted_values = [string]
    return filter_subsets.apply_trace_attributes(
        log, wanted_values, parameters=filter_parameters
    )
# Filter the log by a list of attribute values
def logslice(log, str_list, KEY):
    """
    Filter a log on a list of values of the attribute ``KEY``.

    Delegates to ``filter_subsets.apply_trace_attributes`` with
    ``"positive": True`` (presumably meaning that matching traces are kept;
    confirm against ``filter_subsets``).

    Parameters
    -------------
    log
        Log to filter
    str_list
        List of attribute values to filter on
    KEY
        Attribute key, passed as ``PARAMETER_CONSTANT_ATTRIBUTE_KEY``

    Returns
    -------------
    tracefilter_log
        Result of ``filter_subsets.apply_trace_attributes``
    """
    options = {}
    options[constants.PARAMETER_CONSTANT_ATTRIBUTE_KEY] = KEY
    options["positive"] = True
    sliced = filter_subsets.apply_trace_attributes(
        log, str_list, parameters=options
    )
    return sliced
# Create a nested dictionary from the ClusterNode objects returned by SciPy
def add_node(node, parent):
    """
    Mirror the subtree rooted at ``node`` as nested dictionaries under
    ``parent``.

    For ``node`` and each of its descendants a dictionary
    ``{"node_id": <node.id>, "children": [...]}`` is created and appended to
    the ``"children"`` list of the dictionary of its parent node (``parent``
    itself for ``node``). The left child is always appended before the
    right child.

    The tree is walked with an explicit stack rather than recursion, so
    deep (chain-shaped) trees do not hit Python's recursion limit.

    Parameters
    -------------
    node
        Tree node exposing ``id``, ``left`` and ``right`` attributes
    parent
        Dictionary with a ``"children"`` list; it is modified in place

    Returns
    -------------
    None
    """
    pending = [(node, parent)]
    while pending:
        current, container = pending.pop()

        # First create the new node and append it to its parent's children
        new_node = dict(node_id=current.id, children=[])
        container["children"].append(new_node)

        # The stack is LIFO: push right first so that left is popped (and
        # therefore appended to new_node's children) before right
        if current.right:
            pending.append((current.right, new_node))
        if current.left:
            pending.append((current.left, new_node))
# Label each node with the names of each leaf in its subtree
def label_tree(n, id2name):
    """
    Label every node of a nested-dictionary tree with its leaf names.

    Each node (a dictionary with ``"node_id"`` and ``"children"``) is
    modified in place: its ``"node_id"`` entry is deleted and a ``"name"``
    entry is added, holding the "-"-separated, sorted string forms of the
    names of all leaves in its subtree. A leaf takes its name from
    ``id2name[node_id]``.

    Nodes are processed iteratively (children before parents) instead of
    recursively, so deep trees do not hit Python's recursion limit.

    Parameters
    -------------
    n
        Root dictionary of the (sub)tree to label
    id2name
        Mapping from leaf node id to leaf name

    Returns
    -------------
    leafNames
        Names of the leaves below ``n`` (unsorted, in child order, exactly
        as stored in ``id2name``)
    """
    # Collect the nodes so that every parent precedes its descendants
    ordered = []
    stack = [n]
    while stack:
        current = stack.pop()
        ordered.append(current)
        stack.extend(current["children"])

    # Walk the list backwards: children are then finished before parents.
    # Pending leaf lists are keyed by object identity since dicts are not
    # hashable; each list is handed over to the parent exactly once.
    leaves_of = {}
    for current in reversed(ordered):
        children = current["children"]
        if not children:
            # If the node is a leaf, then we have its name
            leaf_names = [id2name[current["node_id"]]]
        else:
            # If not, flatten all the leaves in the node's subtree
            leaf_names = []
            for child in children:
                leaf_names.extend(leaves_of.pop(id(child)))

        # Delete the node id since we don't need it anymore and
        # it makes for cleaner JSON
        del current["node_id"]

        # Labeling convention: "-"-separated leaf names
        current["name"] = "-".join(sorted(map(str, leaf_names)))
        leaves_of[id(current)] = leaf_names

    return leaves_of[id(n)]
def clusteredlog(Z, maxclust, list_of_vals, log, METHOD, ATTR_NAME):
    """
    Cut a hierarchical clustering into flat clusters and slice the log
    accordingly.

    ``fcluster`` is applied to ``Z`` with ``criterion="maxclust"``; the
    resulting labels are paired positionally with ``list_of_vals``. For each
    cluster id ``1..maxclust`` the values carrying that label are collected
    and the log is filtered on them through ``logslice``.

    Parameters
    -------------
    Z
        Linkage matrix, passed to ``scipy.cluster.hierarchy.fcluster``
    maxclust
        Threshold given to ``fcluster``; also the number of entries in
        both returned lists
    list_of_vals
        Attribute values, in the order of the observations behind ``Z``
    log
        Log to slice
    METHOD
        Not used by this function
    ATTR_NAME
        Attribute key handed to ``logslice``

    Returns
    -------------
    clu_list_log
        One filtered log per cluster id
    clu_list
        One list of attribute values per cluster id (possibly empty)
    """
    flat_labels = fcluster(Z, maxclust, criterion="maxclust")
    label_of_value = dict(zip(list_of_vals, flat_labels))

    clu_list = []
    clu_list_log = []
    # fcluster labels start at 1
    for cluster_id in range(1, maxclust + 1):
        members = [
            val for val, label in label_of_value.items() if label == cluster_id
        ]
        clu_list.append(members)
        clu_list_log.append(logslice(log, members, ATTR_NAME))

    return clu_list_log, clu_list