# Source code for baseplate.clients.redis

from math import ceil
from time import perf_counter
from typing import Any
from typing import Dict
from typing import Optional

import redis

# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0.
# Try the pre-3.0 name first and fall back to the current one.
try:
    from redis.client import StrictPipeline as Pipeline  # type: ignore
except ImportError:
    from redis.client import Pipeline

from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client import Histogram

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import message_queue
from baseplate.lib import metrics

from baseplate.lib.prometheus_metrics import default_latency_buckets

# NOTE(review): apart from PROM_PREFIX, PROM_LABELS and the help strings, the
# definitions in this section were reconstructed from how the names are used
# further down in this module. Metric names, the PROM_LABELS_PREFIX value and
# any extra constructor options (e.g. a multiprocess mode) must be confirmed
# against the upstream source before relying on them.

PROM_PREFIX = "redis_client"
PROM_LABELS_PREFIX = "redis"

# Labels shared by every per-command metric; callers pass them by keyword.
PROM_SHARED_LABELS = [
    f"{PROM_LABELS_PREFIX}_command",
    f"{PROM_LABELS_PREFIX}_client_name",
    f"{PROM_LABELS_PREFIX}_database",
    f"{PROM_LABELS_PREFIX}_type",
]
# Metrics recorded once a call has finished additionally carry its outcome.
PROM_RESULT_LABELS = PROM_SHARED_LABELS + [f"{PROM_LABELS_PREFIX}_success"]

LATENCY_SECONDS = Histogram(
    f"{PROM_PREFIX}_latency_seconds",
    "Latency histogram for calls made by clients",
    PROM_RESULT_LABELS,
    buckets=default_latency_buckets,
)

REQUESTS_TOTAL = Counter(
    f"{PROM_PREFIX}_requests_total",
    "Total number of requests made by client",
    PROM_RESULT_LABELS,
)

ACTIVE_REQUESTS = Gauge(
    f"{PROM_PREFIX}_active_requests",
    "Number of active requests for a given client",
    PROM_SHARED_LABELS,
)

# Label set for the connection-pool gauges below.
PROM_LABELS = ["redis_pool"]

MAX_CONNECTIONS = Gauge(
    f"{PROM_PREFIX}_pool_max_connections",
    "Maximum number of connections allowed in this redis client connection pool",
    PROM_LABELS,
)
IDLE_CONNECTIONS = Gauge(
    f"{PROM_PREFIX}_pool_idle_connections",
    "Number of idle connections in this redis client connection pool",
    PROM_LABELS,
)
OPEN_CONNECTIONS = Gauge(
    f"{PROM_PREFIX}_pool_open_connections",
    "Number of open connections in this redis client connection pool",
    PROM_LABELS,
)

def pool_from_config(
    app_config: config.RawConfig, prefix: str = "redis.", **kwargs: Any
) -> redis.ConnectionPool:
    """Build a Redis connection pool from application configuration.

    Settings are read from the keys of ``app_config`` that sit under
    ``prefix`` (e.g. ``redis.url``, ``redis.max_connections``). Each setting
    maps onto the keyword argument of the same name on the
    :py:class:`redis.ConnectionPool` constructor.

    Supported keys:

    * ``url`` (required): a URL like ``redis://localhost/0``.
    * ``max_connections``: an integer maximum number of connections in the
      pool.
    * ``socket_connect_timeout``: how long to wait for sockets to connect,
      e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`).
    * ``socket_timeout``: how long to wait for socket operations, e.g.
      ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`).

    :param app_config: The raw application configuration.
    :param prefix: Key prefix to read settings from; must end with ``.``.
    :param kwargs: Extra keyword arguments forwarded to
        ``redis.BlockingConnectionPool.from_url``. A value given here wins
        over the corresponding configured value.
    :return: A blocking connection pool for the configured URL.

    """
    assert prefix.endswith(".")

    spec = {
        "url": config.String,
        "max_connections": config.Optional(config.Integer, default=None),
        "socket_connect_timeout": config.Optional(config.Timespan, default=None),
        "socket_timeout": config.Optional(config.Timespan, default=None),
    }
    options = config.SpecParser(spec).parse(prefix[:-1], app_config)

    max_connections = options.max_connections
    if max_connections is not None:
        kwargs.setdefault("max_connections", max_connections)

    # Timespans are handed to redis-py as plain seconds.
    for timeout_key in ("socket_connect_timeout", "socket_timeout"):
        timespan = getattr(options, timeout_key)
        if timespan is not None:
            kwargs.setdefault(timeout_key, timespan.total_seconds())

    return redis.BlockingConnectionPool.from_url(options.url, **kwargs)
class RedisClient(config.Parser):
    """Configure a Redis client.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    :param redis_client_name: The name of this Redis client. Prefer passing
        the ``client_name`` keyword argument, which is forwarded to
        :py:func:`pool_from_config`. Use this parameter instead if you're
        using a Redis host or proxy that doesn't support the
        ``CLIENT SETNAME`` command.

    See :py:func:`pool_from_config` for available configuration settings.

    """

    def __init__(self, redis_client_name: str = "", **kwargs: Any):
        # Backwards compatibility: clients were originally asked to set
        # `client_name` so that the `redis_client_name` tag would appear on
        # Prometheus metrics, which broke clients connecting through a proxy.
        # An explicit `redis_client_name` wins; otherwise fall back to
        # whatever `client_name` was supplied.
        self.kwargs = kwargs
        self.redis_client_name = (
            redis_client_name if redis_client_name != "" else kwargs.get("client_name", "")
        )

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory":
        """Build the context factory for the settings found under ``key_path``."""
        pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
        return RedisContextFactory(
            connection_pool=pool,
            name=key_path,
            redis_client_name=self.redis_client_name,
        )
class RedisContextFactory(ContextFactory):
    """Redis client context factory.

    This factory will attach a
    :py:class:`~baseplate.clients.redis.MonitoredRedisConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands
    are executed via this connection object, they will use connections from the
    provided :py:class:`redis.ConnectionPool` and automatically record
    diagnostic information.

    :param connection_pool: A connection pool.
    :param name: Name stored on the factory as ``self.name``.
    :param redis_client_name: Client name passed on to every connection
        object this factory creates.

    """

    def __init__(
        self,
        connection_pool: redis.ConnectionPool,
        name: str = "redis",
        redis_client_name: str = "",
    ):
        self.connection_pool = connection_pool
        self.name = name
        self.redis_client_name = redis_client_name

    def report_runtime_metrics(self, batch: metrics.Client) -> None:
        """Publish connection pool usage gauges.

        Nothing is reported unless the pool is a
        :py:class:`redis.BlockingConnectionPool`.

        :param batch: Metrics client that receives the ``pool.*`` gauges.

        """
        if not isinstance(self.connection_pool, redis.BlockingConnectionPool):
            return

        size = self.connection_pool.max_connections
        open_connections_num = len(self.connection_pool._connections)  # type: ignore
        available = self.connection_pool.pool.qsize()
        in_use = size - available

        # NOTE(review): the label value and the `.set(...)` arguments were
        # restored from truncated calls; confirm that the pool gauges are
        # meant to be labelled with this factory's name.
        MAX_CONNECTIONS.labels(self.name).set(size)
        IDLE_CONNECTIONS.labels(self.name).set(available)
        OPEN_CONNECTIONS.labels(self.name).set(open_connections_num)

        batch.gauge("pool.size").replace(size)
        batch.gauge("pool.in_use").replace(in_use)
        batch.gauge("pool.open_and_available").replace(open_connections_num - in_use)

    def make_object_for_context(self, name: str, span: Span) -> "MonitoredRedisConnection":
        """Create the monitored connection for one request context.

        :param name: Context name handed to the connection as ``context_name``.
        :param span: Server span handed to the connection as ``server_span``.

        """
        return MonitoredRedisConnection(
            context_name=name,
            server_span=span,
            connection_pool=self.connection_pool,
            redis_client_name=self.redis_client_name,
        )
# pylint: disable=too-many-public-methods
class MonitoredRedisConnection(redis.StrictRedis):
    """Redis connection that collects diagnostic information.

    This connection acts like :py:class:`redis.StrictRedis` except that all
    operations are automatically wrapped with diagnostic collection.

    The interface is the same as that class except for the
    :py:meth:`~baseplate.clients.redis.MonitoredRedisConnection.pipeline`
    method.

    """

    def __init__(
        self,
        context_name: str,
        server_span: Span,
        connection_pool: redis.ConnectionPool,
        redis_client_name: str = "",
    ):
        self.context_name = context_name
        self.server_span = server_span
        self.redis_client_name = redis_client_name
        super().__init__(connection_pool=connection_pool)

    def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        """Run a single Redis command inside a child span and record metrics.

        The first positional argument is the command name. The call counts as
        failed when it raises or when the response is itself a
        ``redis.RedisError`` instance.
        """
        command = args[0]
        labels = {
            f"{PROM_LABELS_PREFIX}_command": command,
            f"{PROM_LABELS_PREFIX}_client_name": self.redis_client_name,
            f"{PROM_LABELS_PREFIX}_database": self.connection_pool.connection_kwargs.get("db", ""),
            f"{PROM_LABELS_PREFIX}_type": "standalone",
        }

        with self.server_span.make_child(f"{self.context_name}.{command}"):
            with ACTIVE_REQUESTS.labels(**labels).track_inprogress():
                began = perf_counter()
                # Assume failure; only a clean, non-error response flips this.
                outcome = "false"
                try:
                    response = super().execute_command(*args, **kwargs)
                    if not isinstance(response, redis.RedisError):
                        outcome = "true"
                    return response
                finally:
                    result_labels = {**labels, f"{PROM_LABELS_PREFIX}_success": outcome}
                    REQUESTS_TOTAL.labels(**result_labels).inc()
                    LATENCY_SECONDS.labels(**result_labels).observe(perf_counter() - began)

    # pylint: disable=arguments-renamed
    def pipeline(  # type: ignore
        self, name: str, transaction: bool = True, shard_hint: Optional[str] = None
    ) -> "MonitoredRedisPipeline":
        """Create a pipeline.

        This returns an object on which you can call the standard Redis
        commands. Execution will be deferred until ``execute`` is called. This
        is useful for saving round trips.

        :param name: The name to attach to diagnostics for this pipeline.
        :param transaction: Whether or not the commands in the pipeline are
            wrapped with a transaction and executed atomically.
        :param shard_hint: Passed through unchanged to the pipeline.

        """
        trace_name = f"{self.context_name}.pipeline_{name}"
        return MonitoredRedisPipeline(
            trace_name,
            self.server_span,
            self.connection_pool,
            self.response_callbacks,
            transaction=transaction,
            shard_hint=shard_hint,
            redis_client_name=self.redis_client_name,
        )

    # these commands are not yet implemented, but probably not unimplementable
    def transaction(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError
class MonitoredRedisPipeline(Pipeline):
    """Redis pipeline whose ``execute`` call is traced and measured."""

    def __init__(
        self,
        trace_name: str,
        server_span: Span,
        connection_pool: redis.ConnectionPool,
        response_callbacks: Dict,
        redis_client_name: str = "",
        **kwargs: Any,
    ):
        self.trace_name = trace_name
        self.server_span = server_span
        self.redis_client_name = redis_client_name
        super().__init__(connection_pool, response_callbacks, **kwargs)

    # pylint: disable=arguments-differ
    def execute(self, **kwargs: Any) -> Any:
        """Execute the queued commands inside a child span, recording metrics.

        The whole pipeline is reported as a single ``pipeline`` command and
        counts as failed if execution raises.
        """
        with self.server_span.make_child(self.trace_name):
            # Assume failure; a normal return flips this to "true".
            outcome = "false"
            began = perf_counter()
            labels = {
                f"{PROM_LABELS_PREFIX}_command": "pipeline",
                f"{PROM_LABELS_PREFIX}_client_name": self.redis_client_name,
                f"{PROM_LABELS_PREFIX}_database": self.connection_pool.connection_kwargs.get(
                    "db", ""
                ),
                f"{PROM_LABELS_PREFIX}_type": "standalone",
            }

            ACTIVE_REQUESTS.labels(**labels).inc()
            try:
                results = super().execute(**kwargs)
                outcome = "true"
                return results
            finally:
                ACTIVE_REQUESTS.labels(**labels).dec()
                result_labels = {**labels, f"{PROM_LABELS_PREFIX}_success": outcome}
                REQUESTS_TOTAL.labels(**result_labels).inc()
                LATENCY_SECONDS.labels(**result_labels).observe(perf_counter() - began)
class MessageQueue:
    """A Redis-backed variant of :py:class:`~baseplate.lib.message_queue.MessageQueue`.

    :param name: can be any string.
    :param client: should be a :py:class:`redis.ConnectionPool` or
        :py:class:`redis.BlockingConnectionPool` from which a client
        connection can be created from (preferably generated from the
        :py:func:`pool_from_config` helper). Any other object is used
        directly as the client.

    """

    def __init__(self, name: str, client: redis.ConnectionPool):
        self.queue = name
        if isinstance(client, (redis.BlockingConnectionPool, redis.ConnectionPool)):
            self.client = redis.Redis(connection_pool=client)
        else:
            self.client = client

    def get(self, timeout: Optional[float] = None) -> bytes:
        """Read a message from the queue.

        :param timeout: If the queue is empty, the call will block up to
            ``timeout`` seconds or forever if ``None``, if a float is given,
            it will be rounded up to be an integer
        :return: The oldest message in the queue. An empty message is a valid
            result and is returned as-is.
        :raises: :py:exc:`~baseplate.lib.message_queue.TimedOutError` The queue
            was empty for the allowed duration of the call.

        """
        if isinstance(timeout, float):
            timeout = int(ceil(timeout))

        if timeout == 0:
            # Non-blocking read: the pop yields the value, or None when empty.
            message = self.client.lpop(self.queue)
        else:
            # The blocking pop yields a sequence whose second item is the
            # value, or a falsy result when the timeout elapses.
            reply = self.client.blpop(self.queue, timeout=timeout or 0)
            message = reply[1] if reply else None

        # Only a missing message means the queue stayed empty. A plain
        # truthiness test would discard a legitimately queued empty message.
        if message is None:
            raise message_queue.TimedOutError

        return message

    def put(  # pylint: disable=unused-argument
        self, message: bytes, timeout: Optional[float] = None
    ) -> None:
        """Add a message to the queue.

        :param message: will be typecast to a string upon storage and will come
            out of the queue as a string regardless of what type they are when
            passed into this method.
        :param timeout: Accepted but not used by this implementation.

        """
        self.client.rpush(self.queue, message)

    def close(self) -> None:
        """Close queue when finished.

        Will delete the queue from the Redis server (Note, can still enqueue
        and dequeue as the actions will recreate the queue)

        """
        self.client.delete(self.queue)