Source code for baseplate.clients.redis

from math import ceil
from time import perf_counter
from typing import Any
from typing import Dict
from typing import Optional

import redis

# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0
try:
    from redis.client import StrictPipeline as Pipeline  # type: ignore
except ImportError:
    from redis.client import Pipeline

from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client import Histogram

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import message_queue
from baseplate.lib import metrics

from baseplate.lib.prometheus_metrics import default_latency_buckets

PROM_PREFIX = "redis_client"
PROM_LABELS_PREFIX = "redis"

PROM_SHARED_LABELS = [
    f"{PROM_LABELS_PREFIX}_command",
    f"{PROM_LABELS_PREFIX}_database",
    f"{PROM_LABELS_PREFIX}_client_name",
    f"{PROM_LABELS_PREFIX}_type",
]
LATENCY_SECONDS = Histogram(
    f"{PROM_PREFIX}_latency_seconds",
    "Latency histogram for calls made by clients",
    [*PROM_SHARED_LABELS, f"{PROM_LABELS_PREFIX}_success"],
    buckets=default_latency_buckets,
)

REQUESTS_TOTAL = Counter(
    f"{PROM_PREFIX}_requests_total",
    "Total number of requests made by client",
    [*PROM_SHARED_LABELS, f"{PROM_LABELS_PREFIX}_success"],
)

ACTIVE_REQUESTS = Gauge(
    f"{PROM_PREFIX}_active_requests",
    "Number of active requests for a given client",
    PROM_SHARED_LABELS,
    multiprocess_mode="livesum",
)

PROM_POOL_PREFIX = f"{PROM_PREFIX}_pool"
PROM_LABELS = ["redis_pool"]

MAX_CONNECTIONS = Gauge(
    f"{PROM_POOL_PREFIX}_max_size",
    "Maximum number of connections allowed in this redis client connection pool",
    PROM_LABELS,
    multiprocess_mode="livesum",
)
IDLE_CONNECTIONS = Gauge(
    f"{PROM_POOL_PREFIX}_idle_connections",
    "Number of idle connections in this redis client connection pool",
    PROM_LABELS,
    multiprocess_mode="livesum",
)
OPEN_CONNECTIONS = Gauge(
    f"{PROM_POOL_PREFIX}_active_connections",
    "Number of open connections in this redis client connection pool",
    PROM_LABELS,
    multiprocess_mode="livesum",
)


[docs]def pool_from_config( app_config: config.RawConfig, prefix: str = "redis.", **kwargs: Any ) -> redis.ConnectionPool: """Make a ConnectionPool from a configuration dictionary. The keys useful to :py:func:`pool_from_config` should be prefixed, e.g. ``redis.url``, ``redis.max_connections``, etc. The ``prefix`` argument specifies the prefix used to filter keys. Each key is mapped to a corresponding keyword argument on the :py:class:`redis.ConnectionPool` constructor. Supported keys: * ``url`` (required): a URL like ``redis://localhost/0``. * ``max_connections``: an integer maximum number of connections in the pool * ``socket_connect_timeout``: how long to wait for sockets to connect. e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`) * ``socket_timeout``: how long to wait for socket operations, e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`) """ assert prefix.endswith(".") parser = config.SpecParser( { "url": config.String, "max_connections": config.Optional(config.Integer, default=None), "socket_connect_timeout": config.Optional(config.Timespan, default=None), "socket_timeout": config.Optional(config.Timespan, default=None), } ) options = parser.parse(prefix[:-1], app_config) if options.max_connections is not None: kwargs.setdefault("max_connections", options.max_connections) if options.socket_connect_timeout is not None: kwargs.setdefault("socket_connect_timeout", options.socket_connect_timeout.total_seconds()) if options.socket_timeout is not None: kwargs.setdefault("socket_timeout", options.socket_timeout.total_seconds()) return redis.BlockingConnectionPool.from_url(options.url, **kwargs)
[docs]class RedisClient(config.Parser): """Configure a Redis client. This is meant to be used with :py:meth:`baseplate.Baseplate.configure_context`. :param redis_client_name: The name of this Redis client. Prefer to use `client_name` keyword argument to the `pool_from_config` function. Use this if your using a Redis host or proxy that doesn't support the `CLIENT SETNAME` function. See :py:func:`pool_from_config` for available configuration settings. """ def __init__(self, redis_client_name: str = "", **kwargs: Any): # This is for backwards compatibility. Originally we asked clients to # set the `client_name` attribute to get the `redis_client_name` # tag to appear on Prometheus metrics. Unfortunately this broke clients # that use a proxy to connect to Redis. # See: https://github.com/redis/redis-py/issues/2384 client_name = redis_client_name if client_name == "": client_name = kwargs.get("client_name", "") self.kwargs = kwargs self.redis_client_name = client_name def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory": connection_pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs) return RedisContextFactory( connection_pool=connection_pool, name=key_path, redis_client_name=self.redis_client_name, )
[docs]class RedisContextFactory(ContextFactory): """Redis client context factory. This factory will attach a :py:class:`~baseplate.clients.redis.MonitoredRedisConnection` to an attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands are executed via this connection object, they will use connections from the provided :py:class:`redis.ConnectionPool` and automatically record diagnostic information. :param connection_pool: A connection pool. """ def __init__( self, connection_pool: redis.ConnectionPool, name: str = "redis", redis_client_name: str = "", ): self.connection_pool = connection_pool self.name = name self.redis_client_name = redis_client_name
[docs] def report_runtime_metrics(self, batch: metrics.Client) -> None: if not isinstance(self.connection_pool, redis.BlockingConnectionPool): return size = self.connection_pool.max_connections open_connections_num = len(self.connection_pool._connections) # type: ignore available = self.connection_pool.pool.qsize() in_use = size - available MAX_CONNECTIONS.labels(self.name).set(size) IDLE_CONNECTIONS.labels(self.name).set(available) OPEN_CONNECTIONS.labels(self.name).set(open_connections_num) batch.gauge("pool.size").replace(size) batch.gauge("pool.in_use").replace(in_use) batch.gauge("pool.open_and_available").replace(open_connections_num - in_use)
[docs] def make_object_for_context(self, name: str, span: Span) -> "MonitoredRedisConnection": return MonitoredRedisConnection( context_name=name, server_span=span, connection_pool=self.connection_pool, redis_client_name=self.redis_client_name, )
# pylint: disable=too-many-public-methods
[docs]class MonitoredRedisConnection(redis.StrictRedis): """Redis connection that collects diagnostic information. This connection acts like :py:class:`redis.StrictRedis` except that all operations are automatically wrapped with diagnostic collection. The interface is the same as that class except for the :py:meth:`~baseplate.clients.redis.MonitoredRedisConnection.pipeline` method. """ def __init__( self, context_name: str, server_span: Span, connection_pool: redis.ConnectionPool, redis_client_name: str = "", ): self.context_name = context_name self.server_span = server_span self.redis_client_name = redis_client_name super().__init__(connection_pool=connection_pool)
[docs] def execute_command(self, *args: Any, **kwargs: Any) -> Any: command = args[0] trace_name = f"{self.context_name}.{command}" labels = { f"{PROM_LABELS_PREFIX}_command": command, f"{PROM_LABELS_PREFIX}_client_name": self.redis_client_name, f"{PROM_LABELS_PREFIX}_database": self.connection_pool.connection_kwargs.get("db", ""), f"{PROM_LABELS_PREFIX}_type": "standalone", } with self.server_span.make_child(trace_name), ACTIVE_REQUESTS.labels( **labels ).track_inprogress(): start_time = perf_counter() success = "true" try: res = super().execute_command(command, *args[1:], **kwargs) if isinstance(res, redis.RedisError): success = "false" return res except: # noqa: E722 success = "false" raise finally: result_labels = {**labels, f"{PROM_LABELS_PREFIX}_success": success} REQUESTS_TOTAL.labels(**result_labels).inc() LATENCY_SECONDS.labels(**result_labels).observe(perf_counter() - start_time)
# pylint: disable=arguments-renamed
[docs] def pipeline( # type: ignore self, name: str, transaction: bool = True, shard_hint: Optional[str] = None ) -> "MonitoredRedisPipeline": """Create a pipeline. This returns an object on which you can call the standard Redis commands. Execution will be deferred until ``execute`` is called. This is useful for saving round trips. :param name: The name to attach to diagnostics for this pipeline. :param transaction: Whether or not the commands in the pipeline are wrapped with a transaction and executed atomically. """ return MonitoredRedisPipeline( f"{self.context_name}.pipeline_{name}", self.server_span, self.connection_pool, self.response_callbacks, transaction=transaction, shard_hint=shard_hint, redis_client_name=self.redis_client_name, )
# these commands are not yet implemented, but probably not unimplementable
[docs] def transaction(self, *args: Any, **kwargs: Any) -> Any: """Not currently implemented.""" raise NotImplementedError
class MonitoredRedisPipeline(Pipeline): def __init__( self, trace_name: str, server_span: Span, connection_pool: redis.ConnectionPool, response_callbacks: Dict, redis_client_name: str = "", **kwargs: Any, ): self.trace_name = trace_name self.server_span = server_span self.redis_client_name = redis_client_name super().__init__(connection_pool, response_callbacks, **kwargs) # pylint: disable=arguments-differ def execute(self, **kwargs: Any) -> Any: with self.server_span.make_child(self.trace_name): success = "true" start_time = perf_counter() labels = { f"{PROM_LABELS_PREFIX}_command": "pipeline", f"{PROM_LABELS_PREFIX}_client_name": self.redis_client_name, f"{PROM_LABELS_PREFIX}_database": self.connection_pool.connection_kwargs.get( "db", "" ), f"{PROM_LABELS_PREFIX}_type": "standalone", } ACTIVE_REQUESTS.labels(**labels).inc() try: return super().execute(**kwargs) except: # noqa: E722 success = "false" raise finally: ACTIVE_REQUESTS.labels(**labels).dec() result_labels = { **labels, f"{PROM_LABELS_PREFIX}_success": success, } REQUESTS_TOTAL.labels(**result_labels).inc() LATENCY_SECONDS.labels(**result_labels).observe(perf_counter() - start_time)
[docs]class MessageQueue: """A Redis-backed variant of :py:class:`~baseplate.lib.message_queue.MessageQueue`. :param name: can be any string. :param client: should be a :py:class:`redis.ConnectionPool` or :py:class:`redis.BlockingConnectionPool` from which a client connection can be created from (preferably generated from the :py:func:`pool_from_config` helper). """ def __init__(self, name: str, client: redis.ConnectionPool): self.queue = name if isinstance(client, (redis.BlockingConnectionPool, redis.ConnectionPool)): self.client = redis.Redis(connection_pool=client) else: self.client = client
[docs] def get(self, timeout: Optional[float] = None) -> bytes: """Read a message from the queue. :param timeout: If the queue is empty, the call will block up to ``timeout`` seconds or forever if ``None``, if a float is given, it will be rounded up to be an integer :raises: :py:exc:`~baseplate.lib.message_queue.TimedOutError` The queue was empty for the allowed duration of the call. """ if isinstance(timeout, float): timeout = int(ceil(timeout)) if timeout == 0: message = self.client.lpop(self.queue) else: message = self.client.blpop(self.queue, timeout=timeout or 0) if message: message = message[1] if not message: raise message_queue.TimedOutError return message
[docs] def put( # pylint: disable=unused-argument self, message: bytes, timeout: Optional[float] = None ) -> None: """Add a message to the queue. :param message: will be typecast to a string upon storage and will come out of the queue as a string regardless of what type they are when passed into this method. """ self.client.rpush(self.queue, message)
[docs] def close(self) -> None: """Close queue when finished. Will delete the queue from the Redis server (Note, can still enqueue and dequeue as the actions will recreate the queue) """ self.client.delete(self.queue)