-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmainProgram.py
More file actions
130 lines (106 loc) · 4.52 KB
/
mainProgram.py
File metadata and controls
130 lines (106 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from pcars2.server import PCarsStreamReceiver
from pcars2.extraModules import removeObjects, LiveDisplay
import datetime
from time import time
import json
class LatestData(object):
def __init__(self):
self.all = {i:[] for i in range(10)}
self.partialPacket = {}
self._TelemetryData = None
self._RaceData = None
self._ParticipantsData = None
self._TimeStatsData = None
self._GameStateData = None
self._TimingsData = None
self._VehicleClassNamesData = None
def handlePacket(self, packetData):
self.lastestPacketID = packetData.s_header.mPacketType
Toggle = True
if packetData.s_header.mPartialPacketNumber > 1 and Toggle:
self.handlePartialPacket(packetData)
else:
self.all[self.lastestPacketID].append(packetData)
match self.lastestPacketID:
case 0:
self._TelemetryData = packetData
case 1:
self._RaceData = packetData
case 2:
self._ParticipantsData = packetData
case 3:
self._TimeStatsData = packetData
case 4:
self._GameStateData = packetData
case 7:
self._TimingsData = packetData
case 8:
self._VehicleClassNamesData = packetData
case _:
print("no packet match found")
def handlePartialPacket(self, packetData):
key = (self.lastestPacketID, packetData.s_header.mCategoryPacketNumber)
if key not in self.partialPacket:
self.partialPacket[key] = {}
self.partialPacket[key][packetData.s_header.mPartialPacketIndex] = packetData
if len(self.partialPacket[key]) == packetData.s_header.mPartialPacketNumber:
self.all[self.lastestPacketID].append(self._assemblePacket(self.partialPacket[key]))
del self.partialPacket[key]
def _assemblePacket(self, partialPacketData):
match self.lastestPacketID:
case 2:
tempList1 = []
for i in partialPacketData.keys():
tempList1 += partialPacketData[i].sIndex
tempList2 = []
for i in partialPacketData.keys():
tempList2 += partialPacketData[i].sName
tempList3 = []
for i in partialPacketData.keys():
tempList3 += partialPacketData[i].sNationality
class data:
s_header = partialPacketData[1].s_header
sParticipantsChangedTimestamp = partialPacketData[1].sParticipantsChangedTimestamp
sName = tempList2
sNationality = tempList3
sIndex = tempList1
case 8:
class data:
s_header = partialPacketData[1].s_header
if len(partialPacketData) == 2:
sVehicleInfo = partialPacketData[1].sVehicleInfo
sClassInfo = partialPacketData[2].sClassInfo
else:
sVehicleInfo = partialPacketData[1].sVehicleInfo + partialPacketData[2].sVehicleInfo
sClassInfo = partialPacketData[3].sClassInfo
return data
curTime = datetime.datetime.now().strftime("%H:%M:%S")
print(f"[{curTime}] - Started...")
listener = LatestData()
# creating new thread
stream = PCarsStreamReceiver()
stream.addListener(listener)
# displayGraph = LiveDisplay(listener)
# storing all threads
activeThreads = [stream]
print("Starting server telemetry...")
# starting all threads
for thread in activeThreads:
thread.start()
# wait for user input to stop the program
while True:
userInput = input("[Stop] to stop the progam and export data to json:\n")
if userInput.lower() == "stop":
break
curTime = datetime.datetime.now().strftime("%H:%M:%S")
print(f"[{curTime}] - Stopping server telemetry...")
# stopping all active threads
for thread in activeThreads:
thread.stop()
print("Converting data...")
#with open('data.json', 'w', encoding='utf-8') as f:
with open(f'{time()}.json', 'w', encoding='utf-8') as f:
cleanDataFormat = removeObjects(listener.all)
print("Saving data...")
json.dump(cleanDataFormat, f, ensure_ascii=False, indent=4)
print("All complete")