baseplate.frameworks.queue_consumer.kombu
¶
Kombu is a library for interacting with queue brokers.
This module provides a QueueConsumerFactory
that allows you to run a QueueConsumerServer
that integrates Baseplate’s facilities with Kombu.
An abbreviated example of it in use:
import kombu
from baseplate import RequestContext
from typing import Any
def process_links(
context: RequestContext,
body: Any,
message: kombu.Message,
):
print(f"processing {body}")
def make_consumer_factory(app_config):
baseplate = Baseplate()
exchange = Exchange("reddit_exchange", "direct")
connection = Connection(
hostname="amqp://guest:guest@reddit.local:5672",
virtual_host="/",
)
queue_name = "process_links_q"
routing_keys = ["link_created"]
return KombuQueueConsumerFactory.new(
baseplate=baseplate,
exchange=exchange,
connection=connection,
queue_name=queue_name,
routing_keys=routing_keys,
handler_fn=process_links,
)
This will create a queue named 'process_links_q'
and bind the routing key
'link_created'
. It will then register a consumer for 'process_links_q'
to read messages and feed them to process_links
.
Factory¶
-
class
baseplate.frameworks.queue_consumer.kombu.
KombuQueueConsumerFactory
(baseplate, name, connection, queues, handler_fn, health_check_fn=None, serializer=None)[source]¶ Factory for running a
QueueConsumerServer
using Kombu.For simple cases where you just need a basic queue with all the default parameters for your message broker, you can use KombuQueueConsumerFactory.new.
If you need more control, you can create the
Queue
s yourself and use the constructor directly.-
classmethod
new
(baseplate, exchange, connection, queue_name, routing_keys, handler_fn, health_check_fn=None, serializer=None)[source]¶ Return a new KombuQueueConsumerFactory.
This method will create the
Queue
s for you and is appropriate to use in simple cases where you just need a basic queue with all the default parameters for your message broker.Parameters: - baseplate (
Baseplate
) – The Baseplate set up for your consumer. - exchange (
Exchange
) – The kombu.Exchange that you will bind yourQueue
s to. - exchange – The kombu.Connection to your message broker.
- queue_name (
str
) – Name for your queue. - routing_keys (
Sequence
[str
]) – List of routing keys that you will createQueue
s to consume from. - handler_fn (
Callable
[[RequestContext
,Any
,Message
],None
]) – A baseplate.frameworks.queue_consumer.komub.Handler function that will process an individual message from a queue. - health_check_fn (
Optional
[Callable
[[Dict
[str
,Any
]],bool
]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check. - serializer (
Optional
[KombuSerializer
[~T]]) – A baseplate.clients.kombu.KombuSerializer that should be used to decode the messages you are consuming.
Return type: - baseplate (
-
__init__
(baseplate, name, connection, queues, handler_fn, health_check_fn=None, serializer=None)[source]¶ KombuQueueConsumerFactory constructor.
Parameters: - baseplate (
Baseplate
) – The Baseplate set up for your consumer. - exchange – The kombu.Exchange that you will bind your
Queue
s to. - queues (
Sequence
[Queue
]) – List ofQueue
s to consume from. - queue_name – Name for your queue.
- routing_keys – List of routing keys that you will create
Queue
s to consume from. - handler_fn (
Callable
[[RequestContext
,Any
,Message
],None
]) – A baseplate.frameworks.queue_consumer.komub.Handler function that will process an individual message from a queue. - health_check_fn (
Optional
[Callable
[[Dict
[str
,Any
]],bool
]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check. - serializer (
Optional
[KombuSerializer
[~T]]) – A baseplate.clients.kombu.KombuSerializer that should be used to decode the messages you are consuming.
- baseplate (
-
classmethod
Errors¶
-
class
baseplate.frameworks.queue_consumer.kombu.
FatalMessageHandlerError
[source]¶ An error that signals that the queue process should exit.
Raising an Exception that is a subclass of FatalMessageHandlerError will cause the KombuMessageHandler to re-raise the exception rather than swallowing it which will cause the handler thread/process to stop. This, in turn, will gracefully shut down the QueueConsumerServer currently running.
Exceptions of this nature should be reserved for errors that are due to problems with the environment rather than the message itself. For example, a node that cannot get its AWS credentials.