baseplate.frameworks.queue_consumer.deprecated

Deprecated since version 1.1: This way of creating a Baseplate queue consumer is deprecated in favor of using a QueueConsumerServer and will be removed in the next major release. Instructions for ugrading are included below.

Upgrading to a QueueConsumerServer

To start, you will be running your queue consumer as a “server” now, so it will use baseplate-serve rather than baseplate-script as the entrypoint.

- baseplate-script run.ini consumer:run
+ baseplate-serve run.ini --bind 0.0.0.0:8080

This also means that you will need to update your config file with similar sections to what you would have for an HTTP or Thrift service.

[DEFAULT]
rabbitmq.hostname = amqp://rabbit.local:5672
rabbitmq.exchange_name = my-exchange
rabbitmq.exchange_type = direct

+ [app:main]
+ factory = my_service:make_consumer_factory
+
+ [server:main]
+ factory = baseplate.server.queue_consumer
+ max_concurrency = 1

You will also need to change your code to create a KombuQueueConsumerFactory with a make_consumer_factory function rather than using queue_consumer.consume as you did for this.

from kombu import Connection, Exchange
-
- from baseplate import queue_consumer
-
- def process_links(context, msg_body, msg):
-     print('processing %s' % msg_body)
-
- def run():
-     queue_consumer.consume(
-         baseplate=make_baseplate(cfg, app_config),
-         exchange=Exchange('reddit_exchange', 'direct'),
-         connection=Connection(
-         hostname='amqp://guest:guest@reddit.local:5672',
-         virtual_host='/',
-         ),
-         queue_name='process_links_q',
-         routing_keys=[
-             'link_created',
-             'link_deleted',
-             'link_updated',
-         ],
-         handler=process_links,
-     )
+
+ from baseplate import Baseplate
+ from baseplate.frameworks.queue_consumer.kombu import (
+     KombuQueueConsumerFactory,
+ )
+
+ def process_links(context, message):
+     body = message.decode()
+     print('processing %s' % body)
+
+ def make_consumer_factory(app_config):
+     baseplate = Baseplate()
+     exchange = Exchange('reddit_exchange', 'direct')
+     connection = Connection(
+       hostname='amqp://guest:guest@reddit.local:5672',
+       virtual_host='/',
+     )
+     queue_name = 'process_links_q'
+     routing_keys = ['link_created', 'link_deleted', 'link_updated']
+     return KombuQueueConsumerFactory.new(
+         baseplate=baseplate,
+         exchange=exchange,
+         connection=connection,
+         queue_name=queue_name,
+         routing_keys=routing_keys,
+         handler_fn=process_links,
+     )

Original docs

To create a long-running process to consume from a queue:

from kombu import Connection, Exchange
from baseplate import queue_consumer

def process_links(context, msg_body, msg):
    print('processing %s' % msg_body)

def run():
    queue_consumer.consume(
        baseplate=make_baseplate(cfg, app_config),
        exchange=Exchange('reddit_exchange', 'direct'),
        connection=Connection(
            hostname='amqp://guest:guest@reddit.local:5672',
            virtual_host='/',
        ),
        queue_name='process_links_q',
        routing_keys=[
            'link_created',
            'link_deleted',
            'link_updated',
        ],
        handler=process_links,
    )

This will create a queue named 'process_links_q' and bind the routing keys 'link_created', 'link_deleted', and 'link_updated'. It will then register a consumer for 'process_links_q' to read messages and feed them to process_links.

Register and run a queue consumer

baseplate.frameworks.queue_consumer.deprecated.consume(baseplate, exchange, connection, queue_name, routing_keys, handler)[source]

Create a long-running process to consume messages from a queue.

A queue with name queue_name is created and bound to the routing_keys so messages published to the routing_keys are routed to the queue.

Next, the process registers a consumer that receives messages from the queue and feeds them to the handler.

The handler function must take 3 arguments:

The consumer will automatically ack each message after the handler method exits. If there is an error in processing and the message must be retried the handler should raise an exception to crash the process. This will prevent the ack and the message will be re-queued at the head of the queue.

Parameters:
Return type:

_Specialform

class baseplate.frameworks.queue_consumer.deprecated.KombuConsumer(base_consumer)[source]

Consumer for use in baseplate.

The get_message() and get_batch() methods will automatically record diagnostic information.

classmethod new(connection, queues, queue_size=100)[source]

Create and initialize a consumer.

Parameters:
  • connection (Connection) – The connection
  • queues (Sequence[Queue]) – List of queues.
  • queue_size (int) – The maximum number of messages to cache in the internal queue.Queue worker queue. Defaults to 100. For an infinite size (not recommended), use queue_size=0.
Return type:

KombuConsumer

get_message(server_span)[source]

Return a single message.

Parameters:server_span (Span) – The span.
Return type:Message
get_batch(server_span, max_items, timeout)[source]

Return a batch of messages.

Parameters:
  • server_span (Span) – The span.
  • max_items (int) – The maximum batch size.
  • timeout (Optional[float]) – The maximum time to wait in seconds, or None for no timeout.
Return type:

Sequence[Message]

If you require more direct control

class baseplate.frameworks.queue_consumer.deprecated.BaseKombuConsumer(worker, worker_thread, work_queue)[source]

Base object for consuming messages from a queue.

A worker process accepts messages from the queue and puts them in a local work queue. The “real” consumer can then get messages with get_message() or get_batch(). It is that consumer’s responsibility to ack or reject messages.

Can be used directly, outside of standard baseplate context.

classmethod new(connection, queues, queue_size=100)[source]

Create and initialize a consumer.

Parameters:
  • connection (Connection) – The connection
  • queues (Sequence[Queue]) – List of queues.
  • queue_size (int) – The maximum number of messages to cache in the internal queue.Queue worker queue. Defaults to 100. For an infinite size (not recommended), use queue_size=0.
Return type:

BaseKombuConsumer

get_message(timeout=None)[source]

Return a single message.

Return type:Message
get_batch(max_items, timeout)[source]

Return a batch of messages.

Parameters:
  • max_items (int) – The maximum batch size.
  • timeout (Optional[float]) – The maximum time to wait in seconds, or None for no timeout.
Return type:

Sequence[Message]