Source code for pm4py.algo.querying.llm.abstractions.ocel_ocdfg_descr

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''

from pm4py.objects.ocel.obj import OCEL
from typing import Optional, Dict, Any
from pm4py.algo.discovery.ocel.ocdfg import algorithm as ocdfg_disc
import numpy as np
from enum import Enum
from pm4py.util import exec_utils, constants


class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of :func:`apply`."""

    # Character budget for the rendered edge descriptions; ``apply`` falls
    # back to ``constants.OPENAI_MAX_LEN`` when the key is absent.
    MAX_LEN = "max_len"
    # Whether the introductory sentence is emitted (``apply`` defaults to True).
    INCLUDE_HEADER = "include_header"
    # Whether every edge line also reports a duration (``apply`` defaults to
    # True).
    INCLUDE_PERFORMANCE = "include_performance"
def __get_descr(curr, include_performance):
    """
    Renders a single OC-DFG edge record as one line of text.

    Parameters
    ----------------
    curr
        Indexable edge record. Index 1 holds the pair of activities
        connected by the edge, index 2 the value printed as
        "number of events", index 3 the value printed as
        "number of objects" and index 5 the value printed as the duration.
    include_performance
        If true, the duration is appended to the description.

    Returns
    ----------------
    stru
        Description of the edge, terminated by a newline.
    """
    source_activity = curr[1][0]
    target_activity = curr[1][1]

    pieces = [
        ' "%s" -> "%s" (frequency (number of events) = %d, frequency (number of objects) = %d'
        % (source_activity, target_activity, curr[2], curr[3])
    ]
    if include_performance:
        pieces.append(", duration = %.2f" % curr[5])
    # The parenthesis opened in the first piece is closed whether or not the
    # duration was added.
    pieces.append(")\n")

    return "".join(pieces)
def apply(ocel: OCEL, parameters: Optional[Dict[Any, Any]] = None) -> str:
    """
    Builds a textual description of the object-centric directly-follows
    graph (OC-DFG) discovered from an object-centric event log.

    The edges are ranked in descending order of (number of event couples,
    average performance over the event couples, object type, edge) and are
    then listed grouped by object type.

    Parameters
    ----------------
    ocel
        Object-centric event log
    parameters
        Optional dictionary of parameters. It is forwarded unchanged to the
        OC-DFG discovery algorithm; the keys read here are:
        - Parameters.MAX_LEN => character budget for the edge descriptions
          (default: constants.OPENAI_MAX_LEN). The budget is checked before
          each edge is added, so the edge that crosses it is still included;
          the header and the object type titles are not counted.
        - Parameters.INCLUDE_HEADER => includes the introductory sentence
          (default: True)
        - Parameters.INCLUDE_PERFORMANCE => includes the duration of each
          edge (default: True)

    Returns
    ----------------
    stru
        Textual abstraction of the OC-DFG
    """
    if parameters is None:
        parameters = {}

    max_len = exec_utils.get_param_value(
        Parameters.MAX_LEN, parameters, constants.OPENAI_MAX_LEN
    )
    include_header = exec_utils.get_param_value(
        Parameters.INCLUDE_HEADER, parameters, True
    )
    include_performance = exec_utils.get_param_value(
        Parameters.INCLUDE_PERFORMANCE, parameters, True
    )

    ocdfg = ocdfg_disc.apply(ocel, parameters=parameters)
    edges = ocdfg["edges"]
    edges_performance = ocdfg["edges_performance"]

    # One record per (object type, edge). Entries 4 and 6 are not read by
    # __get_descr but are kept so the record layout stays unchanged.
    edges_values = []
    for ot in sorted(edges["total_objects"]):
        for e in edges["event_couples"][ot]:
            edges_values.append(
                [
                    ot,
                    e,
                    len(edges["event_couples"][ot][e]),
                    len(edges["unique_objects"][ot][e]),
                    len(edges["total_objects"][ot][e]),
                    float(np.average(edges_performance["event_couples"][ot][e])),
                    float(np.average(edges_performance["total_objects"][ot][e])),
                ]
            )

    edges_values.sort(key=lambda x: (x[2], x[5], x[0], x[1]), reverse=True)

    # Keep the top-ranked edges until the character budget is reached. Each
    # description is rendered once and reused when composing the output.
    selected = []
    curr_len = 0
    for edg in edges_values:
        if curr_len >= max_len:
            break
        descr = __get_descr(edg, include_performance)
        curr_len += len(descr)
        selected.append((edg[0], descr))

    # Group the kept descriptions by object type; object types appear in the
    # order of their best-ranked edge.
    ot_edges = {}
    for ot, descr in selected:
        ot_edges.setdefault(ot, []).append(descr)

    ret = ["\n"]
    if include_header:
        ret.append(
            "If I have an object-centric event log with the following directly follows graph (split between the different object types):\n"
        )
    for ot, descriptions in ot_edges.items():
        ret.append("\nObject type: %s\n" % (ot))
        ret.extend(descriptions)
    ret.append("\n")

    return "".join(ret)