baseplate.frameworks.queue_consumer.kafka

This module provides a QueueConsumerFactory that allows you to run a QueueConsumerServer that integrates Baseplate’s facilities with Kafka.

An abbreviated example of it in use:

import confluent_kafka
from typing import Any
from typing import Dict

from baseplate import Baseplate
from baseplate import RequestContext
from baseplate.frameworks.queue_consumer.kafka import InOrderConsumerFactory

def process_links(
    context: RequestContext,
    data: Dict[str, Any],
    message: confluent_kafka.Message,
):
    print(f"processing {data}")

def make_consumer_factory(app_config):
    baseplate = Baseplate(app_config)
    return InOrderConsumerFactory.new(
        name="kafka_consumer.link_consumer_v0",
        baseplate=baseplate,
        bootstrap_servers="127.0.0.1:9092",
        group_id="service.link_consumer",
        topics=["new_links", "edited_links"],
        handler_fn=process_links,
    )

This will create a Kafka consumer group named 'service.link_consumer' that consumes from the topics 'new_links' and 'edited_links'. Messages read from those topics will be fed to process_links.
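A factory function like make_consumer_factory is typically referenced from the server configuration and run with baseplate-serve. A minimal configuration sketch might look like the following; the module path my_service.consumer and the exact section contents are illustrative assumptions, so check your service's existing config for the right values:

```ini
[app:main]
factory = my_service.consumer:make_consumer_factory

[server:main]
factory = baseplate.server.queue_consumer
max_concurrency = 1
```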

Factory

class baseplate.frameworks.queue_consumer.kafka.InOrderConsumerFactory(name, baseplate, consumer, handler_fn, kafka_consume_batch_size=1, message_unpack_fn=<function loads>, health_check_fn=None)[source]

Factory for running a QueueConsumerServer using Kafka.

The InOrderConsumerFactory attempts to achieve in-order, exactly-once message processing.

This will run a single KafkaConsumerWorker that reads messages from Kafka and puts them into an internal work queue. Then it will run a single KafkaMessageHandler that reads messages from the internal work queue, processes them with the handler_fn, and then commits each message’s offset to Kafka.

This one-at-a-time, in-order processing ensures that when processing a message fails, its offset (and the offset of any later message) is not committed, so when the server restarts it will receive the failed message and attempt to process it again. Additionally, because each message’s offset is committed immediately after processing, no message should ever be processed more than once.
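The commit-after-process loop described above can be sketched in plain Python. This is an illustration of the semantics only, not baseplate's actual implementation; the function and parameter names are made up for the example:

```python
from typing import Any, Callable, List, Tuple

def run_in_order(
    messages: List[Tuple[int, Any]],
    handler: Callable[[Any], None],
    commit: Callable[[int], None],
) -> None:
    """Process (offset, payload) pairs one at a time, committing each
    offset only after its handler call returns successfully."""
    for offset, payload in messages:
        handler(payload)  # an exception here stops the loop, so this
        commit(offset)    # offset (and every later one) stays uncommitted
```

If the handler fails on the second message, only the first offset is committed; on restart, consumption resumes at the failed message.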

For most cases where you just need a basic consumer with sensible defaults you can use InOrderConsumerFactory.new.

If you need more control, you can create the Consumer yourself and use the constructor directly.
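For example, to tune consumer settings beyond what new exposes, you might build the confluent_kafka.Consumer yourself and hand it to the constructor. The specific configuration keys below are illustrative assumptions; auto-commit is disabled here because the in-order factory commits each offset itself after the handler succeeds:

```python
import confluent_kafka
from baseplate import Baseplate
from baseplate.frameworks.queue_consumer.kafka import InOrderConsumerFactory

def make_consumer_factory(app_config):
    baseplate = Baseplate(app_config)
    consumer = confluent_kafka.Consumer(
        {
            "bootstrap.servers": "127.0.0.1:9092",
            "group.id": "service.link_consumer",
            # assumption: commit manually, since the factory commits
            # each offset after the handler succeeds
            "enable.auto.commit": "false",
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["new_links", "edited_links"])
    return InOrderConsumerFactory(
        name="kafka_consumer.link_consumer_v0",
        baseplate=baseplate,
        consumer=consumer,
        handler_fn=process_links,
    )
```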

classmethod new(name, baseplate, bootstrap_servers, group_id, topics, handler_fn, kafka_consume_batch_size=1, message_unpack_fn=<function loads>, health_check_fn=None)

Return a new _BaseKafkaQueueConsumerFactory.

This method will create the Consumer for you and is appropriate to use in most cases where you just need a basic consumer with sensible defaults.

This method will also enforce naming standards for the Kafka consumer group and the baseplate server span.

Parameters
  • name (str) – A name for your consumer process. Must look like “kafka_consumer.{group_name}”

  • baseplate (Baseplate) – The Baseplate set up for your consumer.

  • bootstrap_servers (str) – A comma-delimited string of Kafka brokers.

  • group_id (str) – The kafka consumer group id. Must look like “{service_name}.{group_name}” to help prevent collisions between services.

  • topics (Sequence[str]) – An iterable of kafka topics to consume from.

  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A baseplate.frameworks.queue_consumer.kafka.Handler function that will process an individual message.

  • kafka_consume_batch_size (int) – The number of messages the KafkaConsumerWorker reads from Kafka in each batch. Defaults to 1.

  • message_unpack_fn (Callable[[bytes], Any]) – A function that takes one argument, the message body as bytes, and returns the message in the format the handler expects. Defaults to json.loads.

  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.

Return type

_BaseKafkaQueueConsumerFactory
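As an example of a custom message_unpack_fn, here is a hypothetical unpacker that decodes a UTF-8 JSON body and keeps only the fields the handler needs. The event field names are illustrative assumptions, not part of baseplate:

```python
import json
from typing import Any, Dict

def unpack_link_event(body: bytes) -> Dict[str, Any]:
    """Hypothetical unpack function for link events: decode the raw
    message body and keep only the fields the handler uses."""
    event = json.loads(body.decode("utf-8"))
    return {"id": event["id"], "url": event["url"]}
```

You would pass this as message_unpack_fn=unpack_link_event when calling new.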

__init__(name, baseplate, consumer, handler_fn, kafka_consume_batch_size=1, message_unpack_fn=<function loads>, health_check_fn=None)

_BaseKafkaQueueConsumerFactory constructor.

Parameters
  • name (str) – A name for your consumer process. Must look like “kafka_consumer.{group_name}”

  • baseplate (Baseplate) – The Baseplate set up for your consumer.

  • consumer (Consumer) – An instance of Consumer.

  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A baseplate.frameworks.queue_consumer.kafka.Handler function that will process an individual message.

  • kafka_consume_batch_size (int) – The number of messages the KafkaConsumerWorker reads from Kafka in each batch. Defaults to 1.

  • message_unpack_fn (Callable[[bytes], Any]) – A function that takes one argument, the message body as bytes, and returns the message in the format the handler expects. Defaults to json.loads.

  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.
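A hypothetical health_check_fn might report the consumer unhealthy after a sustained run of handler failures. The callback receives the health-check request environment as a dict and returns a bool; the error counter and threshold below are illustrative assumptions:

```python
from typing import Any, Dict

# illustrative shared state the handler would update on failures
handler_error_count = 0

def consumer_health_check(environ: Dict[str, Any]) -> bool:
    """Healthy unless we've accumulated too many handler errors."""
    return handler_error_count < 10
```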

class baseplate.frameworks.queue_consumer.kafka.FastConsumerFactory(name, baseplate, consumer, handler_fn, kafka_consume_batch_size=1, message_unpack_fn=<function loads>, health_check_fn=None)[source]

Factory for running a QueueConsumerServer using Kafka.

The FastConsumerFactory prioritizes high throughput over exactly once message processing.

This will run a single KafkaConsumerWorker that reads messages from Kafka and puts them into an internal work queue. Then it will run multiple KafkaMessageHandlers that read messages from the internal work queue and process them with the handler_fn. The number of KafkaMessageHandler processes is controlled by the max_concurrency parameter in the baseplate.server.queue_consumer.QueueConsumerServer configuration. Kafka partition offsets are automatically committed by the confluent_kafka.Consumer every 5 seconds, so any message that has been read by the KafkaConsumerWorker may have its offset committed, regardless of whether it has been processed.

This server should be able to achieve very high message processing throughput due to the multiple KafkaMessageHandler processes and less frequent, background partition offset commits. This comes at a price, though: messages may be processed out of order, not at all, or multiple times. It is appropriate when processing throughput is important and it’s acceptable to skip messages or process messages more than once (perhaps there is ratelimiting in the handler or somewhere downstream).

Messages processed out of order: Messages are added to the internal work queue in order, but one worker may finish processing a “later” message before another worker finishes processing an “earlier” message.

Messages never processed: If the server crashes it may not have processed some messages that have already had their offsets automatically committed. When the server restarts it won’t read those messages.

Messages processed more than once: If the server crashes it may have processed some messages but not yet committed their offsets. When the server restarts it will reprocess those messages.
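One common mitigation for duplicate processing is to make the handler idempotent, for example by tracking already-processed message ids. The id field and the in-memory set below are illustrative assumptions; a real service would likely use a shared store such as a database or cache:

```python
from typing import Any, Dict, Set

processed_ids: Set[str] = set()  # illustrative; use a shared store in practice

def process_links_idempotently(
    context: Any,
    data: Dict[str, Any],
    message: Any,
) -> None:
    """Skip messages we have already handled so redelivery is harmless."""
    if data["id"] in processed_ids:
        return
    print(f"processing {data}")  # the real work goes here
    processed_ids.add(data["id"])
```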

For most cases where you just need a basic consumer with sensible defaults you can use FastConsumerFactory.new.

If you need more control, you can create the Consumer yourself and use the constructor directly.

classmethod new(name, baseplate, bootstrap_servers, group_id, topics, handler_fn, kafka_consume_batch_size=1, message_unpack_fn=<function loads>, health_check_fn=None)

Return a new _BaseKafkaQueueConsumerFactory.

This method will create the Consumer for you and is appropriate to use in most cases where you just need a basic consumer with sensible defaults.

This method will also enforce naming standards for the Kafka consumer group and the baseplate server span.

Parameters
  • name (str) – A name for your consumer process. Must look like “kafka_consumer.{group_name}”

  • baseplate (Baseplate) – The Baseplate set up for your consumer.

  • bootstrap_servers (str) – A comma-delimited string of Kafka brokers.

  • group_id (str) – The kafka consumer group id. Must look like “{service_name}.{group_name}” to help prevent collisions between services.

  • topics (Sequence[str]) – An iterable of kafka topics to consume from.

  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A baseplate.frameworks.queue_consumer.kafka.Handler function that will process an individual message.

  • kafka_consume_batch_size (int) – The number of messages the KafkaConsumerWorker reads from Kafka in each batch. Defaults to 1.

  • message_unpack_fn (Callable[[bytes], Any]) – A function that takes one argument, the message body as bytes, and returns the message in the format the handler expects. Defaults to json.loads.

  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.

Return type

_BaseKafkaQueueConsumerFactory

__init__(name, baseplate, consumer, handler_fn, kafka_consume_batch_size=1, message_unpack_fn=<function loads>, health_check_fn=None)

_BaseKafkaQueueConsumerFactory constructor.

Parameters
  • name (str) – A name for your consumer process. Must look like “kafka_consumer.{group_name}”

  • baseplate (Baseplate) – The Baseplate set up for your consumer.

  • consumer (Consumer) – An instance of Consumer.

  • handler_fn (Callable[[RequestContext, Any, Message], None]) – A baseplate.frameworks.queue_consumer.kafka.Handler function that will process an individual message.

  • kafka_consume_batch_size (int) – The number of messages the KafkaConsumerWorker reads from Kafka in each batch. Defaults to 1.

  • message_unpack_fn (Callable[[bytes], Any]) – A function that takes one argument, the message body as bytes, and returns the message in the format the handler expects. Defaults to json.loads.

  • health_check_fn (Optional[Callable[[Dict[str, Any]], bool]]) – A baseplate.server.queue_consumer.HealthcheckCallback function that can be used to customize your health check.