baseplate.frameworks.queue_consumer.kombu

Kombu is a library for interacting with queue brokers.

This module provides a QueueConsumerFactory that allows you to run a QueueConsumerServer that integrates Baseplate’s facilities with Kombu.

An abbreviated example of it in use:

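from typing import Any

import kombu
from baseplate import Baseplate, RequestContext
from baseplate.frameworks.queue_consumer.kombu import KombuQueueConsumerFactory
from kombu import Connection, Exchange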

def process_links(
    context: RequestContext,
    body: Any,
    message: kombu.Message,
):
    print(f"processing {body}")

def make_consumer_factory(app_config):
    baseplate = Baseplate(app_config)
    exchange = Exchange("reddit_exchange", "direct")
    connection = Connection(
        hostname="amqp://guest:guest@reddit.local:5672",
        virtual_host="/",
    )
    queue_name = "process_links_q"
    routing_keys = ["link_created"]
    return KombuQueueConsumerFactory.new(
        baseplate=baseplate,
        exchange=exchange,
        connection=connection,
        queue_name=queue_name,
        routing_keys=routing_keys,
        handler_fn=process_links,
    )

This will create a queue named 'process_links_q' and bind the routing key 'link_created'. It will then register a consumer for 'process_links_q' to read messages and feed them to process_links.

Factory

class baseplate.frameworks.queue_consumer.kombu.KombuQueueConsumerFactory(baseplate, name, connection, queues, handler_fn, error_handler_fn=None, health_check_fn=None, serializer=None, worker_kwargs=None, retry_mode=RetryMode.REQUEUE, retry_limit=None)

Factory for running a QueueConsumerServer using Kombu.

For simple cases where you just need a basic queue with all the default parameters for your message broker, you can use KombuQueueConsumerFactory.new.

If you need more control, you can create the Queue objects yourself and use the constructor directly.
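A minimal sketch of the constructor route, assuming RabbitMQ as the broker. The queue argument shown (x-message-ttl) is only an illustration of a setting that new leaves at its default, and the exchange, queue, and host names are carried over from the example at the top of this page. The library imports sit inside the function so that the sketch can be loaded without baseplate and kombu installed.

```python
from typing import Any


def process_links(context: Any, body: Any, message: Any) -> None:
    print(f"processing {body}")


def make_consumer_factory(app_config: dict) -> Any:
    # Imported here so this sketch loads even where the libraries are absent.
    from baseplate import Baseplate
    from baseplate.frameworks.queue_consumer.kombu import KombuQueueConsumerFactory
    from kombu import Connection, Exchange, Queue

    exchange = Exchange("reddit_exchange", "direct")
    # Build the Queue by hand to set broker options yourself.
    queue = Queue(
        name="process_links_q",
        exchange=exchange,
        routing_key="link_created",
        # Illustrative RabbitMQ argument: expire messages after 60 seconds.
        queue_arguments={"x-message-ttl": 60_000},
    )
    connection = Connection(
        hostname="amqp://guest:guest@reddit.local:5672",
        virtual_host="/",
    )
    # Call the constructor directly instead of KombuQueueConsumerFactory.new.
    return KombuQueueConsumerFactory(
        baseplate=Baseplate(app_config),
        name="process_links_q",
        connection=connection,
        queues=[queue],
        handler_fn=process_links,
    )
```

The constructor takes a list of queues, so several Queue objects with different settings can be passed to a single factory.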

classmethod new(baseplate, exchange, connection, queue_name, routing_keys, handler_fn, error_handler_fn=None, health_check_fn=None, serializer=None, worker_kwargs=None, retry_mode=RetryMode.REQUEUE, retry_limit=None)

Return a new KombuQueueConsumerFactory.

This method will create the Queue objects for you and is appropriate to use in simple cases where you just need a basic queue with all the default parameters for your message broker.

Parameters:
  • baseplate (Baseplate) – The Baseplate set up for your consumer.

  • exchange (Exchange) – The kombu.Exchange that you will bind your Queues to.

  • connection (Connection) – The kombu.Connection to your message broker.

  • queue_name (str) – Name for your queue.

  • routing_keys (Sequence[str]) – List of routing keys that you will create Queues to consume from.

  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A function that will process an individual message from a queue.

  • error_handler_fn (Optional[Callable[[RequestContext, Any, Message, Exception], None]]) – A function that will be called when an error is raised while executing the handler_fn. This function is responsible for calling message.ack or message.requeue, because KombuMessageHandler’s handle function will not call either of them automatically.

  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.

  • serializer (Optional[KombuSerializer]) – A baseplate.clients.kombu.KombuSerializer that should be used to decode the messages you are consuming.

  • worker_kwargs (Optional[Dict[str, Any]]) – A dictionary of keyword arguments used to configure a queue consumer.

  • retry_mode (RetryMode) – Either RetryMode.REQUEUE (default), which returns the message to the head of the queue as older versions did, or RetryMode.REPUBLISH, which acknowledges the message and publishes a new one with the same content and an incremented retry counter.

  • retry_limit (Optional[int]) – The number of retry attempts allowed for a message. When the limit is reached, the message is discarded.

Return type:

KombuQueueConsumerFactory
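Because an error_handler_fn has to settle the message itself, a handler usually decides between message.ack and message.requeue based on the exception. The sketch below is one possible policy, not the library's own behavior: MalformedMessage is a hypothetical exception that your handler_fn would raise for a body it can never process, and the arguments are typed as Any so that the block runs without kombu installed.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


class MalformedMessage(Exception):
    """Hypothetical error raised by a handler for a body it can never process."""


def handle_error(context: Any, body: Any, message: Any, exc: Exception) -> None:
    # Same argument order as error_handler_fn: context, body, message, exception.
    if isinstance(exc, MalformedMessage):
        # Retrying cannot help, so acknowledge the message to drop it.
        logger.warning("dropping malformed message: %r", body)
        message.ack()
    else:
        # The failure may be transient, so hand the message back to the broker.
        logger.error("handler failed, requeueing message", exc_info=exc)
        message.requeue()
```

Pass the function as error_handler_fn=handle_error to either new or the constructor.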

__init__(baseplate, name, connection, queues, handler_fn, error_handler_fn=None, health_check_fn=None, serializer=None, worker_kwargs=None, retry_mode=RetryMode.REQUEUE, retry_limit=None)

KombuQueueConsumerFactory constructor.

Parameters:
  • baseplate (Baseplate) – The Baseplate set up for your consumer.

  • name (str) – Name for your consumer.

  • connection (Connection) – The kombu.Connection to your message broker.

  • queues (Sequence[Queue]) – List of Queues to consume from.

  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A function that will process an individual message from a queue.

  • error_handler_fn (Optional[Callable[[RequestContext, Any, Message, Exception], None]]) – A function that will be called when an error is raised while executing the handler_fn. This function is responsible for calling message.ack or message.requeue, because KombuMessageHandler’s handle function will not call either of them automatically.

  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.

  • serializer (Optional[KombuSerializer]) – A baseplate.clients.kombu.KombuSerializer that should be used to decode the messages you are consuming.

  • worker_kwargs (Optional[Dict[str, Any]]) – A dictionary of keyword arguments used to create queue consumers.

  • retry_mode (RetryMode) – Either RetryMode.REQUEUE (default), which returns the message to the head of the queue as older versions did, or RetryMode.REPUBLISH, which acknowledges the message and publishes a new one with the same content and an incremented retry counter.

  • retry_limit (Optional[int]) – The number of retry attempts allowed for a message. When the limit is reached, the message is discarded. A retry limit for a specific message can also be specified in that message’s own header.
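The retry parameters can be combined as in the following sketch, which asks for republishing with a cap on attempts. It assumes that RetryMode can be imported from the same module as the factory, which the signatures above suggest but do not state. The limit of 5 and the function name are illustrative. The library imports sit inside the function so that the sketch can be loaded without baseplate installed.

```python
from typing import Any


def process_links(context: Any, body: Any, message: Any) -> None:
    print(f"processing {body}")


def make_republishing_factory(app_config: dict, exchange: Any, connection: Any) -> Any:
    # Imported here so this sketch loads even where baseplate is absent.
    # Assumption: RetryMode lives next to KombuQueueConsumerFactory.
    from baseplate import Baseplate
    from baseplate.frameworks.queue_consumer.kombu import (
        KombuQueueConsumerFactory,
        RetryMode,
    )

    return KombuQueueConsumerFactory.new(
        baseplate=Baseplate(app_config),
        exchange=exchange,
        connection=connection,
        queue_name="process_links_q",
        routing_keys=["link_created"],
        handler_fn=process_links,
        # Acknowledge a failed message and publish a fresh copy of it.
        retry_mode=RetryMode.REPUBLISH,
        # Illustrative cap: discard the message once this limit is reached.
        retry_limit=5,
    )
```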

Errors

class baseplate.frameworks.queue_consumer.kombu.FatalMessageHandlerError

An error that signals that the queue process should exit.

Raising an exception that is a subclass of FatalMessageHandlerError causes the KombuMessageHandler to re-raise the exception rather than swallow it, which stops the handler thread/process. This, in turn, gracefully shuts down the currently running QueueConsumerServer.

Exceptions of this nature should be reserved for errors caused by problems with the environment rather than with the message itself, for example a node that cannot get its AWS credentials.
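A sketch of how a handler might signal such an environment problem. MissingCredentialsError and the check on the AWS_ACCESS_KEY_ID environment variable are hypothetical stand-ins for whatever your service actually depends on. The fallback class exists only so that the block runs where baseplate is not installed.

```python
import os
from typing import Any

try:
    from baseplate.frameworks.queue_consumer.kombu import FatalMessageHandlerError
except ImportError:
    # Stand-in so this sketch runs without baseplate installed.
    class FatalMessageHandlerError(Exception):
        pass


class MissingCredentialsError(FatalMessageHandlerError):
    """Hypothetical error: the environment lacks credentials the handler needs."""


def process_links(context: Any, body: Any, message: Any) -> None:
    # Illustrative check: no message can be processed without credentials,
    # so raise a fatal error instead of failing message by message.
    if "AWS_ACCESS_KEY_ID" not in os.environ:
        raise MissingCredentialsError("AWS credentials are not available")
    print(f"processing {body}")
```

Errors tied to a single message, such as a malformed body, should stay ordinary exceptions so that the consumer keeps running.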