baseplate.frameworks.queue_consumer.kombu

Kombu is a library for interacting with queue brokers.

This module provides a QueueConsumerFactory that allows you to run a QueueConsumerServer that integrates Baseplate’s facilities with Kombu.

An abbreviated example of it in use:

from typing import Any

import kombu
from baseplate import Baseplate, RequestContext
from baseplate.frameworks.queue_consumer.kombu import KombuQueueConsumerFactory
from kombu import Connection, Exchange

def process_links(
    context: RequestContext,
    body: Any,
    message: kombu.Message,
):
    # Called once for each message read from the queue.
    print(f"processing {body}")

def make_consumer_factory(app_config):
    baseplate = Baseplate()
    exchange = Exchange("reddit_exchange", "direct")
    connection = Connection(
        hostname="amqp://guest:guest@reddit.local:5672",
        virtual_host="/",
    )
    queue_name = "process_links_q"
    routing_keys = ["link_created"]
    return KombuQueueConsumerFactory.new(
        baseplate=baseplate,
        exchange=exchange,
        connection=connection,
        queue_name=queue_name,
        routing_keys=routing_keys,
        handler_fn=process_links,
    )

This creates a queue named 'process_links_q' and binds it to 'reddit_exchange' with the routing key 'link_created'. It then registers a consumer on 'process_links_q' that reads messages and passes each one to process_links.

Factory

class baseplate.frameworks.queue_consumer.kombu.KombuQueueConsumerFactory(baseplate, name, connection, queues, handler_fn, health_check_fn=None, serializer=None)

Factory for running a QueueConsumerServer using Kombu.

For simple cases where you just need a basic queue with all the default parameters for your message broker, you can use KombuQueueConsumerFactory.new.

If you need more control, you can create the Queues yourself and use the constructor directly.

classmethod new(baseplate, exchange, connection, queue_name, routing_keys, handler_fn, health_check_fn=None, serializer=None)

Return a new KombuQueueConsumerFactory.

This method creates the Queues for you and is appropriate for simple cases where you just need a basic queue with all the default parameters for your message broker.

Parameters:
  • baseplate (Baseplate) – The Baseplate set up for your consumer.
  • exchange (Exchange) – The kombu.Exchange that you will bind your Queues to.
  • connection (Connection) – The kombu.Connection to your message broker.
  • queue_name (str) – Name for your queue.
  • routing_keys (Sequence[str]) – List of routing keys that you will create Queues to consume from.
  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A baseplate.frameworks.queue_consumer.kombu.Handler function that will process an individual message from a queue.
  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.
  • serializer (Optional[KombuSerializer[~T]]) – A baseplate.clients.kombu.KombuSerializer that should be used to decode the messages you are consuming.
Return type:

KombuQueueConsumerFactory

__init__(baseplate, name, connection, queues, handler_fn, health_check_fn=None, serializer=None)

KombuQueueConsumerFactory constructor.

Parameters:
  • baseplate (Baseplate) – The Baseplate set up for your consumer.
  • name (str) – Name for your consumer.
  • connection (Connection) – The kombu.Connection to your message broker.
  • queues (Sequence[Queue]) – List of Queues to consume from.
  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A baseplate.frameworks.queue_consumer.kombu.Handler function that will process an individual message from a queue.
  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.
  • serializer (Optional[KombuSerializer[~T]]) – A baseplate.clients.kombu.KombuSerializer that should be used to decode the messages you are consuming.
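
A custom health_check_fn might look like the following sketch. The signature above says only that the callback receives a Dict[str, Any] and returns a bool, so the sketch ignores the argument and assumes that True means healthy. The heartbeat file, its path, and the age threshold are hypothetical. The idea is that the message handler touches the file after each message, and the check fails once the file goes stale.

```python
import os
import time
from typing import Any, Callable, Dict


def make_health_check(
    heartbeat_path: str, max_age_seconds: float = 60.0
) -> Callable[[Dict[str, Any]], bool]:
    """Build a health check that passes while a heartbeat file is fresh.

    Both parameters are illustrative; nothing in Baseplate defines them.
    """

    def is_healthy(req: Dict[str, Any]) -> bool:
        # The contents of `req` are not documented here, so it is unused.
        try:
            age = time.time() - os.path.getmtime(heartbeat_path)
        except OSError:
            # A missing or unreadable heartbeat file counts as unhealthy.
            return False
        return age <= max_age_seconds

    return is_healthy
```

The returned function can then be passed as health_check_fn to either KombuQueueConsumerFactory.new or the constructor.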

Errors

class baseplate.frameworks.queue_consumer.kombu.FatalMessageHandlerError

An error that signals that the queue process should exit.

Raising an exception that is a subclass of FatalMessageHandlerError causes the KombuMessageHandler to re-raise the exception rather than swallow it, which stops the handler thread/process. This, in turn, gracefully shuts down the running QueueConsumerServer.

Reserve exceptions of this kind for errors caused by problems with the environment rather than with the message itself, for example a node that cannot get its AWS credentials.