import base64
import ipaddress
import sys
import time
from typing import Any
from typing import Optional
from typing import Type
from typing import Union
from advocate import AddrValidator
from advocate import ValidatingHTTPAdapter
from prometheus_client import Counter
from prometheus_client import Gauge
from prometheus_client import Histogram
from requests import PreparedRequest
from requests import Request
from requests import Response
from requests import Session
from requests.adapters import HTTPAdapter
from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib.prometheus_metrics import default_latency_buckets
from baseplate.lib.prometheus_metrics import getHTTPSuccessLabel
def http_adapter_from_config(
    app_config: config.RawConfig, prefix: str, **kwargs: Any
) -> HTTPAdapter:
    """Build a validating HTTP transport adapter from raw configuration.

    Configuration keys are looked up under ``prefix`` (which must end with a
    ``.``), e.g. ``http.pool_connections`` or ``http.max_retries`` for the
    prefix ``http.``. Each recognised key becomes the keyword argument of the
    same name on the :py:class:`~requests.adapters.HTTPAdapter` constructor.
    Keyword arguments passed directly to this function take precedence over
    values read from configuration.

    Supported keys:

    * ``pool_connections``: The number of connections to cache (default: 10).
    * ``pool_maxsize``: The maximum number of connections to keep in the pool
      (default: 10).
    * ``max_retries``: How many times to retry DNS lookups or connection
      attempts, but never sending data (default: 0).
    * ``pool_block``: Whether the connection pool will block when trying to get
      a connection (default: false).

    Advocate's address filtering is configured through the ``filter``
    sub-keys. They are only consulted when no ``validator`` keyword argument
    is supplied by the caller:

    * ``filter.ip_allowlist``: A comma-delimited list of IP addresses (1.2.3.4)
      or CIDR-notation (1.2.3.0/24) ranges that the client can always connect to
      (default: anything not on the local network).
    * ``filter.ip_denylist``: A comma-delimited list of IP addresses or
      CIDR-notation ranges the client may never connect to (default: the local
      network).
    * ``filter.port_allowlist``: A comma-delimited list of TCP port numbers
      that the client can connect to (default: 80, 8080, 443, 8443, 8000).
    * ``filter.port_denylist``: A comma-delimited list of TCP port numbers that
      the client may never connect to (default: none).
    * ``filter.hostname_denylist``: A comma-delimited list of hostnames that
      the client may never connect to (default: none).
    * ``filter.allow_ipv6``: Should the client be allowed to connect to IPv6
      hosts? (default: false, note: IPv6 is tricky to apply filtering rules
      comprehensively to).

    :param app_config: The raw application configuration.
    :param prefix: Key prefix, including the trailing ``.``.
    :param kwargs: Extra keyword arguments forwarded to the adapter
        constructor; these override configured values.
    :return: A :py:class:`advocate.ValidatingHTTPAdapter`.

    """
    assert prefix.endswith(".")

    pool_spec = {
        "pool_connections": config.Optional(config.Integer, default=10),
        "pool_maxsize": config.Optional(config.Integer, default=10),
        "max_retries": config.Optional(config.Integer, default=0),
        "pool_block": config.Optional(config.Boolean, default=False),
    }
    filter_spec = {
        "ip_allowlist": config.Optional(config.TupleOf(ipaddress.ip_network)),
        "ip_denylist": config.Optional(config.TupleOf(ipaddress.ip_network)),
        "port_allowlist": config.Optional(config.TupleOf(int)),
        "port_denylist": config.Optional(config.TupleOf(int)),
        "hostname_denylist": config.Optional(config.TupleOf(config.String)),
        "allow_ipv6": config.Optional(config.Boolean, default=False),
    }
    parser = config.SpecParser({**pool_spec, "filter": filter_spec})

    # the parser expects the key path without its trailing dot
    options = parser.parse(prefix[:-1], app_config)

    # explicit keyword arguments win over configuration, hence setdefault
    for option_name in ("pool_connections", "pool_maxsize", "max_retries", "pool_block"):
        configured = getattr(options, option_name)
        if configured is not None:
            kwargs.setdefault(option_name, configured)

    filters = options.filter
    configured_validator = AddrValidator(
        ip_whitelist=filters.ip_allowlist,
        ip_blacklist=filters.ip_denylist,
        port_whitelist=filters.port_allowlist,
        port_blacklist=filters.port_denylist,
        hostname_blacklist=filters.hostname_denylist,
        allow_ipv6=filters.allow_ipv6,
    )
    kwargs.setdefault("validator", configured_validator)

    return ValidatingHTTPAdapter(**kwargs)
PROM_NAMESPACE = "http_client"

# Labels shared by every HTTP client metric.
HTTP_LABELS_COMMON = ["http_method", "http_client_name"]

# Labels for metrics recorded when a request finishes: the common labels plus
# the outcome of the request.
HTTP_LABELS_TERMINAL = HTTP_LABELS_COMMON + ["http_success"]

# Latency histogram of HTTP calls made by clients. Bucket boundaries are the
# shared defaults imported from baseplate.lib.prometheus_metrics.
LATENCY_SECONDS = Histogram(
    PROM_NAMESPACE + "_latency_seconds",
    "Latency histogram of HTTP calls made by clients",
    HTTP_LABELS_TERMINAL,
    buckets=default_latency_buckets,
)

# Counter of HTTP requests made by a given client, additionally labelled with
# the response status code.
REQUESTS_TOTAL = Counter(
    PROM_NAMESPACE + "_requests_total",
    "Total number of HTTP requests started by a given client",
    HTTP_LABELS_TERMINAL + ["http_response_code"],
)

# Gauge of requests currently in flight for a given client.
ACTIVE_REQUESTS = Gauge(
    PROM_NAMESPACE + "_active_requests",
    "Number of active requests for a given client",
    HTTP_LABELS_COMMON,
    multiprocess_mode="livesum",
)
class BaseplateSession:
    """A proxy for :py:class:`requests.Session`.

    Requests sent with this client will be instrumented automatically: each
    request runs inside a child span of ``span`` and is recorded in the
    ``http_client`` Prometheus metrics.

    :param adapter: The transport adapter mounted for both ``http://`` and
        ``https://`` URLs on every request.
    :param name: Name used as the prefix of the child span name, and as the
        ``http_client_name`` metric label when ``client_name`` is ``None``.
    :param span: The parent span under which request spans are created.
    :param client_name: Optional override for the ``http_client_name`` metric
        label and the ``http.slug`` span tag.

    """

    def __init__(
        self, adapter: HTTPAdapter, name: str, span: Span, client_name: Optional[str] = None
    ) -> None:
        self.adapter = adapter
        self.name = name
        self.span = span
        self.client_name = client_name

    def delete(self, url: str, **kwargs: Any) -> Response:
        """Send a DELETE request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("DELETE", url, **kwargs)

    def get(self, url: str, **kwargs: Any) -> Response:
        """Send a GET request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("GET", url, **kwargs)

    def head(self, url: str, **kwargs: Any) -> Response:
        """Send a HEAD request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("HEAD", url, **kwargs)

    def options(self, url: str, **kwargs: Any) -> Response:
        """Send an OPTIONS request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("OPTIONS", url, **kwargs)

    def patch(self, url: str, **kwargs: Any) -> Response:
        """Send a PATCH request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("PATCH", url, **kwargs)

    def post(self, url: str, **kwargs: Any) -> Response:
        """Send a POST request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("POST", url, **kwargs)

    def put(self, url: str, **kwargs: Any) -> Response:
        """Send a PUT request.

        See :py:func:`requests.request` for valid keyword arguments.

        """
        return self.request("PUT", url, **kwargs)

    def prepare_request(self, request: Request) -> PreparedRequest:
        """Construct a :py:class:`~requests.PreparedRequest` for later use.

        The prepared request can be stored or manipulated and then used with
        :py:meth:`send`.

        """
        return request.prepare()

    def request(self, method: str, url: Union[str, bytes], **kwargs: Any) -> Response:
        """Send a request.

        :param method: The HTTP method of the request, e.g. ``GET``, ``PUT``, etc.
        :param url: The URL to send the request to.

        See :py:func:`requests.request` for valid keyword arguments. The
        ``timeout``, ``allow_redirects``, ``verify`` and ``stream`` arguments
        are passed to :py:meth:`send`; everything else is used to build the
        :py:class:`~requests.Request`.

        """
        # these options belong to the send step, not to the Request object
        send_kwargs = {
            "timeout": kwargs.pop("timeout", None),
            # NOTE(review): the None default is falsy, so redirects appear not
            # to be followed unless the caller passes allow_redirects=True,
            # unlike requests.request. Confirm this is intended.
            "allow_redirects": kwargs.pop("allow_redirects", None),
            "verify": kwargs.pop("verify", True),
            "stream": kwargs.pop("stream", False),
        }

        request = Request(method=method.upper(), url=url, **kwargs)
        prepared = self.prepare_request(request)
        return self.send(prepared, **send_kwargs)

    def _add_span_context(self, span: Span, request: PreparedRequest) -> None:
        """Hook for subclasses to attach span context to an outgoing request.

        The base implementation deliberately adds nothing.

        """

    def send(self, request: PreparedRequest, **kwargs: Any) -> Response:
        """Send a :py:class:`~requests.PreparedRequest`.

        The request is sent within a child span tagged with the URL, method
        and status code. The active-requests gauge tracks it while in flight,
        and the latency histogram and request counter are updated once it
        completes or fails.

        :param request: The prepared request to send.
        :param kwargs: Passed through to :py:meth:`requests.Session.send`.
        :return: The response received.

        """
        http_method = request.method.lower() if request.method else ""
        client_name = self.client_name if self.client_name is not None else self.name
        active_request_label_values = {
            "http_method": http_method,
            "http_client_name": client_name,
        }

        start_time = time.perf_counter()
        response: Optional[Response] = None
        failed = False

        try:
            with self.span.make_child(f"{self.name}.request").with_tags(
                {
                    "http.url": request.url,
                    "http.method": http_method,
                    "http.slug": client_name,
                }
            ) as span, ACTIVE_REQUESTS.labels(**active_request_label_values).track_inprogress():
                self._add_span_context(span, request)

                # we cannot re-use the same session every time because sessions re-use the same
                # CookieJar and so we'd muddle cookies cross-request. if the application wants
                # to keep track of cookies, it should do so itself.
                #
                # note: we're still getting connection pooling because we're re-using the adapter.
                session = Session()
                session.mount("http://", self.adapter)
                session.mount("https://", self.adapter)
                response = session.send(request, **kwargs)
                span.set_tag("http.status_code", response.status_code)
                return response
        except BaseException:
            # track failure explicitly: sys.exc_info() would also report an
            # exception the *caller* is handling (e.g. a fallback request made
            # from an except block) and mislabel a successful request.
            failed = True
            raise
        finally:
            if failed:
                status_code = ""
                http_success = "false"
            elif response is not None and response.status_code:
                # compare against None rather than relying on truthiness: a
                # requests Response is falsy for 4xx/5xx statuses, which would
                # drop the labels for exactly the responses we want to count.
                http_success = getHTTPSuccessLabel(response.status_code)
                status_code = str(response.status_code)
            else:
                status_code = ""
                http_success = ""

            latency_label_values = {**active_request_label_values, "http_success": http_success}
            requests_total_label_values = {
                **latency_label_values,
                "http_response_code": status_code,
            }

            LATENCY_SECONDS.labels(**latency_label_values).observe(time.perf_counter() - start_time)
            REQUESTS_TOTAL.labels(**requests_total_label_values).inc()
class InternalBaseplateSession(BaseplateSession):
    """A session that forwards trace and edge context on outgoing requests."""

    def _add_span_context(self, span: Span, request: PreparedRequest) -> None:
        """Copy the span's trace identifiers into the request headers.

        The sampled flag, the trace flags and the raw edge context are only
        added when the span carries them.

        """
        headers = request.headers
        headers.update(
            {
                "X-Trace": str(span.trace_id),
                "X-Parent": str(span.parent_id),
                "X-Span": str(span.id),
            }
        )

        if span.sampled:
            headers["X-Sampled"] = "1"

        if span.flags is not None:
            headers["X-Flags"] = str(span.flags)

        # the span may have no context, or the context no edge payload;
        # in either case there is simply nothing to forward.
        edge_context = getattr(getattr(span, "context", None), "raw_edge_context", None)
        if edge_context:
            headers["X-Edge-Request"] = base64.b64encode(edge_context).decode()
class RequestsContextFactory(ContextFactory):
    """Requests client context factory.

    This factory will attach a
    :py:class:`~baseplate.clients.requests.BaseplateSession` to an attribute
    on the :py:class:`~baseplate.RequestContext`. HTTP requests sent through
    that session use connections from the provided
    :py:class:`~requests.adapters.HTTPAdapter` connection pools and
    automatically record diagnostic information.

    The connection pool is shared across calls, but a new
    :py:class:`~requests.Session` is created for each request so that cookies
    and other state are not accidentally shared between requests. If you do
    want to persist state, you will need to do it in your application.

    :param adapter: A transport adapter for making HTTP requests. See
        :py:func:`http_adapter_from_config`.
    :param session_cls: The type for the actual session object to put on the
        request context.
    :param client_name: Custom name to be emitted under the http_client_name
        label for prometheus metrics. When ``None``, the session falls back to
        the name it is given by :py:meth:`make_object_for_context`.

    """

    def __init__(
        self,
        adapter: HTTPAdapter,
        session_cls: Type[BaseplateSession],
        client_name: Optional[str] = None,
    ) -> None:
        self.adapter, self.session_cls = adapter, session_cls
        self.client_name = client_name

    def make_object_for_context(self, name: str, span: Span) -> BaseplateSession:
        """Create a session bound to ``span`` for the context attribute ``name``."""
        build_session = self.session_cls
        return build_session(self.adapter, name, span, client_name=self.client_name)
class InternalRequestsClient(config.Parser):
    """Configure a Requests client for use with internal Baseplate HTTP services.

    Requests made with this client **will** include trace context and
    :doc:`edge context </api/baseplate/lib/edgecontext>`. This client should
    only be used to speak to trusted internal services. URLs that resolve to
    public addresses will be rejected. It is not possible to override the
    Advocate address validator used by this client.

    .. warning:: Requesting user-specified URLs with this client could lead to
        `Server-Side Request Forgery`_. Ensure that you only request trusted URLs
        e.g. hard-coded or from a local configuration file.

    .. _`Server-Side Request Forgery`: https://en.wikipedia.org/wiki/Server-side_request_forgery

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`http_adapter_from_config` for available configuration settings.

    :param client_name: Custom name to be emitted under the http_client_name
        label for prometheus metrics. When ``None``, the name of the context
        attribute the client is attached to is used instead.
    :param kwargs: Extra keyword arguments forwarded to
        :py:func:`http_adapter_from_config`.
    :raises ValueError: If a ``validator`` keyword argument is supplied.

    """

    def __init__(self, client_name: Optional[str] = None, **kwargs: Any) -> None:
        self.client_name = client_name
        self.kwargs = kwargs

        if "validator" in kwargs:
            raise ValueError("validator is hard-coded for internal clients")

    def parse(self, key_path: str, raw_config: config.RawConfig) -> RequestsContextFactory:
        """Build the context factory from the configuration under ``key_path``."""
        # use advocate to ensure this client only ever gets used
        # with internal services over the internal network.
        #
        # the allowlist takes precedence. allow loopback and private addresses,
        # deny the rest.
        internal_cidrs = ("127.0.0.0/8", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
        allowed_networks = {ipaddress.ip_network(cidr) for cidr in internal_cidrs}
        denied_networks = {ipaddress.ip_network("0.0.0.0/0")}

        internal_only = AddrValidator(
            ip_whitelist=allowed_networks,
            ip_blacklist=denied_networks,
            port_blacklist=[0],  # disable the default allowlist by giving an explicit denylist
            allow_ipv6=False,
        )

        config_prefix = f"{key_path}."
        adapter = http_adapter_from_config(
            raw_config, prefix=config_prefix, validator=internal_only, **self.kwargs
        )
        return RequestsContextFactory(
            adapter, session_cls=InternalBaseplateSession, client_name=self.client_name
        )
class ExternalRequestsClient(config.Parser):
    """Configure a Requests client for use with external HTTP services.

    Requests made with this client **will not** include trace context and
    :doc:`edge context </api/baseplate/lib/edgecontext>`. This client is
    suitable for use with third party or untrusted services.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`http_adapter_from_config` for available configuration settings.

    :param client_name: Custom name to be emitted under the http_client_name
        label for prometheus metrics. When ``None``, the name of the context
        attribute the client is attached to is used instead.
    :param kwargs: Extra keyword arguments forwarded to
        :py:func:`http_adapter_from_config`.

    """

    def __init__(self, client_name: Optional[str] = None, **kwargs: Any) -> None:
        self.client_name, self.kwargs = client_name, kwargs

    def parse(self, key_path: str, raw_config: config.RawConfig) -> RequestsContextFactory:
        """Build the context factory from the configuration under ``key_path``."""
        config_prefix = f"{key_path}."
        adapter = http_adapter_from_config(raw_config, config_prefix, **self.kwargs)
        factory = RequestsContextFactory(
            adapter, session_cls=BaseplateSession, client_name=self.client_name
        )
        return factory