baseplate.frameworks.queue_consumer
¶
Kombu is a library for interacting with queue brokers. This integration helps you consume messages from a queue using Kombu.
To create a long-running process to consume from a queue:
from kombu import Connection, Exchange
from baseplate import queue_consumer
def process_links(context, msg_body, msg):
print('processing %s' % msg_body)
queue_consumer.consume(
baseplate=make_baseplate(cfg, app_config),
exchange=Exchange('reddit_exchange', 'direct'),
connection=Connection(
hostname='amqp://guest:guest@reddit.local:5672',
virtual_host='/',
),
queue_name='process_links_q',
routing_keys=[
'link_created',
'link_deleted',
'link_updated',
],
handler=process_links,
)
This will create a queue named 'process_links_q'
and bind the routing keys
'link_created'
, 'link_deleted'
, and 'link_updated'
. It will then
register a consumer for 'process_links_q'
to read messages and feed them to
process_links
.
Register and run a queue consumer¶
-
baseplate.frameworks.queue_consumer.
consume
(baseplate, exchange, connection, queue_name, routing_keys, handler)[source]¶ Create a long-running process to consume messages from a queue.
A queue with name
queue_name
is created and bound to therouting_keys
so messages published to therouting_keys
are routed to the queue.Next, the process registers a consumer that receives messages from the queue and feeds them to the
handler
.The
handler
function must take 3 arguments:context
: a baseplate contextmessage_body
: the text body of the messagemessage
:kombu.message.Message
The consumer will automatically
ack
each message after the handler method exits. If there is an error in processing and the message must be retried the handler should raise an exception to crash the process. This will prevent theack
and the message will be re-queued at the head of the queue.Parameters: - baseplate (
Baseplate
) – A baseplate instance for the service. - exchange (
Exchange
) – - connection (
Connection
) – - queue_name (
str
) – The name of the queue. - routing_keys (
Sequence
[str
]) – List of routing keys. - handler (
Callable
[[RequestContext
,str
,Message
],None
]) – The handler method.
Return type: _Specialform
-
class
baseplate.frameworks.queue_consumer.
KombuConsumer
(base_consumer)[source]¶ Consumer for use in baseplate.
The
get_message()
andget_batch()
methods will automatically record diagnostic information.-
classmethod
new
(connection, queues, queue_size=100)[source]¶ Create and initialize a consumer.
Parameters: - connection (
Connection
) – The connection - queues (
Sequence
[Queue
]) – List of queues. - queue_size (
int
) – The maximum number of messages to cache in the internal queue.Queue worker queue. Defaults to 100. For an infinite size (not recommended), use queue_size=0.
Return type: - connection (
-
classmethod
If you require more direct control¶
-
class
baseplate.frameworks.queue_consumer.
BaseKombuConsumer
(worker, worker_thread)[source]¶ Base object for consuming messages from a queue.
A worker process accepts messages from the queue and puts them in a local work queue. The “real” consumer can then get messages with
get_message()
orget_batch()
. It is that consumer’s responsibility toack
orreject
messages.Can be used directly, outside of standard baseplate context.
-
classmethod
new
(connection, queues, queue_size=100)[source]¶ Create and initialize a consumer.
Parameters: - connection (
Connection
) – The connection - queues (
Sequence
[Queue
]) – List of queues. - queue_size (
int
) – The maximum number of messages to cache in the internal queue.Queue worker queue. Defaults to 100. For an infinite size (not recommended), use queue_size=0.
Return type: - connection (
-
classmethod