Source code for baseplate.clients.redis

from math import ceil
from typing import Any
from typing import Dict
from typing import Optional

import redis.client

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import message_queue


[docs]def pool_from_config( app_config: config.RawConfig, prefix: str = "redis.", **kwargs: Any ) -> redis.BlockingConnectionPool: """Make a ConnectionPool from a configuration dictionary. The keys useful to :py:func:`pool_from_config` should be prefixed, e.g. ``redis.url``, ``redis.max_connections``, etc. The ``prefix`` argument specifies the prefix used to filter keys. Each key is mapped to a corresponding keyword argument on the :py:class:`redis.ConnectionPool` constructor. Supported keys: * ``url`` (required): a URL like ``redis://localhost/0``. * ``max_connections``: an integer maximum number of connections in the pool * ``socket_connect_timeout``: how long to wait for sockets to connect. e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`) * ``socket_timeout``: how long to wait for socket operations, e.g. ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`) """ assert prefix.endswith(".") parser = config.SpecParser( { "url": config.String, "max_connections": config.Optional(config.Integer, default=None), "socket_connect_timeout": config.Optional(config.Timespan, default=None), "socket_timeout": config.Optional(config.Timespan, default=None), } ) options = parser.parse(prefix[:-1], app_config) if options.max_connections is not None: kwargs.setdefault("max_connections", options.max_connections) if options.socket_connect_timeout is not None: kwargs.setdefault("socket_connect_timeout", options.socket_connect_timeout.total_seconds()) if options.socket_timeout is not None: kwargs.setdefault("socket_timeout", options.socket_timeout.total_seconds()) return redis.BlockingConnectionPool.from_url(options.url, **kwargs)
[docs]class RedisClient(config.Parser): """Configure a Redis client. This is meant to be used with :py:meth:`baseplate.Baseplate.configure_context`. See :py:func:`pool_from_config` for available configuration settings. """ def __init__(self, **kwargs: Any): self.kwargs = kwargs def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory": connection_pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs) return RedisContextFactory(connection_pool)
[docs]class RedisContextFactory(ContextFactory): """Redis client context factory. This factory will attach a :py:class:`~baseplate.clients.redis.MonitoredRedisConnection` to an attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands are executed via this connection object, they will use connections from the provided :py:class:`redis.ConnectionPool` and automatically record diagnostic information. :param connection_pool: A connection pool. """ def __init__(self, connection_pool: redis.ConnectionPool): self.connection_pool = connection_pool
[docs] def make_object_for_context(self, name: str, span: Span) -> "MonitoredRedisConnection": return MonitoredRedisConnection(name, span, self.connection_pool)
# pylint: disable=too-many-public-methods
[docs]class MonitoredRedisConnection(redis.StrictRedis): """Redis connection that collects diagnostic information. This connection acts like :py:class:`redis.StrictRedis` except that all operations are automatically wrapped with diagnostic collection. The interface is the same as that class except for the :py:meth:`~baseplate.clients.redis.MonitoredRedisConnection.pipeline` method. .. note:: Locks and PubSub are currently unsupported. """ def __init__(self, context_name: str, server_span: Span, connection_pool: redis.ConnectionPool): self.context_name = context_name self.server_span = server_span super().__init__(connection_pool=connection_pool) # pylint: disable=arguments-differ
[docs] def execute_command(self, command: str, *args: Any, **kwargs: Any) -> Any: trace_name = f"{self.context_name}.{command}" with self.server_span.make_child(trace_name): return super().execute_command(command, *args, **kwargs)
# pylint: disable=arguments-differ
[docs] def pipeline( self, name: str, transaction: bool = True, shard_hint: Optional[str] = None ) -> "MonitoredRedisPipeline": """Create a pipeline. This returns an object on which you can call the standard Redis commands. Execution will be deferred until ``execute`` is called. This is useful for saving round trips. :param name: The name to attach to diagnostics for this pipeline. :param transaction: Whether or not the commands in the pipeline are wrapped with a transaction and executed atomically. """ return MonitoredRedisPipeline( f"{self.context_name}.pipeline_{name}", self.server_span, self.connection_pool, self.response_callbacks, transaction=transaction, shard_hint=shard_hint, )
# these commands are not yet implemented, but probably not unimplementable
[docs] def transaction(self, *args: Any, **kwargs: Any) -> Any: """Not currently implemented.""" raise NotImplementedError
[docs] def lock(self, *args: Any, **kwargs: Any) -> Any: """Not currently implemented.""" raise NotImplementedError
[docs] def pubsub(self, *args: Any, **kwargs: Any) -> Any: """Not currently implemented.""" raise NotImplementedError
class MonitoredRedisPipeline(redis.client.StrictPipeline): def __init__( self, trace_name: str, server_span: Span, connection_pool: redis.ConnectionPool, response_callbacks: Dict, **kwargs: Any, ): self.trace_name = trace_name self.server_span = server_span super().__init__(connection_pool, response_callbacks, **kwargs) def execute(self, **kwargs: Any) -> Any: # pylint: disable=arguments-differ with self.server_span.make_child(self.trace_name): return super().execute(**kwargs)
[docs]class MessageQueue: """A Redis-backed variant of :py:class:`~baseplate.lib.message_queue.MessageQueue`. :param name: can be any string. :param client: should be a :py:class:`redis.ConnectionPool` or :py:class:`redis.BlockingConnectionPool` from which a client connection can be created from (preferably generated from the :py:func:`pool_from_config` helper). """ def __init__(self, name: str, client: redis.ConnectionPool): self.queue = name if isinstance(client, (redis.BlockingConnectionPool, redis.ConnectionPool)): self.client = redis.Redis(connection_pool=client) else: self.client = client
[docs] def get(self, timeout: Optional[float] = None) -> bytes: """Read a message from the queue. :param timeout: If the queue is empty, the call will block up to ``timeout`` seconds or forever if ``None``, if a float is given, it will be rounded up to be an integer :raises: :py:exc:`~baseplate.lib.message_queue.TimedOutError` The queue was empty for the allowed duration of the call. """ if isinstance(timeout, float): timeout = int(ceil(timeout)) if timeout == 0: message = self.client.lpop(self.queue) else: message = self.client.blpop(self.queue, timeout=timeout) if message: message = message[1] if not message: raise message_queue.TimedOutError return message
[docs] def put( # pylint: disable=unused-argument self, message: bytes, timeout: Optional[float] = None ) -> None: """Add a message to the queue. :param message: will be typecast to a string upon storage and will come out of the queue as a string regardless of what type they are when passed into this method. """ return self.client.rpush(self.queue, message)
[docs] def close(self) -> None: """Close queue when finished. Will delete the queue from the Redis server (Note, can still enqueue and dequeue as the actions will recreate the queue) """ self.client.delete(self.queue)