from math import ceil
from typing import Any
from typing import Dict
from typing import Optional
import redis.client
from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import message_queue
def pool_from_config(
    app_config: config.RawConfig, prefix: str = "redis.", **kwargs: Any
) -> redis.ConnectionPool:
    """Build a Redis connection pool from application configuration.

    Only keys that start with ``prefix`` are considered, e.g. ``redis.url``
    or ``redis.max_connections`` for the default prefix. Each recognised key
    is forwarded as the keyword argument of the same name to the pool
    constructor. A value passed explicitly through ``kwargs`` takes
    precedence over the value found in the configuration.

    Supported keys:

    * ``url`` (required): a URL like ``redis://localhost/0``.
    * ``max_connections``: an integer maximum number of connections in the
      pool.
    * ``socket_connect_timeout``: how long to wait for sockets to connect,
      e.g. ``200 milliseconds``
      (:py:func:`~baseplate.lib.config.Timespan`).
    * ``socket_timeout``: how long to wait for socket operations, e.g.
      ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`).

    :param app_config: The raw application configuration.
    :param prefix: Key prefix to look under; must end with a ``.``.
    :param kwargs: Extra keyword arguments handed to the pool constructor.
    :return: A :py:class:`redis.BlockingConnectionPool` built from the URL.

    """
    assert prefix.endswith(".")

    spec = {
        "url": config.String,
        "max_connections": config.Optional(config.Integer, default=None),
        "socket_connect_timeout": config.Optional(config.Timespan, default=None),
        "socket_timeout": config.Optional(config.Timespan, default=None),
    }
    # The parser expects the key path without the trailing dot.
    parsed = config.SpecParser(spec).parse(prefix[:-1], app_config)

    if parsed.max_connections is not None:
        kwargs.setdefault("max_connections", parsed.max_connections)

    # Timespans are converted to float seconds for the pool constructor.
    for timeout_key in ("socket_connect_timeout", "socket_timeout"):
        timespan = getattr(parsed, timeout_key)
        if timespan is not None:
            kwargs.setdefault(timeout_key, timespan.total_seconds())

    return redis.BlockingConnectionPool.from_url(parsed.url, **kwargs)
class RedisClient(config.Parser):
    """Configuration parser that produces a Redis context factory.

    Intended for use with
    :py:meth:`baseplate.Baseplate.configure_context`.

    The available configuration settings are those described on
    :py:func:`pool_from_config`; any keyword arguments given here are passed
    through to that function unchanged.

    """

    def __init__(self, **kwargs: Any):
        # Stored as-is and forwarded to pool_from_config() at parse time.
        self.kwargs = kwargs

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "RedisContextFactory":
        """Build the context factory for the settings found under ``key_path``.

        :param key_path: Configuration key prefix, without a trailing dot.
        :param raw_config: The raw application configuration.
        :return: A factory wrapping the newly created connection pool.

        """
        pool = pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
        return RedisContextFactory(pool)
class RedisContextFactory(ContextFactory):
    """Context factory for monitored Redis connections.

    For each request, this factory attaches a
    :py:class:`~baseplate.clients.redis.MonitoredRedisConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. Commands run
    through that object draw their connections from the supplied
    :py:class:`redis.ConnectionPool` and record diagnostic information
    automatically.

    :param connection_pool: A connection pool.

    """

    def __init__(self, connection_pool: redis.ConnectionPool):
        self.connection_pool = connection_pool

    def make_object_for_context(self, name: str, span: Span) -> "MonitoredRedisConnection":
        """Create a monitored connection bound to ``span``.

        :param name: The context attribute name, used to label diagnostics.
        :param span: The span that command spans will be children of.

        """
        return MonitoredRedisConnection(
            context_name=name, server_span=span, connection_pool=self.connection_pool
        )
# pylint: disable=too-many-public-methods
class MonitoredRedisConnection(redis.StrictRedis):
    """Redis client that records diagnostics for every command.

    Behaves like :py:class:`redis.StrictRedis`, except that each operation
    is wrapped in a child span of the server span.

    The interface matches that class with one exception: the
    :py:meth:`~baseplate.clients.redis.MonitoredRedisConnection.pipeline`
    method takes an additional ``name`` argument.

    .. note:: Locks and PubSub are currently unsupported.

    """

    def __init__(self, context_name: str, server_span: Span, connection_pool: redis.ConnectionPool):
        self.context_name = context_name
        self.server_span = server_span
        super().__init__(connection_pool=connection_pool)

    def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        """Run a Redis command inside a span named ``<context>.<command>``."""
        # The first positional argument is the command name.
        span_name = f"{self.context_name}.{args[0]}"
        with self.server_span.make_child(span_name):
            return super().execute_command(*args, **kwargs)

    # pylint: disable=arguments-differ
    def pipeline(  # type: ignore
        self, name: str, transaction: bool = True, shard_hint: Optional[str] = None
    ) -> "MonitoredRedisPipeline":
        """Create a pipeline.

        The returned object accepts the standard Redis commands but defers
        sending them until ``execute`` is called, which is useful for
        saving round trips.

        :param name: The name to attach to diagnostics for this pipeline.
        :param transaction: Whether or not the commands in the pipeline
            are wrapped with a transaction and executed atomically.
        :param shard_hint: Passed through unchanged to the pipeline.

        """
        pipeline_trace_name = f"{self.context_name}.pipeline_{name}"
        return MonitoredRedisPipeline(
            pipeline_trace_name,
            self.server_span,
            self.connection_pool,
            self.response_callbacks,
            transaction=transaction,
            shard_hint=shard_hint,
        )

    # these commands are not yet implemented, but probably not unimplementable
    def transaction(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError

    def lock(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError

    def pubsub(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError
class MonitoredRedisPipeline(redis.client.StrictPipeline):
    """Redis pipeline whose ``execute`` call is wrapped in a child span.

    :param trace_name: Name given to the span created around ``execute``.
    :param server_span: The span that the ``execute`` span is a child of.
    :param connection_pool: Forwarded to the underlying pipeline.
    :param response_callbacks: Forwarded to the underlying pipeline.
    :param kwargs: Remaining keyword arguments for the underlying pipeline.

    """

    def __init__(
        self,
        trace_name: str,
        server_span: Span,
        connection_pool: redis.ConnectionPool,
        response_callbacks: Dict,
        **kwargs: Any,
    ):
        self.trace_name = trace_name
        self.server_span = server_span
        super().__init__(connection_pool, response_callbacks, **kwargs)

    # pylint: disable=arguments-differ
    def execute(self, **kwargs: Any) -> Any:  # type: ignore
        """Execute the queued commands inside a span named ``trace_name``."""
        child_span = self.server_span.make_child(self.trace_name)
        with child_span:
            return super().execute(**kwargs)
class MessageQueue:
    """A Redis-backed variant of :py:class:`~baseplate.lib.message_queue.MessageQueue`.

    Messages are stored in a Redis list: :py:meth:`put` pushes onto the
    right end and :py:meth:`get` pops from the left end.

    :param name: can be any string; used as the Redis key of the list.
    :param client: should be a :py:class:`redis.ConnectionPool` or
        :py:class:`redis.BlockingConnectionPool` from which a client
        connection can be created (preferably generated from the
        :py:func:`pool_from_config` helper). Any other object is used
        directly as the client.

    """

    def __init__(self, name: str, client: redis.ConnectionPool):
        self.queue = name
        if isinstance(client, (redis.BlockingConnectionPool, redis.ConnectionPool)):
            self.client = redis.Redis(connection_pool=client)
        else:
            # Not a pool: use the object as an already-built client.
            self.client = client

    def get(self, timeout: Optional[float] = None) -> bytes:
        """Read a message from the queue.

        :param timeout: If the queue is empty, the call will block up to
            ``timeout`` seconds or forever if ``None``. A float is rounded
            up to an integer. A timeout of ``0`` does a non-blocking pop.
        :return: The message popped from the queue. An empty message is a
            valid result and is returned as-is.
        :raises: :py:exc:`~baseplate.lib.message_queue.TimedOutError` The queue
            was empty for the allowed duration of the call.

        """
        if isinstance(timeout, float):
            timeout = int(ceil(timeout))

        if timeout == 0:
            # Non-blocking pop: yields the value itself, or None when empty.
            message = self.client.lpop(self.queue)
        else:
            # A blocking-pop timeout of 0 means "wait forever", which is how
            # a ``None`` timeout is expressed here.
            reply = self.client.blpop(self.queue, timeout=timeout or 0)
            # A successful blocking pop is a (key, value) pair.
            message = reply[1] if reply else None

        # Only a missing reply means the queue was empty. Testing truthiness
        # instead would drop an already-dequeued empty payload (b"") and
        # misreport it as a timeout.
        if message is None:
            raise message_queue.TimedOutError

        return message

    def put(  # pylint: disable=unused-argument
        self, message: bytes, timeout: Optional[float] = None
    ) -> None:
        """Add a message to the queue.

        :param message: will be typecast to a string upon storage and will come
            out of the queue as a string regardless of what type they are
            when passed into this method.
        :param timeout: Accepted but unused by this variant.

        """
        self.client.rpush(self.queue, message)

    def unlink(self) -> None:
        """Not implemented for Redis variant."""

    def close(self) -> None:
        """Close queue when finished.

        Will delete the queue from the Redis server (Note, can still enqueue
        and dequeue as the actions will recreate the queue)

        """
        self.client.delete(self.queue)