Source code for baseplate.lib.service_discovery
"""Integration with Synapse's ``file_output`` service discovery method.
.. note:: Production Baseplate services have Synapse hooked up to a
local HAProxy instance which will automatically route connections to
services for you if you connect to the correct address/port on
localhost. That is the preferred method of connecting to services.
The contents of this module are useful for inspecting the service
inventory directly for cases where a blind TCP connection is
insufficient (e.g. to give service addresses to a client, or for
topology-aware clients like Cassandra).
A basic example of usage::
inventory = ServiceInventory("/var/lib/synapse/example.json")
backend = inventory.get_backend()
print(backend.endpoint.address)
"""
import json
from typing import IO
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from baseplate.lib.config import Endpoint
from baseplate.lib.config import EndpointConfiguration
from baseplate.lib.file_watcher import FileWatcher
from baseplate.lib.file_watcher import WatchedFileNotAvailableError
from baseplate.lib.random import WeightedLottery
[docs]class Backend(NamedTuple):
"""A description of a service backend.
This is a tuple of several values:
``id``
A unique integer ID identifying the backend.
``name``
The name of the backend.
``endpoint``
An :py:class:`~baseplate.lib.config.EndpointConfiguration` object
describing the network address of the backend.
``weight``
An integer weight indicating how much to prefer this backend
when choosing whom to connect to.
"""
id: int
name: str
endpoint: EndpointConfiguration
weight: int
class _Inventory(NamedTuple):
backends: List[Backend]
lottery: Optional[WeightedLottery[Backend]]
def _parse(watched_file: IO) -> _Inventory:
backends = []
for d in json.load(watched_file):
endpoint = Endpoint("%s:%d" % (d["host"], d["port"]))
weight = d["weight"] if d["weight"] is not None else 1
backend = Backend(d["id"], d["name"], endpoint, weight)
backends.append(backend)
lottery = None
if backends:
lottery = WeightedLottery(backends, weight_key=lambda b: b.weight)
return _Inventory(backends, lottery)
[docs]class NoBackendsAvailableError(Exception):
"""Raised when no backends are available for this service."""
[docs]class ServiceInventory:
"""The inventory enumerates available backends for a single service.
:param filename: The absolute path to the Synapse-generated inventory file
in JSON format.
"""
def __init__(self, filename: str):
self._filewatcher = FileWatcher(filename, _parse)
[docs] def get_backends(self) -> Sequence[Backend]:
"""Return a list of all available backends in the inventory.
If the inventory file becomes unavailable, the previously seen
inventory is returned.
"""
try:
# pylint: disable=maybe-no-member
return self._filewatcher.get_data().backends
except WatchedFileNotAvailableError:
return []
[docs] def get_backend(self) -> Backend:
"""Return a randomly chosen backend from the available backends.
If weights are specified in the inventory, they will be
respected when making the random selection.
:raises: :py:exc:`NoBackendsAvailableError` if the inventory
has no available endpoints.
"""
inventory: Optional[_Inventory]
try:
inventory = self._filewatcher.get_data()
except WatchedFileNotAvailableError:
inventory = None
# pylint: disable=maybe-no-member
if not inventory or not inventory.lottery:
raise NoBackendsAvailableError
return inventory.lottery.pick()