Source code for baseplate.frameworks.pyramid

import base64
import logging
import sys

from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import Mapping
from typing import Optional

import pyramid.events
import pyramid.request
import pyramid.tweens
import webob.request

from pyramid.config import Configurator
from pyramid.registry import Registry
from pyramid.request import Request
from pyramid.response import Response

from baseplate import Baseplate
from baseplate import RequestContext
from baseplate import Span
from baseplate import TraceInfo
from baseplate.lib.edgecontext import EdgeContextFactory
from baseplate.thrift.ttypes import IsHealthyProbe


logger = logging.getLogger(__name__)


class SpanFinishingAppIterWrapper:
    """Wrapper for Response.app_iter that finishes the span when the iterator is done.

    The WSGI spec expects applications to return an iterable object. In the
    common case, the iterable is a single-item list containing a byte string of
    the full response. However, if the application wants to stream a response
    back to the client (e.g. it's sending a lot of data, or it wants to get
    some bytes on the wire really quickly before some database calls finish)
    the iterable can take a while to finish iterating.

    This wrapper allows us to keep the server span open until the iterable is
    finished even though our view callable returned long ago.

    """

    def __init__(self, span: Span, app_iter: Iterable[bytes]) -> None:
        self.span = span
        self.app_iter = iter(app_iter)

    def __iter__(self) -> Iterator[bytes]:
        return self

    def __next__(self) -> bytes:
        try:
            return next(self.app_iter)
        except StopIteration:
            self.span.finish()
            raise
        except:  # noqa: E722
            self.span.finish(exc_info=sys.exc_info())
            raise

    def close(self) -> None:
        if hasattr(self.app_iter, "close"):
            self.app_iter.close()  # type: ignore


def _make_baseplate_tween(
    handler: Callable[[Request], Response], _registry: Registry
) -> Callable[[Request], Response]:
    def baseplate_tween(request: Request) -> Response:
        try:
            response = handler(request)
        except:  # noqa: E722
            if hasattr(request, "span") and request.span:
                request.span.finish(exc_info=sys.exc_info())
            raise
        else:
            if request.span:
                request.span.set_tag("http.status_code", response.status_code)
                response.app_iter = SpanFinishingAppIterWrapper(request.span, response.app_iter)
        finally:
            # avoid a reference cycle
            request.start_server_span = None
        return response

    return baseplate_tween


class BaseplateEvent:
    def __init__(self, request: Request):
        self.request = request


[docs]class ServerSpanInitialized(BaseplateEvent): """Event that Baseplate fires after creating the ServerSpan for a Request. This event will be emitted before the Request is passed along to it's handler. Baseplate initializes the ServerSpan in response to a :py:class:`pyramid.events.ContextFound` event emitted by Pyramid so while we can guarantee what Baseplate has done when this event is emitted, we cannot guarantee that any other subscribers to :py:class:`pyramid.events.ContextFound` have been called or not. """
[docs]class HeaderTrustHandler: """Abstract class used by :py:class:`BaseplateConfigurator` to validate headers. See :py:class:`StaticTrustHandler` for the default implementation. """
[docs] def should_trust_trace_headers(self, request: Request) -> bool: """Return whether baseplate should parse the trace headers from the inbound request. :param request: The request :returns: Whether baseplate should parse the trace headers from the inbound request. """ raise NotImplementedError
[docs] def should_trust_edge_context_payload(self, request: Request) -> bool: """Return whether baseplate should trust the edge context headers from the inbound request. :param request: The request :returns: Whether baseplate should trust the inbound edge context headers """ raise NotImplementedError
[docs]class StaticTrustHandler(HeaderTrustHandler): """Default implementation for handling headers. This class is created automatically by BaseplateConfigurator unless you supply your own HeaderTrustHandler :param trust_headers: Whether or not to trust trace and edge context headers from inbound requests. This value will be returned by should_trust_trace_headers and should_trust_edge_context_payload. .. warning:: Do not set ``trust_headers`` to ``True`` unless you are sure your application is only accessible by trusted sources (usually backend-only services). """ def __init__(self, trust_headers: bool = False): self.trust_headers = trust_headers
[docs] def should_trust_trace_headers(self, request: Request) -> bool: return self.trust_headers
[docs] def should_trust_edge_context_payload(self, request: Request) -> bool: return self.trust_headers
# pylint: disable=too-many-ancestors class BaseplateRequest(RequestContext, pyramid.request.Request): def __init__(self, *args: Any, **kwargs: Any) -> None: context_config = kwargs.pop("context_config", None) RequestContext.__init__(self, context_config=context_config) pyramid.request.Request.__init__(self, *args, **kwargs) class RequestFactory: def __init__(self, baseplate: Baseplate): self.baseplate = baseplate def __call__(self, environ: Dict[str, str]) -> BaseplateRequest: return BaseplateRequest(environ, context_config=self.baseplate._context_config) def blank(self, path: str) -> BaseplateRequest: environ = webob.request.environ_from_url(path) return BaseplateRequest(environ, context_config=self.baseplate._context_config)
[docs]class BaseplateConfigurator: """Configuration extension to integrate Baseplate into Pyramid. :param baseplate: The Baseplate instance for your application. :param edge_context_factory: A configured factory for handling edge request context. :param header_trust_handler: An object which will be used to verify whether baseplate should parse the request context headers, for example trace ids. See StaticTrustHandler for the default implementation. """ def __init__( self, baseplate: Baseplate, edge_context_factory: Optional[EdgeContextFactory] = None, header_trust_handler: Optional[HeaderTrustHandler] = None, ): self.baseplate = baseplate self.edge_context_factory = edge_context_factory self.header_trust_handler = header_trust_handler or StaticTrustHandler(trust_headers=False) def _on_application_created(self, event: pyramid.events.ApplicationCreated) -> None: # attach the baseplate object to the application the server gets event.app.baseplate = self.baseplate def _on_new_request(self, event: pyramid.events.ContextFound) -> None: request = event.request # this request didn't match a route we know if not request.matched_route: # TODO: some metric for 404s would be good return trace_info = None if self.header_trust_handler.should_trust_trace_headers(request): try: trace_info = self._get_trace_info(request.headers) except (KeyError, ValueError): pass if self.header_trust_handler.should_trust_edge_context_payload(request): edge_payload: Optional[bytes] try: edge_payload_str = request.headers["X-Edge-Request"] edge_payload = base64.b64decode(edge_payload_str.encode()) except (KeyError, ValueError): edge_payload = None request.raw_edge_context = edge_payload if self.edge_context_factory: request.edge_context = self.edge_context_factory.from_upstream(edge_payload) span = self.baseplate.make_server_span( request, name=request.matched_route.name, trace_info=trace_info, ) span.set_tag("http.url", request.url) span.set_tag("http.method", request.method) span.set_tag("peer.ipv4", request.remote_addr) span.start() request.registry.notify(ServerSpanInitialized(request)) def _get_trace_info(self, headers: Mapping[str, str]) -> TraceInfo: sampled = bool(headers.get("X-Sampled") == "1") flags = headers.get("X-Flags", None) return TraceInfo.from_upstream( headers["X-Trace"], headers["X-Parent"], headers["X-Span"], sampled, int(flags) if flags is not None else None, ) def includeme(self, config: Configurator) -> None: config.set_request_factory(RequestFactory(self.baseplate)) config.add_subscriber(self._on_new_request, pyramid.events.ContextFound) config.add_subscriber(self._on_application_created, pyramid.events.ApplicationCreated) # Position of the tween is important. We need it to cover all code # that can written in the app. This means that it should be above # (wrap) both the exception view handling tween (EXCVIEW) and the # request handling code (MAIN). # # The final order ends up being: # 1. Request ingress (tweens.INGRESS) # 2. baseplate tween # 3. Exception view handler (tweens.EXCVIEW) # 4. App handler code (tweens.MAIN) config.add_tween( "baseplate.frameworks.pyramid._make_baseplate_tween", over=pyramid.tweens.EXCVIEW )
[docs]def get_is_healthy_probe(request: Request) -> int: """Get the thrift enum value of the probe used in http is_healthy request.""" code = request.params.get("type", str(IsHealthyProbe.READINESS)) try: return int(code) except ValueError: pass # If it's not an int, try to find it in the enum map instead. try: return IsHealthyProbe._NAMES_TO_VALUES[code.upper()] except KeyError: logger.warning( "Unrecognized health check type %s, fallback to READINESS", code, ) return IsHealthyProbe.READINESS