From 476924acdad5183dba7bce0e1701d054144d6da2 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Mon, 24 Nov 2014 15:00:11 -0200 Subject: [PATCH 1/7] added support for custom backends --- pystatsd/backends/__init__.py | 3 + pystatsd/backends/console.py | 19 ++++ pystatsd/backends/ganglia.py | 44 +++++++++ pystatsd/backends/graphite.py | 57 ++++++++++++ pystatsd/server.py | 165 ++++++---------------------------- statsd_test.py | 3 +- 6 files changed, 153 insertions(+), 138 deletions(-) create mode 100644 pystatsd/backends/__init__.py create mode 100644 pystatsd/backends/console.py create mode 100644 pystatsd/backends/ganglia.py create mode 100644 pystatsd/backends/graphite.py diff --git a/pystatsd/backends/__init__.py b/pystatsd/backends/__init__.py new file mode 100644 index 0000000..1c21a1f --- /dev/null +++ b/pystatsd/backends/__init__.py @@ -0,0 +1,3 @@ +from .ganglia import Ganglia +from .graphite import Graphite +from .console import Console diff --git a/pystatsd/backends/console.py b/pystatsd/backends/console.py new file mode 100644 index 0000000..9787d29 --- /dev/null +++ b/pystatsd/backends/console.py @@ -0,0 +1,19 @@ +import logging + +log = logging.getLogger(__name__) + +class Console(object): + def init(self, options): + self.debug = options.get('debug') + self.flush_interval = options.get('flush_interval') + + def flush(self, timestamp, metrics): + for k, v in metrics['counters'].items(): + print("%s => count=%s" % (k, v)) + + for k, v in metrics['gauges'].items(): + print("%s => value=%s" % (k, v)) + + for k, v in metrics['timers'].items(): + print("%s => lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" + % (k, v['min'], v['mean'], v['max'], v['pct_threshold'], v['max_threshold'], v['count'])) diff --git a/pystatsd/backends/ganglia.py b/pystatsd/backends/ganglia.py new file mode 100644 index 0000000..a01de58 --- /dev/null +++ b/pystatsd/backends/ganglia.py @@ -0,0 +1,44 @@ +import logging + +from .. import gmetric + +log = logging.getLogger(__name__) + +class Ganglia(object): + def init(self, options): + self.ganglia_host = options.get('ganglia_host', 'localhost') + self.ganglia_port = options.get('ganglia_port', 8649) + self.ganglia_protocol = options.get('ganglia_protocol', 'udp') + self.ganglia_spoof_host = options.get('ganglia_spoof_host', 'statsd:statsd') + self.dmax = int(self.flush_interval * 1.2) + self.debug = options.get('debug') + self.flush_interval = options.get('flush_interval') + + def flush(self, timestamp, metrics): + g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) + + for k, v in metrics['counters'].items(): + # We put counters in _counters group. Underscore is to make sure counters show up + # first in the GUI. Change below if you disagree + g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host) + + for k, v in metrics['gauges'].items(): + g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host) + + for k, v in metrics['timers'].items(): + # We are gonna convert all times into seconds, then let rrdtool + # add proper SI unit. This avoids things like 3521 k ms which + # is 3.521 seconds. What group should these metrics be in. For the + # time being we'll set it to the name of the key + group = k + g.send(k + "_min", min / 1000, "double", "seconds", "both", 60, + self.dmax, group, self.ganglia_spoof_host) + g.send(k + "_mean", mean / 1000, "double", "seconds", "both", 60, + self.dmax, group, self.ganglia_spoof_host) + g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, + self.dmax, group, self.ganglia_spoof_host) + g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, + group, self.ganglia_spoof_host) + g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, + "double", "seconds", "both", 60, self.dmax, group, + self.ganglia_spoof_host) diff --git a/pystatsd/backends/graphite.py b/pystatsd/backends/graphite.py new file mode 100644 index 0000000..1aca11c --- /dev/null +++ b/pystatsd/backends/graphite.py @@ -0,0 +1,57 @@ +import socket +import logging + + +TIMER_MSG = '''%(prefix)s.%(key)s.lower %(min)s %(ts)s +%(prefix)s.%(key)s.count %(count)s %(ts)s +%(prefix)s.%(key)s.mean %(mean)s %(ts)s +%(prefix)s.%(key)s.upper %(max)s %(ts)s +%(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s +''' + +log = logging.getLogger(__name__) + +class Graphite(object): + def init(self, options): + self.graphite_host = options.get('graphite_host', 'localhost') + self.graphite_port = options.get('graphite_port', 2003) + self.counters_prefix = options.get('counters_prefix', 'stats') + self.timers_prefix = options.get('timers_prefix', 'stats.timers') + self.global_prefix = options.get('global_prefix', None) + self.debug = options.get('debug') + self.flush_interval = options.get('flush_interval') + + def flush(self, timestamp, metrics): + stat_string = '' + + for k, v in metrics['counters'].items(): + msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp) + stat_string += msg + + for k, v in metrics['gauges'].items(): + msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp) + stat_string += msg + + for k, v in metrics['timers'].items(): + v.update({'prefix': self.timers_prefix, 'key': k, 'ts': timestamp}) + stat_string += TIMER_MSG % v + + stat_string += "statsd.numStats %s %d\n" % (stats, timestamp) + self._send_metrics(stat_string) + + def _send_metrics(self, stat_string): + # Prepend stats with Hosted Graphite API key if necessary + if self.global_prefix: + stat_string = '\n'.join([ + '%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1] + ]) + + graphite = socket.socket() + try: + graphite.connect((self.graphite_host, self.graphite_port)) + graphite.sendall(bytes(bytearray(stat_string, "utf-8"))) + graphite.close() + except socket.error as e: + log.error("Error communicating with Graphite: %s" % e) + if self.debug: + print("Error communicating with Graphite: %s" % e) diff --git a/pystatsd/server.py b/pystatsd/server.py index 40118c8..1cc41af 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -32,65 +32,34 @@ def _clean_key(k): k.replace('/', '-').replace(' ', '_') ) ) - - - -TIMER_MSG = '''%(prefix)s.%(key)s.lower %(min)s %(ts)s -%(prefix)s.%(key)s.count %(count)s %(ts)s -%(prefix)s.%(key)s.mean %(mean)s %(ts)s -%(prefix)s.%(key)s.upper %(max)s %(ts)s -%(prefix)s.%(key)s.upper_%(pct_threshold)s %(max_threshold)s %(ts)s -''' - + class Server(object): def __init__(self, pct_threshold=90, debug=False, transport='graphite', - ganglia_host='localhost', ganglia_port=8649, - ganglia_spoof_host='statsd:statsd', - gmetric_exec='/usr/bin/gmetric', gmetric_options = '-d', - graphite_host='localhost', graphite_port=2003, global_prefix=None, - flush_interval=10000, - no_aggregate_counters=False, counters_prefix='stats', - timers_prefix='stats.timers', expire=0): + flush_interval=10000, expire=0, no_aggregate_counters=False, + deleteGauges=False, backends=[], options={}): self.buf = 8192 self.flush_interval = flush_interval self.pct_threshold = pct_threshold self.transport = transport - # Embedded Ganglia library options specific settings - self.ganglia_host = ganglia_host - self.ganglia_port = ganglia_port - self.ganglia_protocol = "udp" - # Use gmetric - self.gmetric_exec = gmetric_exec - self.gmetric_options = gmetric_options - # Set DMAX to flush interval plus 20%. That should avoid metrics to prematurely expire if there is - # some type of a delay when flushing - self.dmax = int(self.flush_interval * 1.2) - # What hostname should these metrics be attached to. - self.ganglia_spoof_host = ganglia_spoof_host - - # Graphite specific settings - self.graphite_host = graphite_host - self.graphite_port = graphite_port + self.no_aggregate_counters = no_aggregate_counters - self.counters_prefix = counters_prefix - self.timers_prefix = timers_prefix self.debug = debug self.expire = expire - # For services like Hosted Graphite, etc. - self.global_prefix = global_prefix - + self.backends = backends + self.deleteGauges = deleteGauges + + options.update({'debug': debug, 'flush_interval': flush_interval}) + for backend in backends: + backend.init(options) + self.counters = {} self.timers = {} self.gauges = {} self.flusher = 0 - def send_to_ganglia_using_gmetric(self,k,v,group, units): - call([self.gmetric_exec, self.gmetric_options, "-u", units, "-g", group, "-t", "double", "-n", k, "-v", str(v) ]) - - def process(self, data): # the data is a sequence of newline-delimited metrics # a metric is in the form "name:value|rest" (rest may have more pipes) @@ -144,18 +113,15 @@ def on_timer(self): try: self.flush() except Exception as e: + print e log.exception('Error while flushing: %s', e) self._set_timer() def flush(self): ts = int(time.time()) stats = 0 - - if self.transport == 'graphite': - stat_string = '' - elif self.transport == 'ganglia': - g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) - + metrics = {'counters': {}, 'gauges': {}, 'timers': {}} + for k, (v, t) in self.counters.items(): if self.expire > 0 and t + self.expire < ts: if self.debug: @@ -164,46 +130,23 @@ def flush(self): continue v = float(v) v = v if self.no_aggregate_counters else v / (self.flush_interval / 1000) - - if self.debug: - print("Sending %s => count=%s" % (k, v)) - - if self.transport == 'graphite': - msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) - stat_string += msg - elif self.transport == 'ganglia': - # We put counters in _counters group. Underscore is to make sure counters show up - # first in the GUI. Change below if you disagree - g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': - self.send_to_ganglia_using_gmetric(k,v, "_counters", "count") + metrics['counters'][k] = v # Clear the counter once the data is sent del(self.counters[k]) stats += 1 - + for k, (v, t) in self.gauges.items(): if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring gauge %s (age: %s)" % (k, ts - t)) del(self.gauges[k]) continue - v = float(v) - - if self.debug: - print("Sending %s => value=%s" % (k, v)) - - if self.transport == 'graphite': - # note: counters and gauges implicitly end up in the same namespace - msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, ts) - stat_string += msg - elif self.transport == 'ganglia': - g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': - self.send_to_ganglia_using_gmetric(k,v, "_gauges", "gauge") - + metrics['gauges'][k] = float(v) + if self.deleteGauges: + del(self.gauges[k]) stats += 1 - + for k, (v, t) in self.timers.items(): if self.expire > 0 and t + self.expire < ts: if self.debug: @@ -226,68 +169,16 @@ def flush(self): total = sum(v) mean = total / count + metrics['timers'][k] = { + 'mean': mean, 'max': max, 'min': min, 'count': count, + 'max_threshold': max_threshold, 'pct_threshold': pct_threshold + } del(self.timers[k]) - - if self.debug: - print("Sending %s ====> lower=%s, mean=%s, upper=%s, %dpct=%s, count=%s" \ - % (k, min, mean, max, self.pct_threshold, max_threshold, count)) - - if self.transport == 'graphite': - - stat_string += TIMER_MSG % { - 'prefix': self.timers_prefix, - 'key': k, - 'mean': mean, - 'max': max, - 'min': min, - 'count': count, - 'max_threshold': max_threshold, - 'pct_threshold': self.pct_threshold, - 'ts': ts, - } - - elif self.transport == 'ganglia': - # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like - # 3521 k ms which is 3.521 seconds - # What group should these metrics be in. For the time being we'll set it to the name of the key - group = k - g.send(k + "_min", min / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_mean", mean / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, "double", "seconds", "both", 60, self.dmax, group, self.ganglia_spoof_host) - elif self.transport == 'ganglia-gmetric': - # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like - # 3521 k ms which is 3.521 seconds - group = k - self.send_to_ganglia_using_gmetric(k + "_mean", mean / 1000, group, "seconds") - self.send_to_ganglia_using_gmetric(k + "_min", min / 1000 , group, "seconds") - self.send_to_ganglia_using_gmetric(k + "_max", max / 1000, group, "seconds") - self.send_to_ganglia_using_gmetric(k + "_count", count , group, "count") - self.send_to_ganglia_using_gmetric(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, group, "seconds") - stats += 1 - - if self.transport == 'graphite': - - stat_string += "statsd.numStats %s %d\n" % (stats, ts) - - # Prepend stats with Hosted Graphite API key if necessary - if self.global_prefix: - stat_string = '\n'.join([ - '%s.%s' % (self.global_prefix, s) for s in stat_string.split('\n')[:-1] - ]) - - graphite = socket.socket() - try: - graphite.connect((self.graphite_host, self.graphite_port)) - graphite.sendall(bytes(bytearray(stat_string, "utf-8"))) - graphite.close() - except socket.error as e: - log.error("Error communicating with Graphite: %s" % e) - if self.debug: - print("Error communicating with Graphite: %s" % e) - + + for backend in self.backends: + backend.flush(ts, metrics) + if self.debug: print("\n================== Flush completed. Waiting until next flush. Sent out %d metrics =======" \ % (stats)) diff --git a/statsd_test.py b/statsd_test.py index 12bb37c..d9355e7 100644 --- a/statsd_test.py +++ b/statsd_test.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from pystatsd import Client, Server +from pystatsd.backends import Console sc = Client('localhost', 8125) @@ -9,5 +10,5 @@ sc.decrement('python_test.decr_int') sc.gauge('python_test.gauge', 42) -srvr = Server(debug=True) +srvr = Server(debug=True, flush_interval=2000, deleteGauges=True, backends=[Console()]) srvr.serve() From f8b41720caecd40ac2faaeefb8f8dd6b6ffb5d26 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Mon, 1 Dec 2014 11:12:20 -0200 Subject: [PATCH 2/7] removed print statement --- pystatsd/server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pystatsd/server.py b/pystatsd/server.py index 1cc41af..21c5e86 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -113,7 +113,6 @@ def on_timer(self): try: self.flush() except Exception as e: - print e log.exception('Error while flushing: %s', e) self._set_timer() From 4a746169577bdc3183e62de5a0d1ac0d89a6b016 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Sun, 28 Dec 2014 22:23:18 -0200 Subject: [PATCH 3/7] Included tests for the server --- pystatsd/server.py | 13 ++--- tests/server.py | 131 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 130 insertions(+), 14 deletions(-) diff --git a/pystatsd/server.py b/pystatsd/server.py index 21c5e86..86f000e 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -36,13 +36,12 @@ def _clean_key(k): class Server(object): - def __init__(self, pct_threshold=90, debug=False, transport='graphite', - flush_interval=10000, expire=0, no_aggregate_counters=False, - deleteGauges=False, backends=[], options={}): + def __init__(self, pct_threshold=90, debug=False, flush_interval=10000, + expire=0, no_aggregate_counters=False, deleteGauges=False, + backends=[], options={}): self.buf = 8192 self.flush_interval = flush_interval self.pct_threshold = pct_threshold - self.transport = transport self.no_aggregate_counters = no_aggregate_counters self.debug = debug @@ -52,6 +51,8 @@ def __init__(self, pct_threshold=90, debug=False, transport='graphite', self.deleteGauges = deleteGauges options.update({'debug': debug, 'flush_interval': flush_interval}) + + # initialize each backend for backend in backends: backend.init(options) @@ -76,7 +77,7 @@ def process(self, data): value = match.group(2) rest = match.group(3).split('|') mtype = rest.pop(0) - + if (mtype == 'ms'): self.__record_timer(key, value, rest) elif (mtype == 'g' ): self.__record_gauge(key, value, rest) elif (mtype == 'c' ): self.__record_counter(key, value, rest) @@ -170,7 +171,7 @@ def flush(self): metrics['timers'][k] = { 'mean': mean, 'max': max, 'min': min, 'count': count, - 'max_threshold': max_threshold, 'pct_threshold': pct_threshold + 'max_threshold': max_threshold, 'pct_threshold': self.pct_threshold } del(self.timers[k]) stats += 1 diff --git a/tests/server.py b/tests/server.py index 566f3e0..efc18a5 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,25 +1,140 @@ import unittest import mock +import socket +import threading -# from pystatsd.statsd import Client from pystatsd.server import Server +from pystatsd.backends import Console +from mock import Mock, ANY +from time import sleep class ServerBasicsTestCase(unittest.TestCase): """ - Tests the basic operations of the client + Tests the basic operations of the server """ def setUp(self): - self.patchers = [] - - socket_patcher = mock.patch('pystatsd.statsd.socket.socket') - self.mock_socket = socket_patcher.start() - self.patchers.append(socket_patcher) + self.addr = (socket.gethostbyname(''), 8125) + self.backend = self.__create_backend() def test_server_create(self): - server = Server() + """Create a new server and checks if initialization is correct""" + server = Server(backends=[self.backend]) if getattr(self, "assertIsNotNone", False): self.assertIsNotNone(server) else: assert server is not None + + self.assertEquals(server.expire, 0) + self.assertEquals(server.buf, 8192) + self.assertEquals(len(server.backends), 1) + self.assertEquals(len(server.counters), 0) + self.assertEquals(len(server.timers), 0) + self.assertEquals(len(server.gauges), 0) + + self.backend.init.assert_called_with({ + 'debug': False, + 'flush_interval': 10000 + }) + + def test_server_process(self): + """ + Checks if the server is properly processing the different types of metrics. + """ + server = Server() + server.process('gorets:1|c\nglork:320|ms\ngaugor:333|g') + + self.assertEquals(len(server.counters), 1) + self.assertEquals(server.counters.get('gorets')[0], 1.0) + self.assertEquals(len(server.timers), 1) + self.assertEquals(server.timers.get('glork')[0], [320.0]) + self.assertEquals(len(server.gauges), 1) + self.assertEquals(server.gauges.get('gaugor')[0], 333.0) + + server.process('gorets:1|c|@0.1') + self.assertEquals(len(server.counters), 1) + self.assertEquals(server.counters.get('gorets')[0], 11.0) + + def test_server_flush(self): + """ + Checks if the backend is receiving the processed metrics properly after + a flush. + """ + server = Server(backends=[self.backend]) + server.process('gorets:1|c\ngorets:1|c|@0.1\nglork:320|ms\ngaugor:333|g') + + server.flush() + self.backend.flush.assert_called_with(ANY, { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + }) + + server.flush() + self.backend.flush.assert_called_with(ANY, { + 'timers': {}, 'gauges': {'gaugor': 333.0}, 'counters': {}}) + + self.assertEquals(len(server.gauges), 1) + self.assertEquals(len(server.timers), 0) + self.assertEquals(len(server.counters), 0) + + def test_server_flush_del_gauges(self): + """ + Checks if all metrics are removed after a flush with a server set to delete + gauges. + """ + server = Server(deleteGauges=True, backends=[self.backend]) + server.process('gorets:1|c\ngorets:1|c|@0.1\nglork:320|ms\ngaugor:333|g') + server.flush() + + self.backend.flush.assert_called_with(ANY, { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + }) + + self.assertEquals(len(server.gauges), 0) + self.assertEquals(len(server.timers), 0) + self.assertEquals(len(server.counters), 0) + + def test_server_flush_backends(self): + """ + Test a server with multiple backends and ensures that all of them are + called after a flush. + """ + backend_a = self.__create_backend() + backend_b = self.__create_backend() + + server = Server(backends=[backend_a, backend_b]) + server.flush() + + metrics = {'timers': {}, 'gauges': {}, 'counters': {}} + backend_a.flush.assert_called_with(ANY, metrics) + backend_b.flush.assert_called_with(ANY, metrics) + + def test_server_flush_expire(self): + """ + Test if metrics are removed by forcing them to expire. + """ + server = Server(expire=1, backends=[self.backend]) + server.process('gorets:1|c\ngorets:1|c|@0.1\nglork:320|ms\ngaugor:333|g') + + sleep(2) + server.flush() + self.assertEquals(len(server.gauges), 0) + self.assertEquals(len(server.timers), 0) + self.assertEquals(len(server.counters), 0) + + + def __create_backend(self): + backend = Console() + backend.init = Mock() + backend.flush = Mock() + return backend From d4ad68a5043338648f92e589c1ee3fa31ad35624 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Mon, 29 Dec 2014 09:19:45 -0200 Subject: [PATCH 4/7] Adapted server code for Python3 --- pystatsd/server.py | 12 +++++++++--- tests/server.py | 46 +++++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/pystatsd/server.py b/pystatsd/server.py index 86f000e..4534fda 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -122,7 +122,9 @@ def flush(self): stats = 0 metrics = {'counters': {}, 'gauges': {}, 'timers': {}} - for k, (v, t) in self.counters.items(): + #for k, (v, t) in self.counters.items(): + for k in list(self.counters.keys()): + v, t = self.counters[k] if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring counter %s (age: %s)" % (k, ts -t)) @@ -136,7 +138,9 @@ def flush(self): del(self.counters[k]) stats += 1 - for k, (v, t) in self.gauges.items(): + #for k, (v, t) in self.gauges.items(): + for k in list(self.gauges.keys()): + v, t = self.gauges[k] if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring gauge %s (age: %s)" % (k, ts - t)) @@ -147,7 +151,9 @@ def flush(self): del(self.gauges[k]) stats += 1 - for k, (v, t) in self.timers.items(): + #for k, (v, t) in self.timers.items(): + for k in list(self.timers.keys()): + v, t = self.timers[k] if self.expire > 0 and t + self.expire < ts: if self.debug: print("Expiring timer %s (age: %s)" % (k, ts - t)) diff --git a/tests/server.py b/tests/server.py index efc18a5..7b4f508 100644 --- a/tests/server.py +++ b/tests/server.py @@ -26,12 +26,12 @@ def test_server_create(self): else: assert server is not None - self.assertEquals(server.expire, 0) - self.assertEquals(server.buf, 8192) - self.assertEquals(len(server.backends), 1) - self.assertEquals(len(server.counters), 0) - self.assertEquals(len(server.timers), 0) - self.assertEquals(len(server.gauges), 0) + self.assertEqual(server.expire, 0) + self.assertEqual(server.buf, 8192) + self.assertEqual(len(server.backends), 1) + self.assertEqual(len(server.counters), 0) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.gauges), 0) self.backend.init.assert_called_with({ 'debug': False, @@ -45,16 +45,16 @@ def test_server_process(self): server = Server() server.process('gorets:1|c\nglork:320|ms\ngaugor:333|g') - self.assertEquals(len(server.counters), 1) - self.assertEquals(server.counters.get('gorets')[0], 1.0) - self.assertEquals(len(server.timers), 1) - self.assertEquals(server.timers.get('glork')[0], [320.0]) - self.assertEquals(len(server.gauges), 1) - self.assertEquals(server.gauges.get('gaugor')[0], 333.0) + self.assertEqual(len(server.counters), 1) + self.assertEqual(server.counters.get('gorets')[0], 1.0) + self.assertEqual(len(server.timers), 1) + self.assertEqual(server.timers.get('glork')[0], [320.0]) + self.assertEqual(len(server.gauges), 1) + self.assertEqual(server.gauges.get('gaugor')[0], 333.0) server.process('gorets:1|c|@0.1') - self.assertEquals(len(server.counters), 1) - self.assertEquals(server.counters.get('gorets')[0], 11.0) + self.assertEqual(len(server.counters), 1) + self.assertEqual(server.counters.get('gorets')[0], 11.0) def test_server_flush(self): """ @@ -78,9 +78,9 @@ def test_server_flush(self): self.backend.flush.assert_called_with(ANY, { 'timers': {}, 'gauges': {'gaugor': 333.0}, 'counters': {}}) - self.assertEquals(len(server.gauges), 1) - self.assertEquals(len(server.timers), 0) - self.assertEquals(len(server.counters), 0) + self.assertEqual(len(server.gauges), 1) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.counters), 0) def test_server_flush_del_gauges(self): """ @@ -100,9 +100,9 @@ def test_server_flush_del_gauges(self): 'counters': {'gorets': 1.1} }) - self.assertEquals(len(server.gauges), 0) - self.assertEquals(len(server.timers), 0) - self.assertEquals(len(server.counters), 0) + self.assertEqual(len(server.gauges), 0) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.counters), 0) def test_server_flush_backends(self): """ @@ -128,9 +128,9 @@ def test_server_flush_expire(self): sleep(2) server.flush() - self.assertEquals(len(server.gauges), 0) - self.assertEquals(len(server.timers), 0) - self.assertEquals(len(server.counters), 0) + self.assertEqual(len(server.gauges), 0) + self.assertEqual(len(server.timers), 0) + self.assertEqual(len(server.counters), 0) def __create_backend(self): From 13f22fdc3866cdd8196af171c022ea8f6aca67e4 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Wed, 31 Dec 2014 13:07:40 -0200 Subject: [PATCH 5/7] Included testes for the backends, corrected some errors in the backends --- pystatsd/backends/console.py | 9 ++-- pystatsd/backends/ganglia.py | 38 ++++++++------- pystatsd/backends/graphite.py | 19 +++++--- pystatsd/server.py | 9 +++- tests/__init__.py | 2 + tests/ganglia_backend.py | 71 +++++++++++++++++++++++++++ tests/graphite_backend.py | 91 +++++++++++++++++++++++++++++++++++ tests/server.py | 5 +- 8 files changed, 211 insertions(+), 33 deletions(-) create mode 100644 tests/ganglia_backend.py create mode 100644 tests/graphite_backend.py diff --git a/pystatsd/backends/console.py b/pystatsd/backends/console.py index 9787d29..72d9dd4 100644 --- a/pystatsd/backends/console.py +++ b/pystatsd/backends/console.py @@ -3,9 +3,12 @@ log = logging.getLogger(__name__) class Console(object): - def init(self, options): - self.debug = options.get('debug') - self.flush_interval = options.get('flush_interval') + def __init__(self, options={}): + pass + + def init(self, cfg): + self.debug = cfg.get('debug') + self.flush_interval = cfg.get('flush_interval') def flush(self, timestamp, metrics): for k, v in metrics['counters'].items(): diff --git a/pystatsd/backends/ganglia.py b/pystatsd/backends/ganglia.py index a01de58..2aef52a 100644 --- a/pystatsd/backends/ganglia.py +++ b/pystatsd/backends/ganglia.py @@ -5,25 +5,27 @@ log = logging.getLogger(__name__) class Ganglia(object): + def __init__(self, options): + self.host = options.get('ganglia_host', 'localhost') + self.port = options.get('ganglia_port', 8649) + self.protocol = options.get('ganglia_protocol', 'udp') + self.spoof_host = options.get('ganglia_spoof_host', 'statsd:statsd') + def init(self, options): - self.ganglia_host = options.get('ganglia_host', 'localhost') - self.ganglia_port = options.get('ganglia_port', 8649) - self.ganglia_protocol = options.get('ganglia_protocol', 'udp') - self.ganglia_spoof_host = options.get('ganglia_spoof_host', 'statsd:statsd') - self.dmax = int(self.flush_interval * 1.2) self.debug = options.get('debug') self.flush_interval = options.get('flush_interval') + self.dmax = int(self.flush_interval * 1.2) def flush(self, timestamp, metrics): - g = gmetric.Gmetric(self.ganglia_host, self.ganglia_port, self.ganglia_protocol) + g = gmetric.Gmetric(self.host, self.port, self.protocol) for k, v in metrics['counters'].items(): # We put counters in _counters group. Underscore is to make sure counters show up # first in the GUI. Change below if you disagree - g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.ganglia_spoof_host) + g.send(k, v, "double", "count", "both", 60, self.dmax, "_counters", self.spoof_host) for k, v in metrics['gauges'].items(): - g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.ganglia_spoof_host) + g.send(k, v, "double", "count", "both", 60, self.dmax, "_gauges", self.spoof_host) for k, v in metrics['timers'].items(): # We are gonna convert all times into seconds, then let rrdtool @@ -31,14 +33,14 @@ def flush(self, timestamp, metrics): # is 3.521 seconds. What group should these metrics be in. For the # time being we'll set it to the name of the key group = k - g.send(k + "_min", min / 1000, "double", "seconds", "both", 60, - self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_mean", mean / 1000, "double", "seconds", "both", 60, - self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_max", max / 1000, "double", "seconds", "both", 60, - self.dmax, group, self.ganglia_spoof_host) - g.send(k + "_count", count, "double", "count", "both", 60, self.dmax, - group, self.ganglia_spoof_host) - g.send(k + "_" + str(self.pct_threshold) + "pct", max_threshold / 1000, + g.send(k + "_min", v['min'] / 1000, "double", "seconds", "both", 60, + self.dmax, group, self.spoof_host) + g.send(k + "_mean", v['mean'] / 1000, "double", "seconds", "both", 60, + self.dmax, group, self.spoof_host) + g.send(k + "_max", v['max'] / 1000, "double", "seconds", "both", 60, + self.dmax, group, self.spoof_host) + g.send(k + "_count", v['count'], "double", "count", "both", 60, self.dmax, + group, self.spoof_host) + g.send(k + "_" + str(v['pct_threshold']) + "pct", v['max_threshold'] / 1000, "double", "seconds", "both", 60, self.dmax, group, - self.ganglia_spoof_host) + self.spoof_host) diff --git a/pystatsd/backends/graphite.py b/pystatsd/backends/graphite.py index 1aca11c..7d168cd 100644 --- a/pystatsd/backends/graphite.py +++ b/pystatsd/backends/graphite.py @@ -12,29 +12,35 @@ log = logging.getLogger(__name__) class Graphite(object): - def init(self, options): - self.graphite_host = options.get('graphite_host', 'localhost') - self.graphite_port = options.get('graphite_port', 2003) + def __init__(self, options={}): + self.host = options.get('graphite_host', 'localhost') + self.port = options.get('graphite_port', 2003) self.counters_prefix = options.get('counters_prefix', 'stats') self.timers_prefix = options.get('timers_prefix', 'stats.timers') self.global_prefix = options.get('global_prefix', None) - self.debug = options.get('debug') - self.flush_interval = options.get('flush_interval') + + def init(self, cfg): + self.debug = cfg.get('debug') + self.flush_interval = cfg.get('flush_interval') def flush(self, timestamp, metrics): stat_string = '' + stats = 0 for k, v in metrics['counters'].items(): msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp) stat_string += msg + stats += 1 for k, v in metrics['gauges'].items(): msg = '%s.%s %s %s\n' % (self.counters_prefix, k, v, timestamp) stat_string += msg + stats += 1 for k, v in metrics['timers'].items(): v.update({'prefix': self.timers_prefix, 'key': k, 'ts': timestamp}) stat_string += TIMER_MSG % v + stats += 1 stat_string += "statsd.numStats %s %d\n" % (stats, timestamp) self._send_metrics(stat_string) @@ -47,8 +53,9 @@ def _send_metrics(self, stat_string): ]) graphite = socket.socket() + try: - graphite.connect((self.graphite_host, self.graphite_port)) + graphite.connect((self.host, self.port)) graphite.sendall(bytes(bytearray(stat_string, "utf-8"))) graphite.close() except socket.error as e: diff --git a/pystatsd/server.py b/pystatsd/server.py index 4534fda..0a3f57f 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -38,7 +38,7 @@ class Server(object): def __init__(self, pct_threshold=90, debug=False, flush_interval=10000, expire=0, no_aggregate_counters=False, deleteGauges=False, - backends=[], options={}): + backends=[]): self.buf = 8192 self.flush_interval = flush_interval self.pct_threshold = pct_threshold @@ -50,7 +50,12 @@ def __init__(self, pct_threshold=90, debug=False, flush_interval=10000, self.backends = backends self.deleteGauges = deleteGauges - options.update({'debug': debug, 'flush_interval': flush_interval}) + options = { + 'debug': debug, + 'flush_interval': flush_interval, + 'pct_threshold': pct_threshold, + 'expire': expire + } # initialize each backend for backend in backends: diff --git a/tests/__init__.py b/tests/__init__.py index 84ed79a..3fc6c8c 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1,4 @@ from .client import * from .server import * +from .ganglia_backend import * +from. graphite_backend import * diff --git a/tests/ganglia_backend.py b/tests/ganglia_backend.py new file mode 100644 index 0000000..845070a --- /dev/null +++ b/tests/ganglia_backend.py @@ -0,0 +1,71 @@ +import time +import unittest +import mock +import socket +import sys + +from pystatsd.backends import Ganglia + +class GangliaBackendTestCase(unittest.TestCase): + """ + Tests the basic operations of the Ganglia backend + """ + def setUp(self): + self.patchers = [] + + gmetric_patcher = mock.patch('pystatsd.backends.ganglia.gmetric.Gmetric') + self.mock_gmetric = gmetric_patcher.start() + self.patchers.append(gmetric_patcher) + + self.options = { + 'ganglia_host': 'localhost', + 'ganglia_port': 8649, + 'ganglia_protocol': 'udp', + 'ganglia_spoof_host': 'statsd:statsd' + } + + self.config = {'debug': True, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90} + + self.ganglia = Ganglia(self.options) + self.ganglia.init(self.config) + + def test_ganglia_create(self): + self.assertEqual(self.ganglia.host, self.options['ganglia_host']) + self.assertEqual(self.ganglia.port, self.options['ganglia_port']) + self.assertEqual(self.ganglia.protocol, self.options['ganglia_protocol']) + self.assertEqual(self.ganglia.spoof_host, self.options['ganglia_spoof_host']) + self.assertEqual(self.ganglia.dmax, int(self.config['flush_interval']*1.2)) + + def test_ganglia_flush(self): + ts = int(time.time()) + metrics = { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + } + + self.ganglia.flush(ts, metrics) + + self.mock_gmetric.assert_called_with( + self.options['ganglia_host'], self.options['ganglia_port'], self.options['ganglia_protocol']) + + send_fn = self.mock_gmetric.return_value.send + + send_fn.assert_any_call('gorets', 1.1, "double", "count", "both", 60, + self.ganglia.dmax, "_counters", self.ganglia.spoof_host) + send_fn.assert_any_call('gaugor', 333.0, "double", "count", "both", 60, + self.ganglia.dmax, "_gauges", self.ganglia.spoof_host) + + for m in ['min', 'max', 'mean', '90pct']: + send_fn.assert_any_call('glork_'+m, 0.32, 'double', 'seconds', 'both', + 60, self.ganglia.dmax, 'glork', self.ganglia.spoof_host) + + send_fn.assert_any_call('glork_count', 1, 'double', 'count', 'both', + 60, self.ganglia.dmax, 'glork', self.ganglia.spoof_host) + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() diff --git a/tests/graphite_backend.py b/tests/graphite_backend.py new file mode 100644 index 0000000..b2a16da --- /dev/null +++ b/tests/graphite_backend.py @@ -0,0 +1,91 @@ +import time +import unittest +import mock +import socket +import sys + +from pystatsd.backends import Graphite + +class GraphiteBackendTestCase(unittest.TestCase): + """ + Tests the basic operations of the Graphite backend + """ + def setUp(self): + self.patchers = [] + + socket_patcher = mock.patch('pystatsd.backends.graphite.socket.socket') + self.mock_socket = socket_patcher.start() + self.patchers.append(socket_patcher) + + self.options = { + 'graphite_host': 'localhost', + 'graphite_port': 2003, + 'counters_prefix': 'stats', + 'timers_prefix': 'stats.timers', + 'global_prefix': None + } + + self.config = {'debug': True, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90} + + self.graphite = Graphite(self.options) + self.graphite.init(self.config) + + def test_graphite_create(self): + self.assertEqual(self.graphite.host, self.options['graphite_host']) + self.assertEqual(self.graphite.port, self.options['graphite_port']) + self.assertEqual(self.graphite.counters_prefix, self.options['counters_prefix']) + self.assertEqual(self.graphite.timers_prefix, self.options['timers_prefix']) + self.assertEqual(self.graphite.global_prefix, self.options['global_prefix']) + + def test_graphite_flush(self): + ts = int(time.time()) + metrics = { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + } + + self.graphite.flush(ts, metrics) + + # check connection information is correct + self.mock_socket.return_value.connect.assert_called_with(( + self.options['graphite_host'], self.options['graphite_port'])) + + # get sendall call argument + sendall = self.mock_socket.return_value.mock_calls[1] + self.assertEqual(sendall[0], 'sendall') + self.assertEqual(len(sendall[1]), 1) + + data = sendall[1][0].decode("utf-8").strip().split("\n") + + # check each metric + for metric in data: + fields = metric.split() + id = fields[0].split('.') + + if id[0] == 'statsd' and id[1] == 'numStats': + self.assertEqual(int(fields[1]), len(metrics)) + else: + self.assertEqual(id[0], self.options['counters_prefix']) + + # counters and gauges + if len(id) == 2: + self.assertEqual(metrics.get('gauges').get(id[1]) + or metrics.get('counters').get(id[1]), float(fields[1])) + # timers + elif len(id) == 4: + self.assertTrue(metrics.get('timers').get(id[2]) != None) + + # check timestamp + self.assertEqual(int(fields[2]), ts) + + # check connection has been closed + self.mock_socket.return_value.close.assert_called_with() + + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() diff --git a/tests/server.py b/tests/server.py index 7b4f508..522c6c4 100644 --- a/tests/server.py +++ b/tests/server.py @@ -33,10 +33,7 @@ def test_server_create(self): self.assertEqual(len(server.timers), 0) self.assertEqual(len(server.gauges), 0) - self.backend.init.assert_called_with({ - 'debug': False, - 'flush_interval': 10000 - }) + self.backend.init.assert_called_with({'debug': False, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90}) def test_server_process(self): """ From ffde141d018030dbb831fcfb27cca33e215daa11 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Sun, 11 Jan 2015 09:43:51 -0200 Subject: [PATCH 6/7] Included tests for gmetric --- pystatsd/backends/__init__.py | 1 + pystatsd/backends/gmetric.py | 35 ++++++++++++++++++++ pystatsd/server.py | 9 ++++-- tests/__init__.py | 1 + tests/gmetric_backend.py | 61 +++++++++++++++++++++++++++++++++++ 5 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 pystatsd/backends/gmetric.py create mode 100644 tests/gmetric_backend.py diff --git a/pystatsd/backends/__init__.py b/pystatsd/backends/__init__.py index 1c21a1f..b311a18 100644 --- a/pystatsd/backends/__init__.py +++ b/pystatsd/backends/__init__.py @@ -1,3 +1,4 @@ from .ganglia import Ganglia +from .gmetric import Gmetric from .graphite import Graphite from .console import Console diff --git a/pystatsd/backends/gmetric.py b/pystatsd/backends/gmetric.py new file mode 100644 index 0000000..3832833 --- /dev/null +++ b/pystatsd/backends/gmetric.py @@ -0,0 +1,35 @@ +import logging + +from subprocess import call +from .ganglia import Ganglia + +log = logging.getLogger(__name__) + +class Gmetric(object): + def __init__(self, options): + self.gmetric_exec = options.get('gmetric_exec', '/usr/bin/gmetric') + self.gmetric_options = options.get('gmetric_options', '-d') + + def init(self, options): + self.debug = options.get('debug') + self.flush_interval = options.get('flush_interval') + + def flush(self, timestamp, metrics): + for k, v in metrics['counters'].items(): + self.send(k, v, "_counters", "count") + + for k, v in metrics['gauges'].items(): + self.send(k, v, "_gauges", "gauge") + + for k, v in metrics['timers'].items(): + # We are gonna convert all times into seconds, then let rrdtool add proper SI unit. This avoids things like + # 3521 k ms which is 3.521 seconds + group = k + self.send(k + "_mean", v['mean'] / 1000, group, "seconds") + self.send(k + "_min", v['min'] / 1000 , group, "seconds") + self.send(k + "_max", v['max'] / 1000, group, "seconds") + self.send(k + "_count", v['count'] , group, "count") + self.send(k + "_" + str(v['pct_threshold']) + "pct", v['max_threshold'] / 1000, group, "seconds") + + def send(self, k, v, group, units): + call([self.gmetric_exec, self.gmetric_options, "-u", units, "-g", group, "-t", "double", "-n", k, "-v", str(v) ]) diff --git a/pystatsd/server.py b/pystatsd/server.py index 0a3f57f..baad5ae 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -4,7 +4,7 @@ import time import types import logging -from . import gmetric +#from . import gmetric from subprocess import call from warnings import warn # from xdrlib import Packer, Unpacker @@ -257,16 +257,21 @@ def run_server(): parser.add_argument('-n', '--name', dest='name', help='hostname to run on ', default='') parser.add_argument('-p', '--port', dest='port', help='port to run on (default: 8125)', type=int, default=8125) parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', type=str, default="graphite") + + # Graphite parser.add_argument('--graphite-port', dest='graphite_port', help='port to connect to graphite on (default: 2003)', type=int, default=2003) parser.add_argument('--graphite-host', dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost') + # Uses embedded Ganglia Library parser.add_argument('--ganglia-port', dest='ganglia_port', help='Unicast port to connect to ganglia on', type=int, default=8649) parser.add_argument('--ganglia-host', dest='ganglia_host', help='Unicast host to connect to ganglia on', type=str, default='localhost') parser.add_argument('--ganglia-spoof-host', dest='ganglia_spoof_host', help='host to report metrics as to ganglia', type=str, default='statsd:statsd') + # Use gmetric parser.add_argument('--ganglia-gmetric-exec', dest='gmetric_exec', help='Use gmetric executable. Defaults to /usr/bin/gmetric', type=str, default="/usr/bin/gmetric") parser.add_argument('--ganglia-gmetric-options', dest='gmetric_options', help='Options to pass to gmetric. Defaults to -d 60', type=str, default="-d 60") - # + + # Other options parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000) parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true') parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) diff --git a/tests/__init__.py b/tests/__init__.py index 3fc6c8c..9446d13 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,4 +1,5 @@ from .client import * from .server import * from .ganglia_backend import * +from .gmetric_backend import * from. graphite_backend import * diff --git a/tests/gmetric_backend.py b/tests/gmetric_backend.py new file mode 100644 index 0000000..3c04681 --- /dev/null +++ b/tests/gmetric_backend.py @@ -0,0 +1,61 @@ +import time +import unittest +import mock +import socket +import sys + +from pystatsd.backends import Gmetric + +class GmetricBackendTestCase(unittest.TestCase): + """ + Tests the basic operations of the Ganglia backend + """ + def setUp(self): + self.patchers = [] + + gmetric_patcher = mock.patch('pystatsd.backends.gmetric.call') + self.mock_gmetric = gmetric_patcher.start() + self.patchers.append(gmetric_patcher) + + self.options = { + 'gmetric_exec': '/usr/bin/gmetric', + 'gmetric_options': '-d' + } + + self.config = {'debug': True, 'flush_interval': 10000, 'expire': 0, 'pct_threshold': 90} + + self.gmetric = Gmetric(self.options) + self.gmetric.init(self.config) + + def test_ganglia_create(self): + self.assertEqual(self.gmetric.gmetric_exec, self.options['gmetric_exec']) + self.assertEqual(self.gmetric.gmetric_options, self.options['gmetric_options']) + + def test_ganglia_flush(self): + ts = int(time.time()) + metrics = { + 'timers': {'glork': { + 'count': 1, 'max_threshold': 320.0, 'max': 320.0, 'min': 320.0, + 'pct_threshold': 90, 'mean': 320.0} + }, + 'gauges': {'gaugor': 333.0}, + 'counters': {'gorets': 1.1} + } + + self.gmetric.flush(ts, metrics) + + self.mock_gmetric.assert_any_call(self._send_args('gorets', 1.1, "_counters", "count")) + self.mock_gmetric.assert_any_call(self._send_args('gaugor', 333.0, "_gauges", "gauge")) + + for m in ['min', 'max', 'mean', '90pct']: + self.mock_gmetric.assert_any_call(self._send_args('glork_'+m, 0.32, "glork", "seconds")) + + self.mock_gmetric.assert_any_call(self._send_args('glork_count', 1, 'glork', 'count')) + + def _send_args(self, k, v, group, units): + return [self.options['gmetric_exec'], self.options['gmetric_options'], + "-u", units, "-g", group, "-t", "double", "-n", k, "-v", str(v)] + + def tearDown(self): + for patcher in self.patchers: + patcher.stop() From 4cf19b9d649090f2667b97143f67ce6a1b64db53 Mon Sep 17 00:00:00 2001 From: Maycon Bordin Date: Sun, 11 Jan 2015 10:04:24 -0200 Subject: [PATCH 7/7] Adapted the ServerDaemon to use the backends instead --- pystatsd/backends/__init__.py | 12 ++++++++++ pystatsd/backends/console.py | 2 +- pystatsd/server.py | 43 ++++++++++++++++------------------- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/pystatsd/backends/__init__.py b/pystatsd/backends/__init__.py index b311a18..b74f83e 100644 --- a/pystatsd/backends/__init__.py +++ b/pystatsd/backends/__init__.py @@ -2,3 +2,15 @@ from .gmetric import Gmetric from .graphite import Graphite from .console import Console + +def create_instance(transport, options): + if transport == 'graphite': + return Graphite(options) + elif transport == 'ganglia': + return Ganglia(options) + elif transport == 'ganglia-gmetric': + return Gmetric(options) + elif transport == 'console': + return Console(options) + else: + return None diff --git a/pystatsd/backends/console.py b/pystatsd/backends/console.py index 72d9dd4..c22201e 100644 --- a/pystatsd/backends/console.py +++ b/pystatsd/backends/console.py @@ -4,7 +4,7 @@ class Console(object): def __init__(self, options={}): - pass + print("Console started") def init(self, cfg): self.debug = cfg.get('debug') diff --git a/pystatsd/server.py b/pystatsd/server.py index baad5ae..70e10f3 100644 --- a/pystatsd/server.py +++ b/pystatsd/server.py @@ -4,10 +4,10 @@ import time import types import logging -#from . import gmetric + from subprocess import call from warnings import warn -# from xdrlib import Packer, Unpacker +from .backends import create_instance log = logging.getLogger(__name__) @@ -229,23 +229,14 @@ class ServerDaemon(Daemon): def run(self, options): if setproctitle: setproctitle('pystatsd') - server = Server(pct_threshold=options.pct, - debug=options.debug, - transport=options.transport, - graphite_host=options.graphite_host, - graphite_port=options.graphite_port, - global_prefix=options.global_prefix, - ganglia_host=options.ganglia_host, - ganglia_spoof_host=options.ganglia_spoof_host, - ganglia_port=options.ganglia_port, - gmetric_exec=options.gmetric_exec, - gmetric_options=options.gmetric_options, - flush_interval=options.flush_interval, - no_aggregate_counters=options.no_aggregate_counters, - counters_prefix=options.counters_prefix, - timers_prefix=options.timers_prefix, - expire=options.expire) - + + backend = create_instance(options.transport, vars(options)) + + + server = Server(options.pct, options.debug, options.flush_interval, + options.expire, options.no_aggregate_counters, options.delete_gauges, + backends=[backend]) + server.serve(options.name, options.port) @@ -256,11 +247,14 @@ def run_server(): parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='debug mode', default=False) parser.add_argument('-n', '--name', dest='name', help='hostname to run on ', default='') parser.add_argument('-p', '--port', dest='port', help='port to run on (default: 8125)', type=int, default=8125) - parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library) or ganglia-gmetric (uses gmetric)', type=str, default="graphite") + parser.add_argument('-r', '--transport', dest='transport', help='transport to use graphite, ganglia (uses embedded library), ganglia-gmetric (uses gmetric) or console', type=str, default="graphite") # Graphite parser.add_argument('--graphite-port', dest='graphite_port', help='port to connect to graphite on (default: 2003)', type=int, default=2003) parser.add_argument('--graphite-host', dest='graphite_host', help='host to connect to graphite on (default: localhost)', type=str, default='localhost') + parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) + parser.add_argument('--counters-prefix', dest='counters_prefix', help='prefix to append before sending counter data to graphite (default: stats)', type=str, default='stats') + parser.add_argument('--timers-prefix', dest='timers_prefix', help='prefix to append before sending timing data to graphite (default: stats.timers)', type=str, default='stats.timers') # Uses embedded Ganglia Library parser.add_argument('--ganglia-port', dest='ganglia_port', help='Unicast port to connect to ganglia on', type=int, default=8649) @@ -271,18 +265,19 @@ def run_server(): parser.add_argument('--ganglia-gmetric-exec', dest='gmetric_exec', help='Use gmetric executable. Defaults to /usr/bin/gmetric', type=str, default="/usr/bin/gmetric") parser.add_argument('--ganglia-gmetric-options', dest='gmetric_options', help='Options to pass to gmetric. Defaults to -d 60', type=str, default="-d 60") - # Other options + # Server options parser.add_argument('--flush-interval', dest='flush_interval', help='how often to send data to graphite in millis (default: 10000)', type=int, default=10000) parser.add_argument('--no-aggregate-counters', dest='no_aggregate_counters', help='should statsd report counters as absolute instead of count/sec', action='store_true') - parser.add_argument('--global-prefix', dest='global_prefix', help='prefix to append to all stats sent to graphite. Useful for hosted services (ex: Hosted Graphite) or stats namespacing (default: None)', type=str, default=None) - parser.add_argument('--counters-prefix', dest='counters_prefix', help='prefix to append before sending counter data to graphite (default: stats)', type=str, default='stats') - parser.add_argument('--timers-prefix', dest='timers_prefix', help='prefix to append before sending timing data to graphite (default: stats.timers)', type=str, default='stats.timers') parser.add_argument('-t', '--pct', dest='pct', help='stats pct threshold (default: 90)', type=int, default=90) + parser.add_argument('--delete-gauges', dest='delete_gauges', action='store_true', help='Delete gauges once they have been sent to the backends', default=False) + + # Process options parser.add_argument('-D', '--daemon', dest='daemonize', action='store_true', help='daemonize', default=False) parser.add_argument('--pidfile', dest='pidfile', action='store', help='pid file', default='/var/run/pystatsd.pid') parser.add_argument('--restart', dest='restart', action='store_true', help='restart a running daemon', default=False) parser.add_argument('--stop', dest='stop', action='store_true', help='stop a running daemon', default=False) parser.add_argument('--expire', dest='expire', help='time-to-live for old stats (in secs)', type=int, default=0) + options = parser.parse_args(sys.argv[1:]) log_level = logging.DEBUG if options.debug else logging.INFO