# Source code for pm4py.algo.connectors.variants.windows_events
'''
PM4Py – A Process Mining Library for Python
Copyright (C) 2024 Process Intelligence Solutions UG (haftungsbeschränkt)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see this software project's root or
visit <https://www.gnu.org/licenses/>.
Website: https://processintelligence.solutions
Contact: info@processintelligence.solutions
'''
from typing import Optional, Dict, Any
import pandas as pd
from datetime import datetime
from pm4py.util import pandas_utils
import importlib.util
# [docs]
def apply(parameters: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
    """
    Extract a process mining dataframe from the Windows event logs, as
    exposed by the ``Win32_NTLogEvent`` WMI class of the local machine.

    CASE ID (case:concept:name) => name of the computer emitting the events.
    ACTIVITY (concept:name) => concatenation of the source name of the event and the event identifier
    (see https://learn.microsoft.com/en-us/previous-versions/windows/desktop/eventlogprov/win32-ntlogevent)
    TIMESTAMP (time:timestamp) => timestamp of generation of the event
    RESOURCE (org:resource) => username involved in the event

    Besides the four columns above, the dataframe keeps one column per
    extracted WMI property, plus the helper columns ``@@index`` and
    ``@@case_index``. Rows are sorted by case, then by timestamp, then by
    extraction order.

    Parameters
    ----------------
    parameters
        Optional dictionary of parameters; not read by this implementation.

    Returns
    ----------------
    dataframe
        Pandas dataframe
    """
    if parameters is None:
        parameters = {}

    # Imported inside the function so that importing this module does not
    # itself require the win32com package.
    import win32com.client

    print(
        ":: executing SQL query against the Windows registry. this can take time."
    )

    # "." addresses the local computer in the WMI ConnectServer call.
    computer = "."
    locator = win32com.client.Dispatch("WbemScripting.SWbemLocator")
    services = locator.ConnectServer(computer, "root\\cimv2")
    log_events = services.ExecQuery("Select * from Win32_NTLogEvent")

    # The progress bar is optional: it is only shown when tqdm is installed.
    progress = None
    if importlib.util.find_spec("tqdm"):
        from tqdm.auto import tqdm

        progress = tqdm(
            total=len(log_events), desc="extracting Windows events, progress :: "
        )

    events = []
    try:
        for log_event in log_events:
            events.append(_event_to_record(log_event))
            if progress is not None:
                progress.update()
    finally:
        # Close the bar even when reading or parsing an event fails, so a
        # half-drawn progress bar is not left behind on error.
        if progress is not None:
            progress.close()

    # NOTE(review): if the query yields no events, the column lookups below
    # presumably fail with KeyError -- confirm how instantiate_dataframe
    # behaves on an empty list before relying on that case.
    dataframe = pandas_utils.instantiate_dataframe(events)
    dataframe["case:concept:name"] = dataframe["computerName"]
    dataframe["time:timestamp"] = dataframe["timeGenerated"]
    dataframe["concept:name"] = (
        dataframe["sourceName"] + " " + dataframe["eventIdentifier"]
    )
    dataframe["org:resource"] = dataframe["user"]

    # "@@index" records extraction order and acts as a tie-breaker for
    # events sharing the same timestamp.
    dataframe = pandas_utils.insert_index(
        dataframe, "@@index", copy_dataframe=False, reset_index=False
    )
    dataframe = dataframe.sort_values(["time:timestamp", "@@index"])
    # With sort=False, case indices follow order of first appearance in the
    # time-sorted dataframe.
    dataframe["@@case_index"] = dataframe.groupby(
        "case:concept:name", sort=False
    ).ngroup()
    dataframe = dataframe.sort_values(
        ["@@case_index", "time:timestamp", "@@index"]
    )
    return dataframe


def _parse_wmi_timestamp(raw: str) -> datetime:
    """
    Parse a WMI timestamp string into a naive ``datetime``.

    Everything from the first "+" or "-" onwards (the UTC offset suffix of
    the CIM datetime format) is discarded before parsing.
    """
    without_offset = raw.split("+")[0].split("-")[0]
    return datetime.strptime(without_offset, "%Y%m%d%H%M%S.%f")


def _event_to_record(log_event: Any) -> Dict[str, Any]:
    """
    Convert one ``Win32_NTLogEvent`` COM object into a plain dictionary.

    Every property is stringified, except the two timestamps which are
    parsed into ``datetime`` objects.
    """

    def text(property_name: str) -> str:
        # str() of the COM property object is used as its textual value.
        return str(log_event.Properties_(property_name))

    return {
        "category": text("Category"),
        "categoryString": text("CategoryString"),
        "computerName": text("ComputerName"),
        "eventCode": text("EventCode"),
        "eventIdentifier": text("EventIdentifier"),
        "eventType": text("EventType"),
        "logFile": text("LogFile"),
        "message": text("Message"),
        "recordNumber": text("RecordNumber"),
        "sourceName": text("SourceName"),
        "timeGenerated": _parse_wmi_timestamp(text("TimeGenerated")),
        "timeWritten": _parse_wmi_timestamp(text("TimeWritten")),
        "type": text("Type"),
        "user": text("User"),
    }