-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoper.py
More file actions
225 lines (176 loc) · 7.41 KB
/
oper.py
File metadata and controls
225 lines (176 loc) · 7.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import struct
from datetime import datetime, timedelta
import re
import codecs
import pyesedb
import pandas as pd
def string_to_ole_timestamp(dt):
    """Convert an HTML ``datetime-local`` string to an OLE timestamp integer.

    An OLE timestamp is the number of days since 1899-12-30, with the time of
    day carried in the fractional part. That value is packed as a
    little-endian double and the resulting 8 bytes are reinterpreted as an
    unsigned little-endian integer.

    Args:
        dt: Local date-time text such as ``"2024-09-12T18:58"``. Values that
            carry seconds (``"2024-09-12T18:58:30"``) or fractional seconds
            (``"2024-09-12T18:58:30.250"``) are accepted as well.

    Returns:
        The integer whose 8 little-endian bytes are the packed double.

    Raises:
        ValueError: If ``dt`` matches none of the supported layouts.
    """
    # OLE timestamp base date: December 30, 1899
    ole_base_date = datetime(1899, 12, 30)

    # The minute-precision layout is tried first, so inputs that were accepted
    # before are parsed exactly as before.
    layouts = (
        "%Y-%m-%dT%H:%M",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    )
    for layout in layouts:
        try:
            parsed = datetime.strptime(dt, layout)
        except ValueError:
            continue
        break
    else:
        raise ValueError(
            f"time data {dt!r} is not a supported datetime-local value"
        )

    # Whole days plus the elapsed part of the last day as a fraction.
    delta = parsed - ole_base_date
    ole_time = delta.days + (delta.seconds + delta.microseconds / 1_000_000) / 86400

    # Pack as a little-endian double, then read the same bytes back as an int.
    return int.from_bytes(struct.pack("<d", ole_time), byteorder='little')
# Usage example
# # 2024-09-12T18:58
# dt = datetime(2024, 9, 10, 14, 15, 0)
# binblob = "2024-09-12T18:58"
# binblob=string_to_ole_timestamp(binblob)
# print(binblob)
def blob_to_string(chrblob):
    """Best-effort conversion of a binary blob to a readable string.

    The input is first tried as hex text; if that fails it is used as-is.
    The resulting bytes are then decoded as:

    * UTF-16 LE, when they look like ``<char>\\x00`` pairs followed by a
      two-byte NUL terminator;
    * UTF-16 BE, when they look like ``\\x00<char>`` pairs followed by a
      two-byte NUL terminator;
    * Latin-1 otherwise.

    Leading and trailing NUL characters are stripped from the result.

    Args:
        chrblob: ``bytes`` (raw data or hex digits), a ``str`` of hex digits,
            or ``None``.

    Returns:
        The decoded text. Empty or ``None`` input gives ``""``. A ``str``
        that is not valid hex is returned as text with NULs stripped.
    """
    if not chrblob:
        return ""

    try:
        chrblob = codecs.decode(chrblob, "hex")
    except (TypeError, ValueError):
        # Not hex text (binascii.Error is a ValueError): keep the raw value.
        # NOTE(review): raw data that happens to consist only of hex digits
        # is still interpreted as hex, as it always was.
        pass

    if isinstance(chrblob, str):
        # Already text and not hex: there is nothing left to decode.
        return chrblob.strip("\x00")

    try:
        if re.match(rb'^(?:[^\x00]\x00)+\x00\x00$', chrblob):
            return chrblob.decode("utf-16-le").strip("\x00")
        if re.match(rb'^(?:\x00[^\x00])+\x00\x00$', chrblob):
            return chrblob.decode("utf-16-be").strip("\x00")
        return chrblob.decode("latin1").strip("\x00")
    except (TypeError, AttributeError, UnicodeDecodeError):
        # Last resort for bytes-like objects without .decode(), or for a
        # UTF-16 guess that turned out to be wrong.
        return codecs.decode(chrblob, "latin-1")
def summarize_app_usage_from_data(stats_data, mapping_data, start, end):
    """Summarize per-application traffic inside a timestamp window.

    Args:
        stats_data: Iterable of dicts with the keys ``TimeStamp``, ``AppId``,
            ``BytesSent``, ``BytesRecvd`` and ``Total``.
        mapping_data: Iterable of dicts with at least ``IdIndex`` and
            ``IdBlob``; ``IdIndex`` is joined against ``AppId``.
        start: Lower bound for ``TimeStamp`` (inclusive).
        end: Upper bound for ``TimeStamp`` (inclusive).

    Returns:
        A ``(grand_total, rows)`` tuple. ``rows`` is a list of dicts with the
        keys ``AppId``, ``IdBlob``, ``BytesSent_sum``, ``BytesRecvd_sum`` and
        ``Total_sum``, sorted by ``Total_sum`` descending. ``grand_total`` is
        the sum of all ``Total_sum`` values. Empty or ``None`` input on either
        side yields ``(0, [])``.
    """
    # Convert input lists of dicts to DataFrames
    df_stats = pd.DataFrame(stats_data)
    df_map = pd.DataFrame(mapping_data)

    # With no rows there are no columns to filter or join on, so the
    # selections below would raise KeyError.
    if df_stats.empty or df_map.empty:
        return 0, []

    # Filter by TimeStamp range (both bounds inclusive)
    in_window = (df_stats['TimeStamp'] >= start) & (df_stats['TimeStamp'] <= end)
    df_stats_filtered = df_stats[in_window]

    # Inner join: stats rows without a mapping entry are dropped.
    merged_df = pd.merge(
        df_stats_filtered,
        df_map,
        left_on='AppId',
        right_on='IdIndex',
        how='inner',
    )

    # Group by AppId and IdBlob, and aggregate
    summary_df = merged_df.groupby(['AppId', 'IdBlob']).agg(
        BytesSent_sum=pd.NamedAgg(column='BytesSent', aggfunc='sum'),
        BytesRecvd_sum=pd.NamedAgg(column='BytesRecvd', aggfunc='sum'),
        Total_sum=pd.NamedAgg(column='Total', aggfunc='sum'),
    ).reset_index()

    # Sort by total descending
    summary_df = summary_df.sort_values(by='Total_sum', ascending=False)

    # Return total and summary rows (list of dicts for rendering)
    return summary_df['Total_sum'].sum(), summary_df.to_dict(orient='records')
def genpoints(start_date, end_date, duration):
    """Split a time period into boundary points ``duration`` days apart.

    The first point is ``start_date`` and the last is ``end_date``, both at
    minute precision. Intermediate points fall on midnight: the first one is
    midnight of the day ``start_date + duration`` days, and each following
    one is ``duration`` days later. Every point is converted with
    ``string_to_ole_timestamp``.

    Args:
        start_date: ``datetime`` marking the start of the period.
        end_date: ``datetime`` marking the end of the period.
        duration: Step between intermediate points, in days. Must be positive.

    Returns:
        List of OLE timestamp integers. If ``start_date`` is not before
        ``end_date`` the list holds only the converted ``start_date``.

    Raises:
        ValueError: If ``duration`` is zero or negative, since the walk
            towards ``end_date`` would never terminate.
    """
    if duration <= 0:
        raise ValueError("duration must be a positive number of days")

    step = timedelta(days=duration)
    points = [start_date]

    # Snap the first intermediate point to midnight.
    current_date = (start_date + step).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    while current_date < end_date:
        points.append(current_date)
        current_date += step

    if points[-1] < end_date:
        points.append(end_date)

    # Convert after the list is complete so that a single-point list is not
    # converted twice through points[0] and points[-1].
    last_index = len(points) - 1
    converted = []
    for index, point in enumerate(points):
        if 0 < index < last_index:
            # Intermediate points are always reported at midnight.
            text = point.strftime("%Y-%m-%d") + "T00:00"
        else:
            text = point.strftime("%Y-%m-%dT%H:%M")
        converted.append(string_to_ole_timestamp(text))
    return converted
# start_date=datetime.strptime('2024-01-12T18:58','%Y-%m-%dT%H:%M')
# end_date=datetime.strptime('2024-09-30T18:59','%Y-%m-%dT%H:%M')
# duration=10
# points=genpoints(start_date,end_date,duration)
# for i in range(0,len(points)):
# print(points[i])
def genGraph(start_date, end_date, duration, app_id, stats_data):
    """Build the data series for one application's usage graph.

    The period from ``start_date`` to ``end_date`` is cut into buckets by
    ``genpoints``, and the ``Total`` values of the rows belonging to
    ``app_id`` are summed per bucket. Buckets are half-open: a row counts
    towards a bucket when ``lower <= TimeStamp < upper``, so a row stamped
    exactly at the final boundary is not counted.

    Args:
        start_date: Period start as ``"YYYY-MM-DDTHH:MM"`` text.
        end_date: Period end as ``"YYYY-MM-DDTHH:MM"`` text.
        duration: Bucket width in days, passed through to ``genpoints``.
        app_id: Value matched against the ``AppId`` field.
        stats_data: Iterable of dicts with at least ``TimeStamp``, ``AppId``
            and ``Total``.

    Returns:
        ``(labels, sums)``: ``labels`` holds the upper boundary of each bucket
        as formatted by ``int_to_timestamp``; ``sums`` holds the matching
        integer totals. With empty ``stats_data`` every total is ``0``.
    """
    # Convert input date strings to datetime
    period_start = datetime.strptime(start_date, '%Y-%m-%dT%H:%M')
    period_end = datetime.strptime(end_date, '%Y-%m-%dT%H:%M')

    # Generate list of timestamp points (bucket boundaries)
    timestamps = genpoints(period_start, period_end, duration)

    # Each bucket is labelled with its upper boundary, so the first point
    # is skipped.
    labels = [int_to_timestamp(point) for point in timestamps[1:]]

    df = pd.DataFrame(stats_data)
    if df.empty:
        # No rows means no columns either; the selections below would raise
        # KeyError.
        return labels, [0] * len(labels)

    # Ensure TimeStamp column is numeric (unparseable values become NaN and
    # therefore match no bucket)
    df['TimeStamp'] = pd.to_numeric(df['TimeStamp'], errors='coerce')

    # Filter rows where AppId matches
    app_rows = df[df['AppId'] == app_id]

    # Calculate Total sums for each time range
    sums_between_timestamps = []
    for lower, upper in zip(timestamps, timestamps[1:]):
        in_bucket = (app_rows['TimeStamp'] >= lower) & (app_rows['TimeStamp'] < upper)
        sums_between_timestamps.append(int(app_rows[in_bucket]['Total'].sum()))

    return labels, sums_between_timestamps
def int_to_timestamp(ole_int):
    """Turn an OLE timestamp integer into a ``YYYY-MM-DD`` date string.

    ``ole_int`` is read as 8 little-endian bytes holding a double: the number
    of days since 1899-12-30. Only the date part is returned; the time of day
    is dropped.
    """
    raw_bytes = ole_int.to_bytes(8, byteorder='little')
    (days_since_base,) = struct.unpack("<d", raw_bytes)
    moment = datetime(1899, 12, 30) + timedelta(days=days_since_base)
    # Switch the format to "%Y-%m-%dT%H:%M" if the time of day is needed.
    return moment.strftime("%Y-%m-%d")
# Example usage:
# ole_timestamp = 4676488072672575488
# print(int_to_timestamp(ole_timestamp))
def Sruconvert_table_to_data(ese_file, table_name):
    """Read every record of one ESE table into a list of dicts.

    Each dict is keyed by the table's column names. The column at index 2 is
    read as raw data and passed through ``blob_to_string``; every other
    column is read as an integer.
    NOTE(review): index 2 is presumably the blob column of the ID mapping
    table (``IdBlob``) -- confirm against the schema.

    Args:
        ese_file: Path of the ESE database to open.
        table_name: Name of the table to extract.

    Returns:
        A list with one dict per record, or ``None`` when the database has
        no table called ``table_name``.
    """
    # Open the ESE database
    ese = pyesedb.file()
    ese.open(ese_file)
    try:
        # Find the target table
        table = next((t for t in ese.tables if t.name == table_name), None)
        if table is None:
            return None

        # Extract column headers
        headers = [col.name for col in table.columns]

        # Extract each record
        data = []
        for record in table.records:
            row = {}
            for column_index, column_name in enumerate(headers):
                if column_index == 2:
                    row[column_name] = blob_to_string(
                        record.get_value_data(column_index)
                    )
                else:
                    row[column_name] = record.get_value_data_as_integer(
                        column_index
                    )
            data.append(row)
        return data
    finally:
        # Release the database even if reading a record raised.
        ese.close()
def convert_table_to_data(ese_file, table_name):
    """Extract the traffic counters of one ESE table as a list of dicts.

    For every record, columns 1, 2, 7 and 8 are read as integers and stored
    as ``TimeStamp``, ``AppId``, ``BytesSent`` and ``BytesRecvd``. ``Total``
    is the sum of the two byte counters.
    NOTE(review): the column positions are hard-coded -- confirm that they
    match the schema of the table being read.

    Args:
        ese_file: Path of the ESE database to open.
        table_name: Name of the table to extract.

    Returns:
        A list with one dict per record, or ``[]`` when the database has no
        table called ``table_name``.
    """
    ese = pyesedb.file()
    ese.open(ese_file)
    try:
        table = next((t for t in ese.tables if t.name == table_name), None)
        if table is None:
            return []

        data = []
        for record in table.records:
            bytes_sent = record.get_value_data_as_integer(7)
            bytes_recvd = record.get_value_data_as_integer(8)
            data.append({
                'TimeStamp': record.get_value_data_as_integer(1),
                'AppId': record.get_value_data_as_integer(2),
                'BytesSent': bytes_sent,
                'BytesRecvd': bytes_recvd,
                'Total': bytes_sent + bytes_recvd,
            })
        return data
    finally:
        # Release the database even if reading a record raised.
        ese.close()