baseplate.clients.kombu

This integration adds support for sending messages to queue brokers (like RabbitMQ) via Kombu. If you are looking to consume messages, check out the baseplate.frameworks.queue_consumer framework integration instead.

Example

To integrate it with your application, add the appropriate client declaration to your context configuration:

baseplate.configure_context(
   app_config,
   {
      ...
      "foo": KombuProducer(),
      ...
   }
)

configure it in your application’s configuration file:

[app:main]

...

# required: where to find the queue broker
foo.hostname = rabbit.local

# optional: the rabbitmq virtual host to use
foo.virtual_host = /

# required: which type of exchange to use
foo.exchange_type = topic

# optional: the name of the exchange to use (default is no name)
foo.exchange_name = bar

...

and then use the attached Producer-like object in request:

def my_method(request):
    request.foo.publish("boo!", routing_key="route_me")

Serialization

This integration also supports adding custom serializers to Kombu via the baseplate.clients.kombu.KombuSerializer interface and the baseplate.clients.kombu.register_serializer function. This serializer can be passed to the baseplate.clients.kombu.KombuProducerContextFactory for use by the baseplate.clients.kombu.KombuProducer to allow for automatic serialization when publishing.

In order to use a custom serializer, you must first register it with Kombu using the provided baseplate.clients.kombu.register_serializer function.

In-addition to the base interface, we also provide a serializer for Thrift objects: baseplate.clients.kombu.KombuThriftSerializer.

Example

serializer = KombuThriftSerializer[ThriftStruct](ThriftStruct)
register_serializer(serializer)

Interface

class baseplate.clients.kombu.KombuSerializer[source]

Interface for wrapping non-built-in serializers for Kombu.

baseplate.clients.kombu.register_serializer(serializer)[source]

Register serializer with the Kombu serialization registry.

The serializer will be registered using serializer.name and will be sent to the message broker with the header “application/x-{serializer.name}”. You need to call register_serializer before you can use a serializer for automatic serialization when publishing and deserializing when consuming.

Return type:None

Serializers

class baseplate.clients.kombu.KombuThriftSerializer(thrift_class, protocol_factory=<thrift.protocol.TBinaryProtocol.TBinaryProtocolAcceleratedFactory object>)[source]

Thrift object serializer for Kombu.

Configuration

class baseplate.clients.kombu.KombuProducer(max_connections=None, serializer=None, secrets=None)[source]

Configure a Kombu producer.

This is meant to be used with baseplate.Baseplate.configure_context().

See connection_from_config() and exchange_from_config() for available configuration settings.

Parameters:
baseplate.clients.kombu.connection_from_config(app_config, prefix, secrets=None, **kwargs)[source]

Make a Connection from a configuration dictionary.

The keys useful to connection_from_config() should be prefixed, e.g. amqp.hostname etc. The prefix argument specifies the prefix used to filter keys. Each key is mapped to a corresponding keyword argument on the Connection constructor. Any keyword arguments given to this function will be passed through to the Connection constructor. Keyword arguments take precedence over the configuration file.

Supported keys:

  • credentials_secret
  • hostname
  • virtual_host
Return type:Connection
baseplate.clients.kombu.exchange_from_config(app_config, prefix, **kwargs)[source]

Make an Exchange from a configuration dictionary.

The keys useful to exchange_from_config() should be prefixed, e.g. amqp.exchange_name etc. The prefix argument specifies the prefix used to filter keys. Each key is mapped to a corresponding keyword argument on the Exchange constructor. Any keyword arguments given to this function will be passed through to the Exchange constructor. Keyword arguments take precedence over the configuration file.

Supported keys:

  • exchange_name
  • exchange_type
Return type:Exchange

Classes

class baseplate.clients.kombu.KombuProducerContextFactory(connection, exchange, max_connections=None, serializer=None)[source]

KombuProducer context factory.

This factory will attach a proxy object which acts like a kombu.Producer to an attribute on the RequestContext. The publish() method will automatically record diagnostic information.

Parameters:
  • connection (Connection) – A configured connection object.
  • exchange (Exchange) – A configured exchange object
  • max_connections (Optional[int]) – The maximum number of connections.