'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from pm4py.objects.bpmn.obj import BPMN
from pm4py.util import constants, exec_utils
from enum import Enum
[docs]
class Parameters(Enum):
ENCODING = "encoding"
[docs]
class Counts:
def __init__(self):
self.number_processes = 0
[docs]
def parse_element(
bpmn_graph,
counts,
curr_el,
parents,
incoming_dict,
outgoing_dict,
nodes_dict,
nodes_bounds,
flow_info,
process=None,
node=None,
bpmn_element=None,
flow=None,
rec_depth=0,
):
"""
Parses a BPMN element from the XML file
"""
layout = bpmn_graph.get_layout()
tag = curr_el.tag.lower()
if curr_el.get("id") is not None:
if tag.endswith("collaboration"):
collaboration_id = curr_el.get("id")
collaboration = BPMN.Collaboration(
id=collaboration_id, process=collaboration_id
)
bpmn_graph.add_node(collaboration)
nodes_dict[collaboration_id] = collaboration
elif tag.endswith("participant"):
participant_id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
process_ref = curr_el.get("processRef")
participant = BPMN.Participant(
id=participant_id,
name=name,
process=None,
process_ref=process_ref,
)
bpmn_graph.add_node(participant)
nodes_dict[participant_id] = participant
elif tag.endswith("textannotation"):
annotation_id = curr_el.get("id")
text = ""
for child in curr_el:
text = child.text
annotation = BPMN.TextAnnotation(
id=annotation_id, text=text, process=process
)
bpmn_graph.add_node(annotation)
nodes_dict[annotation_id] = annotation
elif tag.endswith("subprocess"): # subprocess invocation
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
subprocess = BPMN.SubProcess(
id=curr_el.get("id"),
name=name,
process=process,
depth=rec_depth,
)
bpmn_graph.add_node(subprocess)
node = subprocess
process = curr_el.get("id")
nodes_dict[process] = node
elif tag.endswith("process"): # process of the current subtree
process = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
bpmn_graph.set_process_id(process)
bpmn_graph.set_name(name)
elif tag.endswith(
"shape"
): # shape of a node, contains x,y,width,height information
bpmn_element = curr_el.get("bpmnElement")
elif tag.endswith("task"): # simple task object
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
# this_type = str(curr_el.tag)
# this_type = this_type[this_type.index("}") + 1:]
if tag.endswith("usertask"):
task = BPMN.UserTask(id=id, name=name, process=process)
elif tag.endswith("sendtask"):
task = BPMN.SendTask(id=id, name=name, process=process)
else:
task = BPMN.Task(id=id, name=name, process=process)
bpmn_graph.add_node(task)
node = task
nodes_dict[id] = node
elif tag.endswith(
"startevent"
): # start node starting the (sub)process
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", " ").replace("\n", " ")
if "name" in curr_el.attrib
else ""
)
event_definitions = [
child.tag.lower().replace("eventdefinition", "")
for child in curr_el
if child.tag.lower().endswith("eventdefinition")
]
if len(event_definitions) > 0:
event_type = event_definitions[0]
if event_type.endswith("message"):
start_event = BPMN.MessageStartEvent(
id=curr_el.get("id"), name=name, process=process
)
else: # TODO: expand functionality, support more start event types
start_event = BPMN.NormalStartEvent(
id=curr_el.get("id"), name=name, process=process
)
else:
start_event = BPMN.NormalStartEvent(
id=curr_el.get("id"), name=name, process=process
)
bpmn_graph.add_node(start_event)
node = start_event
nodes_dict[id] = node
elif tag.endswith("endevent"): # end node ending the (sub)process
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", " ").replace("\n", " ")
if "name" in curr_el.attrib
else ""
)
event_definitions = [
child.tag.lower().replace("eventdefinition", "")
for child in curr_el
if child.tag.lower().endswith("eventdefinition")
]
if len(event_definitions) > 0:
event_type = event_definitions[0]
if event_type.endswith("message"):
end_event = BPMN.MessageEndEvent(
id=curr_el.get("id"), name=name, process=process
)
elif event_type.endswith("terminate"):
end_event = BPMN.TerminateEndEvent(
id=curr_el.get("id"), name=name, process=process
)
elif event_type.endswith("error"):
end_event = BPMN.ErrorEndEvent(
id=curr_el.get("id"), name=name, process=process
)
elif event_type.endswith("cancel"):
end_event = BPMN.CancelEndEvent(
id=curr_el.get("id"), name=name, process=process
)
else: # TODO: expand functionality, support more start event types
end_event = BPMN.NormalEndEvent(
id=curr_el.get("id"), name=name, process=process
)
else:
end_event = BPMN.NormalEndEvent(
id=curr_el.get("id"), name=name, process=process
)
bpmn_graph.add_node(end_event)
node = end_event
nodes_dict[id] = node
elif tag.endswith(
"intermediatecatchevent"
): # intermediate event that happens (externally) and can be catched
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", " ").replace("\n", " ")
if "name" in curr_el.attrib
else ""
)
event_definitions = [
child.tag.lower().replace("eventdefinition", "")
for child in curr_el
if child.tag.lower().endswith("eventdefinition")
]
if len(event_definitions) > 0:
event_type = event_definitions[0]
if event_type.endswith("message"):
intermediate_catch_event = (
BPMN.MessageIntermediateCatchEvent(
id=curr_el.get("id"), name=name, process=process
)
)
elif event_type.endswith("error"):
intermediate_catch_event = (
BPMN.ErrorIntermediateCatchEvent(
id=curr_el.get("id"), name=name, process=process
)
)
elif event_type.endswith("cancel"):
intermediate_catch_event = (
BPMN.CancelIntermediateCatchEvent(
id=curr_el.get("id"), name=name, process=process
)
)
else:
intermediate_catch_event = BPMN.IntermediateCatchEvent(
id=curr_el.get("id"), name=name, process=process
)
else:
intermediate_catch_event = BPMN.IntermediateCatchEvent(
id=curr_el.get("id"), name=name, process=process
)
bpmn_graph.add_node(intermediate_catch_event)
node = intermediate_catch_event
nodes_dict[id] = node
elif tag.endswith(
"intermediatethrowevent"
): # intermediate event that is activated through the (sub)process
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", " ").replace("\n", " ")
if "name" in curr_el.attrib
else ""
)
event_definitions = [
child.tag.lower().replace("eventdefinition", "")
for child in curr_el
if child.tag.lower().endswith("eventdefinition")
]
if len(event_definitions) > 0:
event_type = event_definitions[0]
if event_type.endswith("message"):
intermediate_throw_event = (
BPMN.MessageIntermediateThrowEvent(
id=curr_el.get("id"), name=name, process=process
)
)
else:
intermediate_throw_event = (
BPMN.NormalIntermediateThrowEvent(
id=curr_el.get("id"), name=name, process=process
)
)
else:
intermediate_throw_event = BPMN.NormalIntermediateThrowEvent(
id=curr_el.get("id"), name=name, process=process
)
bpmn_graph.add_node(intermediate_throw_event)
node = intermediate_throw_event
nodes_dict[id] = node
elif tag.endswith("boundaryevent"):
id = curr_el.get("id")
ref_activity = curr_el.get("attachedToRef")
name = (
curr_el.get("name").replace("\r", " ").replace("\n", " ")
if "name" in curr_el.attrib
else ""
)
event_definitions = [
child.tag.lower().replace("eventdefinition", "")
for child in curr_el
if child.tag.lower().endswith("eventdefinition")
]
if len(event_definitions) > 0:
event_type = event_definitions[0]
if event_type.endswith("message"):
boundary_event = BPMN.MessageBoundaryEvent(
id=curr_el.get("id"),
name=name,
process=process,
activity=ref_activity,
)
elif event_type.endswith("error"):
boundary_event = BPMN.ErrorBoundaryEvent(
id=curr_el.get("id"),
name=name,
process=process,
activity=ref_activity,
)
elif event_type.endswith("cancel"):
boundary_event = BPMN.CancelBoundaryEvent(
id=curr_el.get("id"),
name=name,
process=process,
activity=ref_activity,
)
else:
boundary_event = BPMN.BoundaryEvent(
id=curr_el.get("id"),
name=name,
process=process,
activity=ref_activity,
)
else:
boundary_event = BPMN.BoundaryEvent(
id=curr_el.get("id"),
name=name,
process=process,
activity=ref_activity,
)
bpmn_graph.add_node(boundary_event)
node = boundary_event
nodes_dict[id] = node
elif tag.endswith("edge"): # related to the x, y information of an arc
bpmnElement = curr_el.get("bpmnElement")
flow = bpmnElement
elif tag.endswith("exclusivegateway"):
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
try:
direction = BPMN.Gateway.Direction[
curr_el.get("gatewayDirection").upper()
]
exclusive_gateway = BPMN.ExclusiveGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=direction,
process=process,
)
except BaseException:
exclusive_gateway = BPMN.ExclusiveGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=BPMN.Gateway.Direction.UNSPECIFIED,
process=process,
)
bpmn_graph.add_node(exclusive_gateway)
node = exclusive_gateway
nodes_dict[id] = node
elif tag.endswith("parallelgateway"):
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
try:
direction = BPMN.Gateway.Direction[
curr_el.get("gatewayDirection").upper()
]
parallel_gateway = BPMN.ParallelGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=direction,
process=process,
)
except BaseException:
parallel_gateway = BPMN.ParallelGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=BPMN.Gateway.Direction.UNSPECIFIED,
process=process,
)
bpmn_graph.add_node(parallel_gateway)
node = parallel_gateway
nodes_dict[id] = node
elif tag.endswith("inclusivegateway"):
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
try:
direction = BPMN.Gateway.Direction[
curr_el.get("gatewayDirection").upper()
]
inclusive_gateway = BPMN.InclusiveGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=direction,
process=process,
)
except BaseException:
inclusive_gateway = BPMN.InclusiveGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=BPMN.Gateway.Direction.UNSPECIFIED,
process=process,
)
bpmn_graph.add_node(inclusive_gateway)
node = inclusive_gateway
nodes_dict[id] = node
elif tag.endswith("eventbasedgateway"):
id = curr_el.get("id")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
try:
direction = BPMN.Gateway.Direction[
curr_el.get("gatewayDirection").upper()
]
event_based_gateway = BPMN.EventBasedGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=direction,
process=process,
)
except BaseException:
event_based_gateway = BPMN.EventBasedGateway(
id=curr_el.get("id"),
name=name,
gateway_direction=BPMN.Gateway.Direction.UNSPECIFIED,
process=process,
)
bpmn_graph.add_node(event_based_gateway)
node = event_based_gateway
nodes_dict[id] = node
# FIXED: Moved the flow handling inside the id block
elif (
tag.endswith("sequenceflow")
or tag.endswith("messageflow")
or tag.endswith("association")
):
seq_flow_id = curr_el.get("id")
source_ref = curr_el.get("sourceRef")
target_ref = curr_el.get("targetRef")
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
if source_ref is not None and target_ref is not None:
incoming_dict[seq_flow_id] = (
target_ref,
process,
tag,
name,
parents,
)
outgoing_dict[seq_flow_id] = (
source_ref,
process,
tag,
name,
parents,
)
elif tag.endswith("incoming"): # incoming flow of a node
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
if node is not None:
if curr_el.text.strip() not in incoming_dict:
incoming_dict[curr_el.text.strip()] = (
node,
process,
tag,
name,
parents,
)
elif tag.endswith("outgoing"): # outgoing flow of a node
name = (
curr_el.get("name").replace("\r", "").replace("\n", "")
if "name" in curr_el.attrib
else ""
)
if node is not None:
if curr_el.text.strip() not in outgoing_dict:
outgoing_dict[curr_el.text.strip()] = (
node,
process,
tag,
name,
parents,
)
elif tag.endswith(
"waypoint"
): # contains information of x, y values of an edge
if flow is not None:
x = float(curr_el.get("x"))
y = float(curr_el.get("y"))
if flow not in flow_info:
flow_info[flow] = []
flow_info[flow].append((x, y))
elif tag.endswith(
"label"
): # label of a node, mostly at the end of a shape object
bpmn_element = None
elif tag.endswith(
"bounds"
): # contains information of width, height, x, y of a node
if bpmn_element is not None:
x = float(curr_el.get("x"))
y = float(curr_el.get("y"))
width = float(curr_el.get("width"))
height = float(curr_el.get("height"))
nodes_bounds[bpmn_element] = {
"x": x,
"y": y,
"width": width,
"height": height,
}
for child in curr_el:
bpmn_graph = parse_element(
bpmn_graph,
counts,
child,
list(parents) + [child],
incoming_dict,
outgoing_dict,
nodes_dict,
nodes_bounds,
flow_info,
process=process,
node=node,
bpmn_element=bpmn_element,
flow=flow,
rec_depth=rec_depth + 1,
)
# afterprocessing when the xml tree has been recursively parsed already
if rec_depth == 0:
# bpmn_graph.set_process_id(process)
for seq_flow_id in incoming_dict:
if incoming_dict[seq_flow_id][0] in nodes_dict:
incoming_dict[seq_flow_id] = (
nodes_dict[incoming_dict[seq_flow_id][0]],
incoming_dict[seq_flow_id][1],
incoming_dict[seq_flow_id][2],
incoming_dict[seq_flow_id][3],
incoming_dict[seq_flow_id][4],
)
for seq_flow_id in outgoing_dict:
if outgoing_dict[seq_flow_id][0] in nodes_dict:
outgoing_dict[seq_flow_id] = (
nodes_dict[outgoing_dict[seq_flow_id][0]], outgoing_dict[seq_flow_id][1], outgoing_dict[seq_flow_id][2],
outgoing_dict[seq_flow_id][3], outgoing_dict[seq_flow_id][4])
# also supports flows without waypoints
flows_without_waypoints = set(flow_info).union(
set(outgoing_dict).intersection(set(incoming_dict))
)
for flow_id in flows_without_waypoints:
flow_info[flow_id] = []
for flow_id in flow_info:
if flow_id in outgoing_dict and flow_id in incoming_dict:
flow = None
flow_type = outgoing_dict[flow_id][2]
if flow_type.endswith("messageflow"):
collaboration_id = None
for par in outgoing_dict[flow_id][4]:
par_tag = str(par.tag).lower()
if par_tag.endswith("collaboration"):
collaboration_id = par.get("id")
flow = BPMN.MessageFlow(
outgoing_dict[flow_id][0],
incoming_dict[flow_id][0],
id=flow_id,
name=outgoing_dict[flow_id][3],
process=collaboration_id,
)
elif flow_type.endswith("association"):
flow = BPMN.Association(
outgoing_dict[flow_id][0],
incoming_dict[flow_id][0],
id=flow_id,
name=outgoing_dict[flow_id][3],
process=outgoing_dict[flow_id][1],
)
else:
flow = BPMN.SequenceFlow(
outgoing_dict[flow_id][0],
incoming_dict[flow_id][0],
id=flow_id,
name=outgoing_dict[flow_id][3],
process=outgoing_dict[flow_id][1],
)
if flow is not None:
bpmn_graph.add_flow(flow)
layout.get(flow).del_waypoints()
for waypoint in flow_info[flow_id]:
layout.get(flow).add_waypoint(waypoint)
for node_id in nodes_bounds:
if node_id in nodes_dict:
bounds = nodes_bounds[node_id]
node = nodes_dict[node_id]
layout.get(node).set_x(bounds["x"])
layout.get(node).set_y(bounds["y"])
layout.get(node).set_width(bounds["width"])
layout.get(node).set_height(bounds["height"])
return bpmn_graph
[docs]
def import_xml_tree_from_root(root):
"""
Imports a BPMN graph from (the root of) an XML tree
Parameters
-------------
root
Root of the tree
Returns
-------------
bpmn_graph
BPMN graph
"""
bpmn_graph = BPMN()
counts = Counts()
incoming_dict = {}
outgoing_dict = {}
nodes_dict = {}
nodes_bounds = {}
flow_info = {}
return parse_element(
bpmn_graph,
counts,
root,
[],
incoming_dict,
outgoing_dict,
nodes_dict,
nodes_bounds,
flow_info,
)
[docs]
def apply(path, parameters=None):
"""
Imports a BPMN diagram from a file
Parameters
-------------
path
Path to the file
parameters
Parameters of the algorithm
Returns
-------------
bpmn_graph
BPMN graph
"""
if parameters is None:
parameters = {}
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, None
)
from lxml import etree, objectify
parser = etree.XMLParser(remove_comments=True, encoding=encoding)
F = open(path, "rb")
xml_tree = objectify.parse(F, parser=parser)
F.close()
return import_xml_tree_from_root(xml_tree.getroot())
[docs]
def import_from_string(bpmn_string, parameters=None):
"""
Imports a BPMN diagram from a string
Parameters
-------------
path
Path to the file
parameters
Parameters of the algorithm
Returns
-------------
bpmn_graph
BPMN graph
"""
if parameters is None:
parameters = {}
encoding = exec_utils.get_param_value(
Parameters.ENCODING, parameters, constants.DEFAULT_ENCODING
)
if type(bpmn_string) is str:
bpmn_string = bpmn_string.encode(encoding)
from lxml import etree, objectify
parser = etree.XMLParser(remove_comments=True)
root = objectify.fromstring(bpmn_string, parser=parser)
return import_xml_tree_from_root(root)