Source code for baseplate.clients.memcache

from time import perf_counter
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client import Histogram
from pymemcache.client.base import PooledClient

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import metrics
from baseplate.lib.prometheus_metrics import default_latency_buckets


Serializer = Callable[[str, Any], Tuple[bytes, int]]
Deserializer = Callable[[str, bytes, int], Any]


def pool_from_config(
    app_config: config.RawConfig,
    prefix: str = "memcache.",
    serializer: Optional[Serializer] = None,
    deserializer: Optional[Deserializer] = None,
) -> PooledClient:
    """Make a PooledClient from a configuration dictionary.

    The keys useful to :py:func:`pool_from_config` should be prefixed, e.g.
    ``memcache.endpoint``, ``memcache.max_pool_size``, etc. The ``prefix``
    argument specifies the prefix used to filter keys. Each key is mapped to a
    corresponding keyword argument on the
    :py:class:`~pymemcache.client.base.PooledClient` constructor.

    Supported keys:

    * ``endpoint`` (required): a string representing a host and port to connect
      to the memcached service, e.g. ``localhost:11211`` or ``127.0.0.1:11211``.
    * ``max_pool_size``: an integer for the maximum pool size to use, by
      default this is ``2147483648``.
    * ``connect_timeout``: how long (as
      :py:func:`~baseplate.lib.config.Timespan`) to wait for a connection to
      the memcached server. Defaults to the underlying socket default timeout.
    * ``timeout``: how long (as :py:func:`~baseplate.lib.config.Timespan`) to
      wait for calls on the socket connected to memcache. Defaults to the
      underlying socket default timeout.

    :param app_config: the raw application configuration
    :param prefix: prefix for configuration keys
    :param serializer: function to serialize values to strings suitable for
        being stored in memcached. An example is
        :py:func:`~baseplate.clients.memcache.lib.make_dump_and_compress_fn`.
    :param deserializer: function to convert strings returned from memcached to
        arbitrary objects, must be compatible with ``serializer``. An example
        is :py:func:`~baseplate.clients.memcache.lib.decompress_and_load`.

    :returns: :py:class:`pymemcache.client.base.PooledClient`

    """
    assert prefix.endswith(".")
    parser = config.SpecParser(
        {
            "endpoint": config.Endpoint,
            "max_pool_size": config.Optional(config.Integer, default=None),
            "connect_timeout": config.Optional(config.TimespanWithLegacyFallback, default=None),
            "timeout": config.Optional(config.TimespanWithLegacyFallback, default=None),
            "no_delay": config.Optional(config.Boolean, default=True),
        }
    )
    options = parser.parse(prefix[:-1], app_config)

    return PooledClient(
        server=options.endpoint.address,
        connect_timeout=options.connect_timeout and options.connect_timeout.total_seconds(),
        timeout=options.timeout and options.timeout.total_seconds(),
        serializer=serializer,
        deserializer=deserializer,
        no_delay=options.no_delay,
        max_pool_size=options.max_pool_size,
    )
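A minimal usage sketch for pool_from_config. The configuration values and key names below (beyond the documented ``memcache.*`` keys) are illustrative assumptions, and the final two calls assume a memcached server is actually listening on localhost:11211.

# Example (not part of this module): build a pool from an INI-style config.
app_config = {
    "memcache.endpoint": "localhost:11211",        # required
    "memcache.max_pool_size": "25",                # illustrative value
    "memcache.timeout": "200 milliseconds",        # illustrative value
}
pool = pool_from_config(app_config)
pool.set("greeting", b"hello", expire=60)          # requires a live memcached
print(pool.get("greeting"))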
class MemcacheClient(config.Parser):
    """Configure a Memcached client.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`pool_from_config` for available configuration settings.

    :param serializer: function to serialize values to strings suitable for
        being stored in memcached. An example is
        :py:func:`~baseplate.clients.memcache.lib.make_dump_and_compress_fn`.
    :param deserializer: function to convert strings returned from memcached to
        arbitrary objects, must be compatible with ``serializer``. An example
        is :py:func:`~baseplate.clients.memcache.lib.decompress_and_load`.

    """

    def __init__(
        self, serializer: Optional[Serializer] = None, deserializer: Optional[Deserializer] = None
    ):
        self.serializer = serializer
        self.deserializer = deserializer

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "MemcacheContextFactory":
        pool = pool_from_config(
            raw_config,
            prefix=f"{key_path}.",
            serializer=self.serializer,
            deserializer=self.deserializer,
        )
        return MemcacheContextFactory(pool, key_path)
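A hedged sketch of wiring this parser into an application with Baseplate.configure_context. The context name "foo" and the configuration values are assumptions made for illustration; configuration keys are read under the chosen context name, per the ``prefix=f"{key_path}."`` argument above.

# Example (not part of this module): attach a memcache client as ``context.foo``.
from baseplate import Baseplate

app_config = {"foo.endpoint": "localhost:11211"}   # illustrative
baseplate = Baseplate(app_config)
baseplate.configure_context({"foo": MemcacheClient()})
# Request contexts created by this Baseplate instance then expose a
# MonitoredMemcacheConnection as ``context.foo``.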
class MemcacheContextFactory(ContextFactory):
    """Memcache client context factory.

    This factory will attach a
    :py:class:`~baseplate.clients.memcache.MonitoredMemcacheConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. When memcache
    commands are executed via this connection object, they will use connections
    from the provided :py:class:`~pymemcache.client.base.PooledClient` and
    automatically record diagnostic information.

    :param pooled_client: A pooled client.

    """

    PROM_PREFIX = "memcached_client_pool"
    PROM_LABELS = ["memcached_pool"]

    pool_size_gauge = Gauge(
        f"{PROM_PREFIX}_max_size",
        "Maximum number of connections allowed in this pool",
        PROM_LABELS,
    )
    used_connections_gauge = Gauge(
        f"{PROM_PREFIX}_active_connections",
        "Number of connections in this pool currently in use",
        PROM_LABELS,
    )
    free_connections_gauge = Gauge(
        f"{PROM_PREFIX}_idle_connections",
        "Number of free connections in this pool",
        PROM_LABELS,
    )

    def __init__(self, pooled_client: PooledClient, name: str = "default"):
        self.pooled_client = pooled_client
        self.name = name

    def report_memcache_runtime_metrics(self, batch: metrics.Client) -> None:
        pool = self.pooled_client.client_pool
        self.pool_size_gauge.labels(self.name).set(pool.max_size)
        self.free_connections_gauge.labels(self.name).set(len(pool.free))
        self.used_connections_gauge.labels(self.name).set(len(pool.used))
        batch.gauge("pool.in_use").replace(len(pool.used))
        batch.gauge("pool.open_and_available").replace(len(pool.free))
        batch.gauge("pool.size").replace(pool.max_size)
    def make_object_for_context(self, name: str, span: Span) -> "MonitoredMemcacheConnection":
        return MonitoredMemcacheConnection(name, span, self.pooled_client)
Key = Union[str, bytes]

PROM_NAMESPACE = "memcached"
LABELS_COMMON = [
    f"{PROM_NAMESPACE}_address",
    f"{PROM_NAMESPACE}_command",
]

LATENCY_SECONDS = Histogram(
    f"{PROM_NAMESPACE}_client_latency_seconds",
    "Latency histogram of outgoing memcached requests",
    [*LABELS_COMMON, f"{PROM_NAMESPACE}_success"],
    buckets=default_latency_buckets,
)

REQUESTS_TOTAL = Counter(
    f"{PROM_NAMESPACE}_client_requests_total",
    "Total number of memcached requests",
    [*LABELS_COMMON, f"{PROM_NAMESPACE}_success"],
)

ACTIVE_REQUESTS = Gauge(
    f"{PROM_NAMESPACE}_client_active_requests",
    "Number of active requests",
    LABELS_COMMON,
    multiprocess_mode="livesum",
)


def _prom_instrument(func: Any) -> Any:
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        labels_common = {
            f"{PROM_NAMESPACE}_address": str(self.pooled_client.server),
            f"{PROM_NAMESPACE}_command": func.__name__,
        }
        success = "true"
        start_time = perf_counter()
        try:
            with ACTIVE_REQUESTS.labels(**labels_common).track_inprogress():
                return func(self, *args, **kwargs)
        except:  # noqa
            success = "false"
            raise
        finally:
            REQUESTS_TOTAL.labels(**{**labels_common, f"{PROM_NAMESPACE}_success": success}).inc()
            LATENCY_SECONDS.labels(
                **{**labels_common, f"{PROM_NAMESPACE}_success": success}
            ).observe(perf_counter() - start_time)

    return wrapper
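A small sketch of what the decorator records, using a stand-in object in place of a real monitored connection. ``FakeClient`` and its attributes are assumptions made purely for illustration; the decorator only needs ``self.pooled_client.server`` for its address label.

# Example (not part of this module): exercise the decorator without memcached.
from types import SimpleNamespace


class FakeClient:
    # The wrapper reads ``self.pooled_client.server`` to build the address label.
    pooled_client = SimpleNamespace(server="localhost:11211")

    @_prom_instrument
    def get(self, key: str) -> None:
        return None


FakeClient().get("greeting")
# One observation is now recorded on each of:
#   memcached_client_requests_total{memcached_address="localhost:11211",
#                                   memcached_command="get", memcached_success="true"}
#   memcached_client_latency_seconds{...same labels...}
# and memcached_client_active_requests was incremented for the duration of the call.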
class MonitoredMemcacheConnection:
    """Memcache connection that collects diagnostic information.

    This connection acts like a
    :py:class:`~pymemcache.client.base.PooledClient` except that operations are
    wrapped with diagnostic collection.

    Some methods may not yet be wrapped with monitoring. Please request
    assistance if any needed methods are not being monitored.

    """

    def __init__(self, context_name: str, server_span: Span, pooled_client: PooledClient):
        self.context_name = context_name
        self.server_span = server_span
        self.pooled_client = pooled_client

    @_prom_instrument
    def close(self) -> None:
        with self._make_span("close"):
            return self.pooled_client.close()

    @_prom_instrument
    def set(self, key: Key, value: Any, expire: int = 0, noreply: Optional[bool] = None) -> bool:
        with self._make_span("set") as span:
            span.set_tag("key", key)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.set(key, value, expire=expire, noreply=noreply)

    @_prom_instrument
    def set_many(
        self, values: Dict[Key, Any], expire: int = 0, noreply: Optional[bool] = None
    ) -> List[str]:
        with self._make_span("set_many") as span:
            span.set_tag("key_count", len(values))
            span.set_tag("keys", make_keys_str(values.keys()))
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.set_many(values, expire=expire, noreply=noreply)

    @_prom_instrument
    def replace(
        self, key: Key, value: Any, expire: int = 0, noreply: Optional[bool] = None
    ) -> bool:
        with self._make_span("replace") as span:
            span.set_tag("key", key)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.replace(key, value, expire=expire, noreply=noreply)

    @_prom_instrument
    def append(self, key: Key, value: Any, expire: int = 0, noreply: Optional[bool] = None) -> bool:
        with self._make_span("append") as span:
            span.set_tag("key", key)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.append(key, value, expire=expire, noreply=noreply)

    @_prom_instrument
    def prepend(
        self, key: Key, value: Any, expire: int = 0, noreply: Optional[bool] = None
    ) -> bool:
        with self._make_span("prepend") as span:
            span.set_tag("key", key)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.prepend(key, value, expire=expire, noreply=noreply)

    @_prom_instrument
    def cas(
        self, key: Key, value: Any, cas: int, expire: int = 0, noreply: Optional[bool] = None
    ) -> Optional[bool]:
        with self._make_span("cas") as span:
            span.set_tag("key", key)
            span.set_tag("cas", cas)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.cas(key, value, cas, expire=expire, noreply=noreply)

    @_prom_instrument
    def get(self, key: Key, default: Any = None) -> Any:
        with self._make_span("get") as span:
            span.set_tag("key", key)
            kwargs = {}
            if default is not None:
                kwargs["default"] = default
            return self.pooled_client.get(key, **kwargs)

    @_prom_instrument
    def get_many(self, keys: Sequence[Key]) -> Dict[Key, Any]:
        with self._make_span("get_many") as span:
            span.set_tag("key_count", len(keys))
            span.set_tag("keys", make_keys_str(keys))
            return self.pooled_client.get_many(keys)

    @_prom_instrument
    def gets(
        self, key: Key, default: Optional[Any] = None, cas_default: Optional[Any] = None
    ) -> Tuple[Any, Any]:
        with self._make_span("gets") as span:
            span.set_tag("key", key)
            return self.pooled_client.gets(key, default=default, cas_default=cas_default)

    @_prom_instrument
    def gets_many(self, keys: Sequence[Key]) -> Dict[Key, Tuple[Any, Any]]:
        with self._make_span("gets_many") as span:
            span.set_tag("key_count", len(keys))
            span.set_tag("keys", make_keys_str(keys))
            return self.pooled_client.gets_many(keys)

    @_prom_instrument
    def delete(self, key: Key, noreply: Optional[bool] = None) -> bool:
        with self._make_span("delete") as span:
            span.set_tag("key", key)
            span.set_tag("noreply", noreply)
            return self.pooled_client.delete(key, noreply=noreply)

    @_prom_instrument
    def delete_many(self, keys: Sequence[Key], noreply: Optional[bool] = None) -> bool:
        with self._make_span("delete_many") as span:
            span.set_tag("key_count", len(keys))
            span.set_tag("noreply", noreply)
            span.set_tag("keys", make_keys_str(keys))
            return self.pooled_client.delete_many(keys, noreply=noreply)

    @_prom_instrument
    def add(self, key: Key, value: Any, expire: int = 0, noreply: Optional[bool] = None) -> bool:
        with self._make_span("add") as span:
            span.set_tag("key", key)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.add(key, value, expire=expire, noreply=noreply)

    @_prom_instrument
    def incr(self, key: Key, value: int, noreply: Optional[bool] = False) -> Optional[int]:
        with self._make_span("incr") as span:
            span.set_tag("key", key)
            span.set_tag("noreply", noreply)
            return self.pooled_client.incr(key, value, noreply=noreply)

    @_prom_instrument
    def decr(self, key: Key, value: int, noreply: Optional[bool] = False) -> Optional[int]:
        with self._make_span("decr") as span:
            span.set_tag("key", key)
            span.set_tag("noreply", noreply)
            return self.pooled_client.decr(key, value, noreply=noreply)

    @_prom_instrument
    def touch(self, key: Key, expire: int = 0, noreply: Optional[bool] = None) -> bool:
        with self._make_span("touch") as span:
            span.set_tag("key", key)
            span.set_tag("expire", expire)
            span.set_tag("noreply", noreply)
            return self.pooled_client.touch(key, expire=expire, noreply=noreply)

    @_prom_instrument
    def stats(self, *args: str) -> Dict[str, Any]:
        with self._make_span("stats"):
            return self.pooled_client.stats(*args)

    @_prom_instrument
    def flush_all(self, delay: int = 0, noreply: Optional[bool] = None) -> bool:
        with self._make_span("flush_all") as span:
            span.set_tag("delay", delay)
            span.set_tag("noreply", noreply)
            return self.pooled_client.flush_all(delay=delay, noreply=noreply)

    @_prom_instrument
    def quit(self) -> None:
        with self._make_span("quit"):
            return self.pooled_client.quit()

    def _make_span(self, method_name: str) -> Span:
        """Get a child span of the current server span.

        The returned span is tagged with ``method_name`` and given a name that
        corresponds to the current context name and called method.

        """
        trace_name = f"{self.context_name}.{method_name}"
        span = self.server_span.make_child(trace_name)
        span.set_tag("method", method_name)
        return span
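A hedged sketch of what calling through the monitored connection looks like from inside a request handler, assuming the client was configured under the context name "foo" as in the MemcacheClient example above; the handler itself is an illustrative assumption.

# Example (not part of this module): ``request.foo`` is the
# MonitoredMemcacheConnection created by make_object_for_context.
def my_handler(request):
    request.foo.set_many({"a": 1, "b": 2}, expire=300)
    values = request.foo.get_many(["a", "b"])
    # Each call above creates a child span of the server span named
    # "foo.set_many" / "foo.get_many", tagged with the method, key_count, and a
    # truncated "keys" string, in addition to the Prometheus metrics shown earlier.
    return values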
def make_keys_str(keys: Iterable[Key]) -> str:
    """Make a string representation of an iterable of keys."""
    keys_str = ",".join(x.decode("utf-8") if isinstance(x, bytes) else x for x in keys)
    if len(keys_str) > 100:
        return keys_str[:100] + "..."
    return keys_str
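For illustration, two calls showing the behavior above: byte keys are decoded, and joined strings longer than 100 characters are truncated with an ellipsis.

# Example (not part of this module).
assert make_keys_str(["a", b"b", "c"]) == "a,b,c"
assert make_keys_str(str(i) for i in range(100)).endswith("...")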