-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathstatsd_server.py
More file actions
executable file
·114 lines (99 loc) · 4.38 KB
/
statsd_server.py
File metadata and controls
executable file
·114 lines (99 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
import socket
import csv
import datetime
import time
import threading
import sys
import re
from collections import defaultdict
class MetricsServer:
def __init__(self, host='0.0.0.0', port=8125, csv_file='metrics.csv'):
self.host = host
self.port = port
self.csv_file = csv_file
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((self.host, self.port))
self.lock = threading.Lock()
self.last_values = {}
def parse_metric(self, data):
try:
parts = data.strip().split('|')
name_value = parts[0].split(':')
if len(name_value) != 2:
return None
name, value = name_value[0], int(name_value[1])
metric_type = parts[1] if len(parts) > 1 else 'g'
pid = None
if len(parts) > 2:
for part in parts[2:]:
if part.startswith('#'):
tag_pairs = part[1:].split(',')
for tag_pair in tag_pairs:
if ':' in tag_pair:
key, val = tag_pair.split(':', 1)
if key == 'pid':
pid = val
break
return {'name': name, 'value': value, 'type': metric_type, 'pid': pid, 'clock_us': time.monotonic_ns() // 1000}
except:
return None
def write_metric(self, metric):
with self.lock:
metric_key = f"{metric['name']}_{metric['pid']}" if metric['pid'] else metric['name']
self.last_values[metric_key] = {'value': metric['value'], 'pid': metric['pid']}
with open(self.csv_file, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([
metric['clock_us'],
metric['name'],
metric['value'],
metric['type'],
metric['pid']
])
def log_status(self):
while True:
time.sleep(10)
with self.lock:
if self.last_values:
pid_metrics = defaultdict(list)
for metric_key, metric_data in self.last_values.items():
if '_' in metric_key and metric_data['pid']:
base_name = metric_key.rsplit('_', 1)[0]
pid = metric_data['pid']
pid_metrics[pid].append((base_name, metric_data['value']))
else:
pid_metrics['N/A'].append((metric_key, metric_data['value'] if isinstance(metric_data, dict) else metric_data))
print(f"\n[{datetime.datetime.now().strftime('%H:%M:%S')}]", file=sys.stderr)
print(f"{'PID':<5} {'Metric':<30} {'Value':<12}", file=sys.stderr)
print("-" * 50, file=sys.stderr)
for pid in sorted(pid_metrics.keys()):
metrics_list = pid_metrics[pid]
first_metric, first_value = metrics_list[0]
print(f"{pid:<5} {first_metric:<30} {first_value:<12}", file=sys.stderr)
for metric_name, value in metrics_list[1:]:
print(f"{'':>5} {metric_name:<30} {value:<12}", file=sys.stderr)
if pid != sorted(pid_metrics.keys())[-1]:
print("", file=sys.stderr)
def run(self):
print(f"Metrics server listening on {self.host}:{self.port}")
print(f"Writing to {self.csv_file}")
with open(self.csv_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['timestamp', 'name', 'value', 'type', 'pid'])
status_thread = threading.Thread(target=self.log_status, daemon=True)
status_thread.start()
while True:
try:
data, addr = self.sock.recvfrom(1024)
metric = self.parse_metric(data.decode('iso-8859-1'))
if metric:
self.write_metric(metric)
except KeyboardInterrupt:
break
except Exception as e:
print(f"Error: {e}")
self.sock.close()
if __name__ == '__main__':
server = MetricsServer()
server.run()