Source code for baseplate.clients.redis

from math import ceil
from typing import Any
from typing import Dict
from typing import Optional

import redis

from prometheus_client import Gauge

# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0
try:
    from redis.client import StrictPipeline as Pipeline  # type: ignore
except ImportError:
    from redis.client import Pipeline

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import message_queue
from baseplate.lib import metrics


[docs]def pool_from_config( app_config: config.RawConfig, prefix: str = "redis.", **kwargs: Any ) -> redis.ConnectionPool: """Make a ConnectionPool from a configuration dictionary. The keys useful to :py:func:`pool_from_config` should be prefixed, e.g. ``redis.url``, ``redis.max_connections``, etc. The ``prefix`` argument specifies the prefix used to filter keys. Each key is mapped to a corresponding keyword argument on the :py:class:`redis.ConnectionPool` constructor. Supported keys: * ``url`` (required): a URL like ``redis://localhost/0``. * ``max_connections``: an integer maximum number of connections in the pool * ``socket_connect_timeout``: how long to wait for sockets to connect. e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`) * ``socket_timeout``: how long to wait for socket operations, e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`) """ assert prefix.endswith(".") parser = config.SpecParser( { "url": config.String, "max_connections": config.Optional(config.Integer, default=None), "socket_connect_timeout": config.Optional(config.Timespan, default=None), "socket_timeout": config.Optional(config.Timespan, default=None), } ) options = parser.parse(prefix[:-1], app_config) if options.max_connections is not None: kwargs.setdefault("max_connections", options.max_connections) if options.socket_connect_timeout is not None: kwargs.setdefault("socket_connect_timeout", options.socket_connect_timeout.total_seconds()) if options.socket_timeout is not None: kwargs.setdefault("socket_timeout", options.socket_timeout.total_seconds()) return redis.BlockingConnectionPool.from_url(options.url, **kwargs)
[docs]class RedisClient(config.Parser): """Configure a Redis client. This is meant to be used with :py:meth:`baseplate.Baseplate.configure_context`. See :py:func:`pool_from_config` for available configuration settings. """ def __init__(self, **kwargs: Any): self.kwargs = kwargs def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory": connection_pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs) return RedisContextFactory(connection_pool, key_path)
[docs]class RedisContextFactory(ContextFactory): """Redis client context factory. This factory will attach a :py:class:`~baseplate.clients.redis.MonitoredRedisConnection` to an attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands are executed via this connection object, they will use connections from the provided :py:class:`redis.ConnectionPool` and automatically record diagnostic information. :param connection_pool: A connection pool. """ PROM_PREFIX = "bp_redis_pool" PROM_LABELS = ["pool"] max_connections = Gauge( f"{PROM_PREFIX}_max_size", "Maximum number of connections allowed in this redisbp pool", PROM_LABELS, ) idle_connections = Gauge( f"{PROM_PREFIX}_idle_connections", "Number of idle connections in this redisbp pool", PROM_LABELS, ) open_connections = Gauge( f"{PROM_PREFIX}_active_connections", "Number of open connections in this redisbp pool", PROM_LABELS, ) def __init__(self, connection_pool: redis.ConnectionPool, name: str = "redis"): self.connection_pool = connection_pool if isinstance(connection_pool, redis.BlockingConnectionPool): self.max_connections.labels(name).set_function(lambda: connection_pool.max_connections) self.idle_connections.labels(name).set_function(connection_pool.pool.qsize) self.open_connections.labels(name).set_function( lambda: len(connection_pool._connections) # type: ignore )
[docs] def report_runtime_metrics(self, batch: metrics.Client) -> None: if not isinstance(self.connection_pool, redis.BlockingConnectionPool): return size = self.connection_pool.max_connections open_connections = len(self.connection_pool._connections) # type: ignore available = self.connection_pool.pool.qsize() in_use = size - available batch.gauge("pool.size").replace(size) batch.gauge("pool.in_use").replace(in_use) batch.gauge("pool.open_and_available").replace(open_connections - in_use)
[docs] def make_object_for_context(self, name: str, span: Span) -> "MonitoredRedisConnection": return MonitoredRedisConnection(name, span, self.connection_pool)
# pylint: disable=too-many-public-methods
[docs]class MonitoredRedisConnection(redis.StrictRedis): """Redis connection that collects diagnostic information. This connection acts like :py:class:`redis.StrictRedis` except that all operations are automatically wrapped with diagnostic collection. The interface is the same as that class except for the :py:meth:`~baseplate.clients.redis.MonitoredRedisConnection.pipeline` method. """ def __init__(self, context_name: str, server_span: Span, connection_pool: redis.ConnectionPool): self.context_name = context_name self.server_span = server_span super().__init__(connection_pool=connection_pool)
[docs] def execute_command(self, *args: Any, **kwargs: Any) -> Any: command = args[0] trace_name = f"{self.context_name}.{command}" with self.server_span.make_child(trace_name): return super().execute_command(command, *args[1:], **kwargs)
# pylint: disable=arguments-differ
[docs] def pipeline( # type: ignore self, name: str, transaction: bool = True, shard_hint: Optional[str] = None ) -> "MonitoredRedisPipeline": """Create a pipeline. This returns an object on which you can call the standard Redis commands. Execution will be deferred until ``execute`` is called. This is useful for saving round trips. :param name: The name to attach to diagnostics for this pipeline. :param transaction: Whether or not the commands in the pipeline are wrapped with a transaction and executed atomically. """ return MonitoredRedisPipeline( f"{self.context_name}.pipeline_{name}", self.server_span, self.connection_pool, self.response_callbacks, transaction=transaction, shard_hint=shard_hint, )
# these commands are not yet implemented, but probably not unimplementable
[docs] def transaction(self, *args: Any, **kwargs: Any) -> Any: """Not currently implemented.""" raise NotImplementedError
class MonitoredRedisPipeline(Pipeline): def __init__( self, trace_name: str, server_span: Span, connection_pool: redis.ConnectionPool, response_callbacks: Dict, **kwargs: Any, ): self.trace_name = trace_name self.server_span = server_span super().__init__(connection_pool, response_callbacks, **kwargs) # pylint: disable=arguments-differ def execute(self, **kwargs: Any) -> Any: with self.server_span.make_child(self.trace_name): return super().execute(**kwargs)
[docs]class MessageQueue: """A Redis-backed variant of :py:class:`~baseplate.lib.message_queue.MessageQueue`. :param name: can be any string. :param client: should be a :py:class:`redis.ConnectionPool` or :py:class:`redis.BlockingConnectionPool` from which a client connection can be created from (preferably generated from the :py:func:`pool_from_config` helper). """ def __init__(self, name: str, client: redis.ConnectionPool): self.queue = name if isinstance(client, (redis.BlockingConnectionPool, redis.ConnectionPool)): self.client = redis.Redis(connection_pool=client) else: self.client = client
[docs] def get(self, timeout: Optional[float] = None) -> bytes: """Read a message from the queue. :param timeout: If the queue is empty, the call will block up to ``timeout`` seconds or forever if ``None``, if a float is given, it will be rounded up to be an integer :raises: :py:exc:`~baseplate.lib.message_queue.TimedOutError` The queue was empty for the allowed duration of the call. """ if isinstance(timeout, float): timeout = int(ceil(timeout)) if timeout == 0: message = self.client.lpop(self.queue) else: message = self.client.blpop(self.queue, timeout=timeout or 0) if message: message = message[1] if not message: raise message_queue.TimedOutError return message
[docs] def put( # pylint: disable=unused-argument self, message: bytes, timeout: Optional[float] = None ) -> None: """Add a message to the queue. :param message: will be typecast to a string upon storage and will come out of the queue as a string regardless of what type they are when passed into this method. """ self.client.rpush(self.queue, message)
[docs] def close(self) -> None: """Close queue when finished. Will delete the queue from the Redis server (Note, can still enqueue and dequeue as the actions will recreate the queue) """ self.client.delete(self.queue)