# Source code for pm4py.objects.ocel.exporter.jsonocel.variants.ocel20_standard

'''
    PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.

Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.objects.ocel.exporter.jsonocel.variants import ocel20
from pm4py.objects.ocel.obj import OCEL
from typing import Optional, Dict, Any
from enum import Enum
import json
from pm4py.util import exec_utils, constants as pm4_constants


class Parameters(Enum):
    """Keys accepted in the ``parameters`` dictionary of :func:`apply`."""

    # Text encoding used when opening the destination file for writing.
    ENCODING = "encoding"
def apply(
    ocel: OCEL,
    target_path: str,
    parameters: Optional[Dict[Any, Any]] = None,
):
    """
    Exports an object-centric event log (OCEL 2.0) in a JSON-OCEL 2.0
    standard file

    Parameters
    ------------------
    ocel
        Object-centric event log
    target_path
        Destination path
    parameters
        Possible parameters of the method, including:
        - Parameters.ENCODING => encoding used to write the file
          (falls back to pm4_constants.DEFAULT_ENCODING)
    """
    if parameters is None:
        parameters = {}

    encoding = exec_utils.get_param_value(
        Parameters.ENCODING, parameters, pm4_constants.DEFAULT_ENCODING
    )

    legacy_object = ocel20.get_enriched_object(ocel)

    # Type declarations: one entry per object/event type, each listing its
    # attribute names together with their declared types.
    json_object = {
        "objectTypes": [
            {
                "name": ot,
                "attributes": [
                    {"name": x, "type": y} for x, y in attrs.items()
                ],
            }
            for ot, attrs in legacy_object["ocel:objectTypes"].items()
        ],
        "eventTypes": [
            {
                "name": et,
                "attributes": [
                    {"name": x, "type": y} for x, y in attrs.items()
                ],
            }
            for et, attrs in legacy_object["ocel:eventTypes"].items()
        ],
        "objects": [],
        "events": [],
    }

    # Exported objects indexed by identifier, so that the attribute changes
    # processed further below can be attached to the right entry.
    objects_by_id = {}
    for objid, obj in legacy_object["ocel:objects"].items():
        descr = {"id": objid, "type": obj["ocel:type"]}
        if obj.get("ocel:ovmap"):
            # Values found in the object's own attribute map are written
            # with the epoch as their timestamp.
            descr["attributes"] = [
                {"name": k, "time": "1970-01-01T00:00:00Z", "value": v}
                for k, v in obj["ocel:ovmap"].items()
            ]
        if obj.get("ocel:o2o"):
            descr["relationships"] = [
                {
                    "objectId": v["ocel:oid"],
                    "qualifier": v["ocel:qualifier"],
                }
                for v in obj["ocel:o2o"]
            ]
        json_object["objects"].append(descr)
        objects_by_id[objid] = descr

    for evid, eve in legacy_object["ocel:events"].items():
        descr = {
            "id": evid,
            "type": eve["ocel:activity"],
            "time": eve["ocel:timestamp"],
        }
        if eve.get("ocel:vmap"):
            descr["attributes"] = [
                {"name": k, "value": v} for k, v in eve["ocel:vmap"].items()
            ]
        if eve.get("ocel:typedOmap"):
            descr["relationships"] = [
                {
                    "objectId": v["ocel:oid"],
                    "qualifier": v["ocel:qualifier"],
                }
                for v in eve["ocel:typedOmap"]
            ]
        json_object["events"].append(descr)

    for change in legacy_object["ocel:objectChanges"]:
        # A change referring to an object that was not exported still
        # raises KeyError here, as it did before.
        obj = objects_by_id[change["ocel:oid"]]
        field = change["ocel:field"]
        # The "attributes" list only exists when the object had a non-empty
        # attribute map; create it on demand so that objects whose
        # attributes appear only as changes do not raise KeyError.
        obj.setdefault("attributes", []).append(
            {
                "name": field,
                "time": change["ocel:timestamp"],
                "value": change[field],
            }
        )

    # The context manager closes the file even if serialization fails.
    with open(target_path, "w", encoding=encoding) as F:
        json.dump(json_object, F, indent=2)