'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from collections import Counter
import numpy as np
from pm4py.util import exec_utils
from enum import Enum
from pm4py.util import constants
from pm4py.objects.org.roles.obj import Role
from typing import List
[docs]
class Parameters(Enum):
ROLES_THRESHOLD_PARAMETER = "roles_threshold_parameter"
RESOURCE_KEY = constants.PARAMETER_CONSTANT_RESOURCE_KEY
ACTIVITY_KEY = constants.PARAMETER_CONSTANT_ACTIVITY_KEY
[docs]
def get_sum_from_dictio_values(dictio, parameters=None):
"""
Get the sum of a dictionary values
Parameters
-------------
dictio
Dictionary
parameters
Parameters of the algorithm
Returns
--------------
sum_values
Sum of the dictionary values
"""
return np.sum(list(dictio.values()))
[docs]
def normalize_role(role, parameters=None):
"""
Normalize a role
Parameters
--------------
role
Originators of the role
parameters
Parameters of the algorithm
Returns
--------------
normalized_role
Normalized multiset of originators
"""
sum_role = get_sum_from_dictio_values(role)
new_role = {}
for res in role:
new_role[res] = role[res] / float(sum_role)
return new_role
[docs]
def find_multiset_intersection(role1, role2, normalize=False, parameters=None):
"""
Finds the intersection of a multiset
Parameters
-------------
role1
First role originators
role2
Second role originators
normalize
Do the normalization of the roles
parameters
Parameters of the algorithm
Returns
--------------
intersection
Intersection of the multiset
"""
intersection = {}
if normalize:
role1 = normalize_role(role1, parameters=parameters)
role2 = normalize_role(role2, parameters=parameters)
for res in role1:
if res in role2:
intersection[res] = min(role1[res], role2[res])
return intersection
[docs]
def find_multiset_union(role1, role2, normalize=False, parameters=None):
"""
Finds the union of a multiset
Parameters
-------------
role1
First role originators
role2
Second role originators
normalize
Do the normalization of the roles
parameters
Parameters of the algorithm
Returns
--------------
union
Union of the multiset
"""
union = {}
if normalize:
role1 = normalize_role(role1, parameters=parameters)
role2 = normalize_role(role2, parameters=parameters)
for res in role1:
if res in role2:
union[res] = max(role1[res], role2[res])
else:
union[res] = role1[res]
for res in role2:
if res not in role1:
union[res] = role2[res]
return union
[docs]
def find_role_similarity(roles, i, j, parameters=None):
"""
Calculate a number of similarity between different roles
Parameters
-------------
roles
List of roles
i
Index of the first role
j
Index of the second role
parameters
Parameters of the algorithm
Returns
--------------
similarity
Similarity measure
"""
num = get_sum_from_dictio_values(
find_multiset_intersection(
roles[i][1], roles[j][1], normalize=True, parameters=parameters
),
parameters=parameters,
)
den = get_sum_from_dictio_values(
find_multiset_union(
roles[i][1], roles[j][1], normalize=True, parameters=parameters
),
parameters=parameters,
)
return num / den
[docs]
def aggregate_roles_iteration(roles, parameters=None):
"""
Single iteration of the roles aggregation algorithm
Parameters
--------------
roles
Roles
parameters
Parameters of the algorithm
Returns
--------------
agg_roles
(Partially aggregated) roles
"""
threshold = exec_utils.get_param_value(
Parameters.ROLES_THRESHOLD_PARAMETER, parameters, 0.65
)
sim = []
for i in range(len(roles)):
for j in range(i + 1, len(roles)):
sim.append(
(
i,
j,
roles[i][0],
roles[j][0],
-find_role_similarity(roles, i, j, parameters=parameters),
)
)
sim = sorted(
sim,
key=lambda x: (
x[-1],
constants.DEFAULT_VARIANT_SEP.join(x[-3]),
constants.DEFAULT_VARIANT_SEP.join(x[-2]),
),
)
found_feasible = False
if sim:
if -sim[0][-1] > threshold:
set_act1 = roles[sim[0][0]][0]
set_act2 = roles[sim[0][1]][0]
set_res1 = roles[sim[0][0]][1]
set_res2 = roles[sim[0][1]][1]
total_set_act = sorted(list(set(set_act1).union(set(set_act2))))
total_set_res = Counter(set_res1 + set_res2)
del roles[sim[0][0]]
del roles[sim[0][1] - 1]
roles.append([total_set_act, total_set_res])
roles = sorted(
roles, key=lambda x: constants.DEFAULT_VARIANT_SEP.join(x[0])
)
found_feasible = True
return roles, found_feasible
[docs]
def aggregate_roles_algorithm(roles, parameters=None):
"""
Algorithm to aggregate similar roles
Parameters
--------------
roles
Roles
parameters
Parameters of the algorithm
Returns
--------------
agg_roles
(Aggregated) roles
"""
found_feasible = True
while found_feasible:
roles, found_feasible = aggregate_roles_iteration(
roles, parameters=parameters
)
return roles
[docs]
def get_initial_roles(res_act_couples, parameters=None):
"""
Get the initial list of roles (each activity is a stand-alone role)
Parameters
-------------
res_act_couples
(resource, activity) couples along with the number of occurrences
parameters
Parameters of the algorithm
Returns
-------------
roles
List of roles (set of activities + multiset of resources)
"""
if parameters is None:
parameters = {}
roles0 = {}
for ra_couple in res_act_couples.keys():
res = ra_couple[0]
act = ra_couple[1]
if act not in roles0:
roles0[act] = Counter()
if res not in roles0[act]:
roles0[act][res] = res_act_couples[ra_couple]
roles = []
for act in roles0:
roles.append([[act], roles0[act]])
roles = sorted(
roles,
key=lambda x: (
len(x[0]),
len(x[1]),
constants.DEFAULT_VARIANT_SEP.join(sorted(x[0])),
),
reverse=True,
)
roles = aggregate_roles_algorithm(roles, parameters=parameters)
roles = sorted(
roles,
key=lambda x: (
len(x[0]),
len(x[1]),
constants.DEFAULT_VARIANT_SEP.join(sorted(x[0])),
),
reverse=True,
)
return roles
[docs]
def apply(res_act_couples, parameters=None) -> List[Role]:
"""
Apply the roles detection, introduced by
Burattin, Andrea, Alessandro Sperduti, and Marco Veluscek. "Business models enhancement through discovery of roles." 2013 IEEE Symposium on Computational Intelligence and Data Mining (CIDM). IEEE, 2013.
Parameters
-------------
res_act_couples
(resource, activity) couples along with the number of occurrences
parameters
Parameters of the algorithm
Returns
-------------
roles
List of roles (set of activities + multiset of resources)
"""
if parameters is None:
parameters = {}
roles = get_initial_roles(res_act_couples, parameters=parameters)
final_roles = []
for r in roles:
dictio = {x: int(y) for x, y in r[1].items()}
final_roles.append(Role(r[0], dictio))
return final_roles