Source code for baseplate.frameworks.queue_consumer

import logging
import queue

from threading import Thread
from typing import Callable
from typing import NoReturn
from typing import Optional
from typing import Sequence
from typing import TYPE_CHECKING

import kombu

from kombu import Connection
from kombu import Exchange
from kombu import Message
from kombu import Queue
from kombu.mixins import ConsumerMixin
from kombu.transport.virtual import Channel

from baseplate import Baseplate
from baseplate import RequestContext
from baseplate import Span
from baseplate.lib.retry import RetryPolicy


logger = logging.getLogger(__name__)


Handler = Callable[[RequestContext, str, Message], None]


if TYPE_CHECKING:
    WorkQueue = queue.Queue[Message]  # pylint: disable=unsubscriptable-object
else:
    WorkQueue = queue.Queue


[docs]def consume( baseplate: Baseplate, exchange: Exchange, connection: Connection, queue_name: str, routing_keys: Sequence[str], handler: Handler, ) -> NoReturn: """Create a long-running process to consume messages from a queue. A queue with name ``queue_name`` is created and bound to the ``routing_keys`` so messages published to the ``routing_keys`` are routed to the queue. Next, the process registers a consumer that receives messages from the queue and feeds them to the ``handler``. The ``handler`` function must take 3 arguments: * ``context``: a baseplate context * ``message_body``: the text body of the message * ``message``: :py:class:`kombu.message.Message` The consumer will automatically ``ack`` each message after the handler method exits. If there is an error in processing and the message must be retried the handler should raise an exception to crash the process. This will prevent the ``ack`` and the message will be re-queued at the head of the queue. :param baseplate: A baseplate instance for the service. :param exchange: :param connection: :param queue_name: The name of the queue. :param routing_keys: List of routing keys. :param handler: The handler method. """ queues = [] for routing_key in routing_keys: queues.append(Queue(name=queue_name, exchange=exchange, routing_key=routing_key)) logger.info("registering %s as a handler for %r", handler.__name__, queues) kombu_consumer = KombuConsumer.new(connection, queues) logger.info("waiting for messages") while True: context = baseplate.make_context_object() with baseplate.make_server_span(context, queue_name) as span: message = kombu_consumer.get_message(span) handler(context, message.body, message) message.ack()
class _ConsumerWorker(ConsumerMixin): def __init__(self, connection: Connection, queues: Sequence[Queue], work_queue: WorkQueue): self.connection = connection self.queues = queues self.work_queue = work_queue def get_consumers(self, Consumer: kombu.Consumer, channel: Channel) -> Sequence[kombu.Consumer]: return [Consumer(queues=self.queues, on_message=self.on_message)] def on_message(self, message: Message) -> None: self.work_queue.put(message) def get_message(self, block: bool, timeout: Optional[float]) -> Message: try: return self.work_queue.get(block=block, timeout=timeout) except queue.Empty: return None
[docs]class BaseKombuConsumer: """Base object for consuming messages from a queue. A worker process accepts messages from the queue and puts them in a local work queue. The "real" consumer can then get messages with :py:meth:`~baseplate.frameworks.queue_consumer.BaseKombuConsumer.get_message` or :py:meth:`~baseplate.frameworks.queue_consumer.BaseKombuConsumer.get_batch`. It is that consumer's responsibility to ``ack`` or ``reject`` messages. Can be used directly, outside of standard baseplate context. """ def __init__(self, worker: _ConsumerWorker, worker_thread: Thread): self.worker = worker self.worker_thread = worker_thread
[docs] @classmethod def new( cls, connection: Connection, queues: Sequence[Queue], queue_size: int = 100 ) -> "BaseKombuConsumer": """Create and initialize a consumer. :param connection: The connection :param queues: List of queues. :param queue_size: The maximum number of messages to cache in the internal `queue.Queue` worker queue. Defaults to 100. For an infinite size (not recommended), use `queue_size=0`. """ work_queue: WorkQueue = queue.Queue(maxsize=queue_size) worker = _ConsumerWorker(connection, queues, work_queue) worker_thread = Thread(target=worker.run) worker_thread.name = "consumer message pump" worker_thread.daemon = True worker_thread.start() return cls(worker, worker_thread)
[docs] def get_message(self) -> Message: """Return a single message.""" batch = self.get_batch(max_items=1, timeout=None) return batch[0]
[docs] def get_batch(self, max_items: int, timeout: Optional[float]) -> Sequence[Message]: """Return a batch of messages. :param max_items: The maximum batch size. :param timeout: The maximum time to wait in seconds, or ``None`` for no timeout. """ if timeout == 0: block = False else: block = True batch = [] retry_policy = RetryPolicy.new(attempts=max_items, budget=timeout) for time_remaining in retry_policy: item = self.worker.get_message(block=block, timeout=time_remaining) if item is None: break batch.append(item) return batch
[docs]class KombuConsumer: """Consumer for use in baseplate. The :py:meth:`~baseplate.frameworks.queue_consumer.KombuConsumer.get_message` and :py:meth:`~baseplate.frameworks.queue_consumer.KombuConsumer.get_batch` methods will automatically record diagnostic information. """ def __init__(self, base_consumer: BaseKombuConsumer): self.base_consumer = base_consumer
[docs] @classmethod def new( cls, connection: Connection, queues: Sequence[Queue], queue_size: int = 100 ) -> "KombuConsumer": """Create and initialize a consumer. :param connection: The connection :param queues: List of queues. :param queue_size: The maximum number of messages to cache in the internal `queue.Queue` worker queue. Defaults to 100. For an infinite size (not recommended), use `queue_size=0`. """ base_consumer = BaseKombuConsumer.new(connection, queues, queue_size) return cls(base_consumer)
[docs] def get_message(self, server_span: Span) -> Message: """Return a single message. :param server_span: The span. """ child_span = server_span.make_child("kombu.get_message") child_span.set_tag("kind", "consumer") with child_span: messages = self.base_consumer.get_batch(max_items=1, timeout=None) message = messages[0] routing_key = message.delivery_info.get("routing_key", "") child_span.set_tag("routing_key", routing_key) consumer_tag = message.delivery_info.get("consumer_tag", "") child_span.set_tag("consumer_tag", consumer_tag) delivery_tag = message.delivery_info.get("delivery_tag", "") child_span.set_tag("delivery_tag", delivery_tag) exchange = message.delivery_info.get("exchange", "") child_span.set_tag("exchange", exchange) return message
[docs] def get_batch( self, server_span: Span, max_items: int, timeout: Optional[float] ) -> Sequence[Message]: """Return a batch of messages. :param server_span: The span. :param max_items: The maximum batch size. :param timeout: The maximum time to wait in seconds, or ``None`` for no timeout. """ child_span = server_span.make_child("kombu.get_batch") child_span.set_tag("kind", "consumer") with child_span: messages = self.base_consumer.get_batch(max_items, timeout) child_span.set_tag("message_count", len(messages)) return messages