Source code for baseplate.clients.kombu
from typing import Any
from typing import Optional
from kombu import Connection
from kombu import Exchange
from kombu.pools import Producers
from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
[docs]def connection_from_config(app_config: config.RawConfig, prefix: str, **kwargs: Any) -> Connection:
"""Make a Connection from a configuration dictionary.
The keys useful to :py:func:`connection_from_config` should be prefixed,
e.g. ``amqp.hostname`` etc. The ``prefix`` argument specifies the
prefix used to filter keys. Each key is mapped to a corresponding keyword
argument on the :py:class:`~kombu.connection.Connection` constructor. Any
keyword arguments given to this function will be passed through to the
:py:class:`~kombu.connection.Connection` constructor. Keyword arguments
take precedence over the configuration file.
Supported keys:
* ``hostname``
* ``virtual_host``
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{"hostname": config.String, "virtual_host": config.Optional(config.String)}
)
options = parser.parse(prefix[:-1], app_config)
return Connection(hostname=options.hostname, virtual_host=options.virtual_host, **kwargs)
[docs]def exchange_from_config(app_config: config.RawConfig, prefix: str, **kwargs: Any) -> Exchange:
"""Make an Exchange from a configuration dictionary.
The keys useful to :py:func:`exchange_from_config` should be prefixed,
e.g. ``amqp.exchange_name`` etc. The ``prefix`` argument specifies the
prefix used to filter keys. Each key is mapped to a corresponding keyword
argument on the :py:class:`~kombu.Exchange` constructor. Any keyword
arguments given to this function will be passed through to the
:py:class:`~kombu.Exchange` constructor. Keyword arguments take precedence
over the configuration file.
Supported keys:
* ``exchange_name``
* ``exchange_type``
"""
assert prefix.endswith(".")
parser = config.SpecParser(
{"exchange_name": config.Optional(config.String), "exchange_type": config.String}
)
options = parser.parse(prefix[:-1], app_config)
return Exchange(name=options.exchange_name or "", type=options.exchange_type, **kwargs)
[docs]class KombuProducer(config.Parser):
"""Configure a Kombu producer.
This is meant to be used with
:py:meth:`baseplate.Baseplate.configure_context`.
See :py:func:`connection_from_config` and :py:func:`exchange_from_config`
for available configuration settings.
:param max_connections: The maximum number of connections.
"""
def __init__(self, max_connections: Optional[int] = None):
self.max_connections = max_connections
def parse(self, key_path: str, raw_config: config.RawConfig) -> "KombuProducerContextFactory":
connection = connection_from_config(raw_config, prefix=f"{key_path}.")
exchange = exchange_from_config(raw_config, prefix=f"{key_path}.")
return KombuProducerContextFactory(
connection, exchange, max_connections=self.max_connections
)
[docs]class KombuProducerContextFactory(ContextFactory):
"""KombuProducer context factory.
This factory will attach a proxy object which acts like a
:py:class:`kombu.Producer` to an attribute on the
:py:class:`~baseplate.RequestContext`. The
:py:meth:`~baseplate.clients.kombu.KombuProducer.publish` method will
automatically record diagnostic information.
:param connection: A configured connection object.
:param exchange: A configured exchange object
:param max_connections: The maximum number of connections.
"""
def __init__(
self, connection: Connection, exchange: Exchange, max_connections: Optional[int] = None
):
self.connection = connection
self.exchange = exchange
self.producers = Producers(limit=max_connections)
def make_object_for_context(self, name: str, span: Span) -> "_KombuProducer":
return _KombuProducer(name, span, self.connection, self.exchange, self.producers)
class _KombuProducer:
def __init__(
self,
name: str,
span: Span,
connection: Connection,
exchange: Exchange,
producers: Producers,
):
self.name = name
self.span = span
self.connection = connection
self.exchange = exchange
self.producers = producers
def publish(self, body: Any, routing_key: Optional[str] = None, **kwargs: Any) -> Any:
trace_name = "{}.{}".format(self.name, "publish")
child_span = self.span.make_child(trace_name)
child_span.set_tag("kind", "producer")
if routing_key:
child_span.set_tag("message_bus.destination", routing_key)
with child_span:
producer_pool = self.producers[self.connection]
with producer_pool.acquire(block=True) as producer:
return producer.publish(
body=body, routing_key=routing_key, exchange=self.exchange, **kwargs
)