"""Application metrics via StatsD.
A client for the application metrics aggregator StatsD_. Metrics sent to
StatsD are aggregated and written to graphite. StatsD is generally used for
whole-system health monitoring and insight into usage patterns.
.. testsetup::
app_config = {"metrics.endpoint": "", "metrics.namespace": "example"}
do_something = lambda: ()
do_something_else = lambda: ()
do_another_thing = lambda: ()
Basic example usage:
.. testcode::
from baseplate.lib.metrics import metrics_client_from_config
client = metrics_client_from_config(app_config)
client.counter("events.connect").increment()
client.gauge("workers").replace(4)
with client.timer("something.todo"):
do_something()
do_something_else()
If you have multiple metrics to send, you can batch them up for efficiency:
.. testcode::
with client.batch() as batch:
batch.counter("froozles").increment()
batch.counter("blargs").decrement(delta=3)
with batch.timer("something"):
do_another_thing()
and the batch will be sent in as few packets as possible when the `with` block
ends.
.. _StatsD: https://github.com/statsd/statsd
"""
import collections
import errno
import logging
import socket
import time
from types import TracebackType
from typing import DefaultDict
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from baseplate.lib import config
logger = logging.getLogger(__name__)
def _metric_join(*nodes: bytes) -> bytes:
return b".".join(node.strip(b".") for node in nodes)
class TransportError(Exception):
pass
class MessageTooBigTransportError(TransportError):
def __init__(self, message_size: int):
super().__init__(f"could not send: message of {message_size} bytes too large")
self.message_size = message_size
class Transport:
def send(self, serialized_metric: bytes) -> None:
raise NotImplementedError
def flush(self) -> None:
raise NotImplementedError
class NullTransport(Transport):
"""A transport which doesn't send messages at all."""
def send(self, serialized_metric: bytes) -> None:
for metric_line in serialized_metric.splitlines():
logger.debug("Would send metric %r", metric_line)
def flush(self) -> None:
pass
class RawTransport(Transport):
"""A transport which sends messages on a socket."""
def __init__(self, endpoint: config.EndpointConfiguration):
self.socket = socket.socket(endpoint.family, socket.SOCK_DGRAM)
self.socket.connect(endpoint.address)
def send(self, serialized_metric: bytes) -> None:
try:
self.socket.sendall(serialized_metric)
except socket.error as exc:
if exc.errno == errno.EMSGSIZE:
raise MessageTooBigTransportError(len(serialized_metric))
raise TransportError(exc)
def flush(self) -> None:
pass
class BufferedTransport(Transport):
"""A transport which wraps another transport and buffers before sending."""
def __init__(self, transport: Transport):
self.transport = transport
self.buffer: List[bytes] = []
def send(self, serialized_metric: bytes) -> None:
self.buffer.append(serialized_metric)
def flush(self) -> None:
if self.buffer:
metrics, self.buffer = self.buffer, []
message = b"\n".join(metrics)
self.transport.send(message)
class BaseClient:
def __init__(self, transport: Transport, namespace: str):
self.transport = transport
self.namespace = namespace.encode("ascii")
def timer(self, name: str) -> "Timer":
"""Return a Timer with the given name.
:param name: The name the timer should have.
"""
timer_name = _metric_join(self.namespace, name.encode("ascii"))
return Timer(self.transport, timer_name)
def counter(self, name: str) -> "Counter":
"""Return a Counter with the given name.
The sample rate is currently up to your application to enforce.
:param name: The name the counter should have.
"""
counter_name = _metric_join(self.namespace, name.encode("ascii"))
return Counter(self.transport, counter_name)
def gauge(self, name: str) -> "Gauge":
"""Return a Gauge with the given name.
:param name: The name the gauge should have.
"""
gauge_name = _metric_join(self.namespace, name.encode("ascii"))
return Gauge(self.transport, gauge_name)
def histogram(self, name: str) -> "Histogram":
"""Return a Histogram with the given name.
:param name: The name the histogram should have.
"""
histogram_name = _metric_join(self.namespace, name.encode("ascii"))
return Histogram(self.transport, histogram_name)
[docs]class Client(BaseClient):
"""A client for StatsD."""
[docs] def batch(self) -> "Batch":
"""Return a client-like object which batches up metrics.
Batching metrics can reduce the number of packets that are sent to
the stats aggregator.
"""
return Batch(self.transport, self.namespace)
[docs]class Batch(BaseClient):
"""A batch of metrics to send to StatsD.
The batch also supports the `context manager protocol`_, for use with
Python's ``with`` statement. When the context is exited, the batch will
automatically :py:meth:`flush`.
.. _context manager protocol:
https://docs.python.org/3/reference/datamodel.html#context-managers
"""
# pylint: disable=super-init-not-called
def __init__(self, transport: Transport, namespace: bytes):
self.transport = BufferedTransport(transport)
self.namespace = namespace
self.counters: Dict[bytes, BatchCounter] = {}
def __enter__(self) -> "Batch":
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.flush()
return None # don't swallow exception
[docs] def flush(self) -> None:
"""Immediately send the batched metrics."""
counters, self.counters = self.counters, {}
for counter in counters.values():
counter.flush()
try:
self.transport.flush()
except MessageTooBigTransportError as exc:
counters_by_total = list(
sorted((c for c in counters.values()), key=lambda c: c.total, reverse=True)
)
logger.warning(
"Metrics batch of %d bytes is too large to send, flush more often or reduce "
"amount done in this request. Top counters: %s",
exc.message_size,
", ".join(f"{c.name.decode()}={c.total:.0f}" for c in counters_by_total[:10]),
)
except TransportError as exc:
logger.warning("Failed to send metrics batch: %s", exc)
[docs] def counter(self, name: str) -> "Counter":
"""Return a BatchCounter with the given name.
The sample rate is currently up to your application to enforce.
:param name: The name the counter should have.
"""
counter_name = _metric_join(self.namespace, name.encode("ascii"))
batch_counter = self.counters.get(counter_name)
if batch_counter is None:
batch_counter = BatchCounter(self.transport, counter_name)
self.counters[counter_name] = batch_counter
return batch_counter
[docs]class Timer:
"""A timer for recording elapsed times.
The timer also supports the `context manager protocol`_, for use with
Python's ``with`` statement. When the context is entered the timer will
:py:meth:`start` and when exited, the timer will automatically
:py:meth:`stop`.
.. _context manager protocol:
https://docs.python.org/3/reference/datamodel.html#context-managers
"""
def __init__(self, transport: Transport, name: bytes):
self.transport = transport
self.name = name
self.start_time: Optional[float] = None
self.stopped: bool = False
self.sample_rate = 1.0
[docs] def start(self, sample_rate: float = 1.0) -> None:
"""Record the current time as the start of the timer."""
assert not self.start_time, "timer already started"
assert not self.stopped, "timer already stopped"
self.sample_rate = sample_rate
self.start_time = time.time()
[docs] def stop(self) -> None:
"""Stop the timer and record the total elapsed time."""
assert self.start_time, "timer not started"
assert not self.stopped, "timer already stopped"
now = time.time()
elapsed = now - self.start_time
self.send(elapsed, self.sample_rate)
self.stopped = True
[docs] def send(self, elapsed: float, sample_rate: float = 1.0) -> None:
"""Directly send a timer value without having to stop/start.
This can be useful when the timing was managed elsewhere and we just
want to report the result.
:param elapsed: The elapsed time in seconds to report.
"""
serialized = self.name + (f":{(elapsed * 1000.0):g}|ms".encode())
if sample_rate < 1.0:
sampling_info = f"@{sample_rate:g}".encode()
serialized = b"|".join([serialized, sampling_info])
self.transport.send(serialized)
def __enter__(self) -> None:
self.start()
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.stop()
return None # don't swallow exception
[docs]class Counter:
"""A counter for counting events over time."""
def __init__(self, transport: Transport, name: bytes):
self.transport = transport
self.name = name
[docs] def increment(self, delta: float = 1.0, sample_rate: float = 1.0) -> None:
"""Increment the counter.
:param delta: The amount to increase the counter by.
:param sample_rate: What rate this counter is sampled at. [0-1].
"""
self.send(delta, sample_rate)
[docs] def decrement(self, delta: float = 1.0, sample_rate: float = 1.0) -> None:
"""Decrement the counter.
This is equivalent to :py:meth:`increment` with delta negated.
"""
self.increment(delta=-delta, sample_rate=sample_rate)
[docs] def send(self, delta: float, sample_rate: float) -> None:
"""Send the counter to the backend.
:param delta: The amount to increase the counter by.
:param sample_rate: What rate this counter is sampled at. [0-1].
"""
parts = [self.name + (f":{delta:g}".encode()), b"c"]
if sample_rate < 1.0:
parts.append(f"@{sample_rate:g}".encode())
serialized = b"|".join(parts)
self.transport.send(serialized)
class BatchCounter(Counter):
"""Counter implementation that batches multiple increment calls.
A new entry in the ``packets`` entry is created for each sample rate. For
example, if a counter is incremented multiple times with a sample rate of
1.0, there will be one entry in ``packets``. If that counter is implemented
again with a sample rate of 0.5, there will be two entries in ``packets``.
Each packet has an associated delta value.
Example usage::
counter = BatchCounter(transport, "counter_name")
counter.increment()
do_something_else()
counter.increment()
counter.flush()
The above example results in one packet indicating an increase of two
should be applied to "counter_name".
"""
def __init__(self, transport: Transport, name: bytes):
super().__init__(transport, name)
self.packets: DefaultDict[float, float] = collections.defaultdict(float)
def increment(self, delta: float = 1.0, sample_rate: float = 1.0) -> None:
"""Increment the counter.
:param delta: The amount to increase the counter by.
:param sample_rate: What rate this counter is sampled at. [0-1].
"""
self.packets[sample_rate] += delta
def decrement(self, delta: float = 1.0, sample_rate: float = 1.0) -> None:
"""Decrement the counter.
This is equivalent to :py:meth:`increment` with delta negated.
"""
self.increment(delta=-delta, sample_rate=sample_rate)
@property
def total(self) -> int:
return sum(self.packets.values()) # type: ignore
def flush(self) -> None:
for sample_rate, delta in self.packets.items():
super().send(delta, sample_rate)
[docs]class Histogram:
"""A bucketed distribution of integer values across a specific range.
Records data value counts across a configurable integer value range
with configurable buckets of value precision within that range.
Configuration of each histogram is managed by the backend service,
not by this interface. This implementation also depends on histograms
being supported by the StatsD backend. Specifically, the StatsD
backend must support the :code:`h` key, e.g. :code:`metric_name:320|h`.
"""
def __init__(self, transport: Transport, name: bytes) -> None:
self.transport = transport
self.name = name
[docs] def add_sample(self, value: float) -> None:
"""Add a new value to the histogram.
This records a new value to the histogram; the bucket it goes in
is determined by the backend service configurations.
"""
serialized = self.name + (f":{value:g}|h".encode())
self.transport.send(serialized)
[docs]class Gauge:
"""A gauge representing an arbitrary value.
.. note:: The StatsD protocol supports incrementing/decrementing gauges
from their current value. We do not support that here because this
feature is unpredictable in face of the StatsD server restarting and
the "current value" being lost.
"""
def __init__(self, transport: Transport, name: bytes):
self.transport = transport
self.name = name
[docs] def replace(self, new_value: float) -> None:
"""Replace the value held by the gauge.
This will replace the value held by the gauge with no concern for its
previous value.
.. note:: Due to the way the protocol works, it is not possible to
replace gauge values with negative numbers.
:param new_value: The new value to store in the gauge.
"""
assert new_value >= 0, "gauges cannot be replaced with negative numbers"
serialized = self.name + (f":{new_value:g}|g".encode())
self.transport.send(serialized)
def make_client(namespace: str, endpoint: config.EndpointConfiguration) -> Client:
"""Return a configured client.
:param namespace: The root key to prefix all metrics with.
:param endpoint: The endpoint to send metrics to or :py:data:`None`. If
:py:data:`None`, the returned client will discard all metrics.
:return: A configured client.
.. seealso:: :py:func:`baseplate.lib.metrics.metrics_client_from_config`.
"""
transport: Transport
if endpoint:
transport = RawTransport(endpoint)
else:
transport = NullTransport()
return Client(transport, namespace)
[docs]def metrics_client_from_config(raw_config: config.RawConfig) -> Client:
"""Configure and return a metrics client.
This expects two configuration options:
``metrics.namespace``
The root key to prefix all metrics in this application with.
``metrics.endpoint``
A ``host:port`` pair, e.g. ``localhost:2014``. If an empty string, a
client that discards all metrics will be returned.
:param raw_config: The application configuration which should have
settings for the metrics client.
:return: A configured client.
"""
cfg = config.parse_config(
raw_config,
{"metrics": {"namespace": config.String, "endpoint": config.Optional(config.Endpoint)}},
)
# pylint: disable=maybe-no-member
return make_client(cfg.metrics.namespace, cfg.metrics.endpoint)