baseplate.lib.events

Client library for sending events to the data processing system.

This is for use with the event collector system. Events generally track something that happens in production that we want to instrument for planning and analytical purposes.

Events are serialized and put onto a message queue on the same server. These serialized events are then consumed and published to the remote event collector by a separate daemon.
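
The hand-off described above can be pictured with a small in-process sketch. It is only an analogy: a standard-library queue.Queue stands in for the server-local message queue, and enqueue_event, drain_queue, and the JSON encoding are hypothetical names and choices for illustration, not part of Baseplate.

```python
import json
import queue

# Stand-in for the server-local message queue (assumption: bounded, in-process).
local_queue = queue.Queue(maxsize=100)


def enqueue_event(event):
    """Application side: serialize the event and hand it to the local queue."""
    payload = json.dumps(event, sort_keys=True).encode("utf-8")
    local_queue.put_nowait(payload)  # raises queue.Full if the consumer lags


def drain_queue(publish):
    """Publisher side: consume serialized events and forward each one."""
    count = 0
    while True:
        try:
            payload = local_queue.get_nowait()
        except queue.Empty:
            return count
        publish(payload)
        count += 1
```

In the real system the consumer is a separate daemon, so the application never waits on the network. It only pays the cost of serializing and enqueuing.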

Building Events

Thrift Schema v2 Events

For modern Thrift-based events: import the event schemas into your project, instantiate and fill out an event object, and pass it into the queue:

import time
import uuid

from baseplate import Baseplate
from baseplate.lib.events import EventQueue
from baseplate.lib.events import serialize_v2_event

from event_schemas.event.ttypes import Event


def my_handler(request):
    event = Event(
        source="baseplate",
        action="test",
        noun="baseplate",
        client_timestamp=int(time.time() * 1000),  # milliseconds since the epoch
        uuid=str(uuid.uuid4()),
    )
    request.events_v2.put(event)


def make_wsgi_app(app_config):
    ...

    baseplate = Baseplate(app_config)
    baseplate.configure_context(
        {
            ...

            "events_v2": EventQueue("v2", serialize_v2_event),

            ...
        }
    )

    ...

Queuing Events

class baseplate.lib.events.EventQueue(name, event_serializer)

A queue to transfer events to the publisher.

Parameters
  • name (str) – The name of the event queue to send to. This determines which publisher sends the events, which can be useful for routing to different event pipelines (prod/test/v2 etc.).

  • event_serializer (Callable[[~T], bytes]) – A callable that takes an event object and returns serialized bytes ready to send on the wire. See below for options.

put(event)

Add an event to the queue.

The queue is local to the server this code is run on. The event publisher on the server will take these events and send them to the collector.

Parameters

event (~T) – The event to send. The type of event object passed in depends on the selected event_serializer.

Raises

  • EventTooLargeError – The serialized event is too large.

  • EventQueueFullError – The queue is full. Events are not being published fast enough.

Return type

None

The EventQueue also implements ContextFactory so it can be used with add_to_context():

event_queue = EventQueue("production", serialize_v2_event)
baseplate.add_to_context("events_production", event_queue)

It can then be used from the RequestContext during requests:

def some_service_method(self, context):
    event = Event(...)
    context.events_production.put(event)

Serializers

The event_serializer parameter to EventQueue is a callable that serializes a given event object. Baseplate comes with a serializer for the Thrift-schema-based V2 event system:

baseplate.lib.events.serialize_v2_event(event)

Serialize a Thrift struct to bytes for the V2 event protocol.

Parameters

event (Any) – A Thrift struct from the event schemas.

Return type

bytes
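
Because event_serializer is just a callable from an event object to bytes, other formats can in principle be plugged in. The sketch below is a hypothetical JSON serializer for plain dict events. serialize_json_event is not part of Baseplate, and whether a given collector accepts such payloads is an assumption that depends on your pipeline.

```python
import json
from typing import Any, Dict


def serialize_json_event(event: Dict[str, Any]) -> bytes:
    """Hypothetical serializer: encode a dict event as compact UTF-8 JSON."""
    # sort_keys makes the output stable, which helps when comparing payloads.
    return json.dumps(event, separators=(",", ":"), sort_keys=True).encode("utf-8")
```

It would be passed in the same position as serialize_v2_event, for example EventQueue("test", serialize_json_event), and put() would then expect dict events.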

Exceptions

exception baseplate.lib.events.EventError

Base class for event related exceptions.

exception baseplate.lib.events.EventTooLargeError(size)

Raised when a serialized event is too large to send.

exception baseplate.lib.events.EventQueueFullError

Raised when the queue of events is full.

This usually indicates that the event publisher is having trouble talking to the event collector.
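
Since instrumentation should rarely take down a request, one possible policy is to catch both exceptions around put() and drop the event. The following is a minimal sketch of that policy, not a recommended or official pattern. put_or_drop and the log messages are hypothetical, and the fallback exception classes exist only so the snippet runs where Baseplate is not installed.

```python
import logging

try:
    from baseplate.lib.events import EventQueueFullError, EventTooLargeError
except ImportError:
    # Stand-ins (assumption) so the sketch runs without Baseplate installed.
    class EventTooLargeError(Exception):
        pass

    class EventQueueFullError(Exception):
        pass

logger = logging.getLogger(__name__)


def put_or_drop(event_queue, event):
    """Try to enqueue an event; log and drop it on failure.

    Returns True if the event was queued, False if it was dropped.
    """
    try:
        event_queue.put(event)
    except EventTooLargeError:
        logger.warning("Dropping event: serialized form is too large.")
        return False
    except EventQueueFullError:
        logger.warning("Dropping event: queue is full; is the publisher running?")
        return False
    return True
```

A handler could then call put_or_drop(request.events_v2, event) instead of calling put() directly. Whether dropping is acceptable depends on how critical the event is.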

Publishing Events

Events that are put onto an EventQueue are consumed by a separate process and published to the remote event collector service. The publisher ships with Baseplate and can be run as follows:

$ python -m baseplate.sidecars.event_publisher --queue-name something config_file.ini

The publisher reads its configuration from the specified INI file. Given a queue name of something (as in the example above), it expects a section called [event-publisher:something] with content like the following:

[event-publisher:something]
collector.hostname = some-domain.example.com

key.name = NameOfASecretKey
key.secret = Base64-encoded-blob-of-randomness

metrics.namespace = a.name.to.put.metrics.under
metrics.endpoint = the-statsd-host:1234
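
To make the section-naming rule concrete, the sketch below derives the section name from a queue name and reads one key with the standard library's configparser. This is only an illustration of the naming convention. publisher_section is a hypothetical helper, and the publisher's own configuration parsing may differ.

```python
import configparser

EXAMPLE_INI = """
[event-publisher:something]
collector.hostname = some-domain.example.com
key.name = NameOfASecretKey
"""


def publisher_section(queue_name):
    """Return the INI section name expected for a given queue name."""
    return f"event-publisher:{queue_name}"


parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_INI)

# The --queue-name value selects which section is read.
section = parser[publisher_section("something")]
hostname = section["collector.hostname"]
```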