Source code for baseplate.lib.experiments

import json
import logging

from enum import Enum
from typing import Dict
from typing import Optional
from typing import Sequence
from typing import Set

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib.edge_context import User
from baseplate.lib.events import DebugLogger
from baseplate.lib.events import EventLogger
from baseplate.lib.experiments.providers import parse_experiment
from baseplate.lib.experiments.providers.base import Experiment
from baseplate.lib.file_watcher import FileWatcher
from baseplate.lib.file_watcher import WatchedFileNotAvailableError

logger = logging.getLogger(__name__)

class EventType(Enum):
    """Kinds of experiment events passed to the event logger as ``event_type``."""

    # Sent by ``Experiments.expose`` when a caller reports an exposure.
    EXPOSE = "expose"

    # Sent by ``Experiments.variant`` when a bucketing event is logged.
    # Note that the value is "choose", not "bucket".
    BUCKET = "choose"

class ExperimentsContextFactory(ContextFactory):
    """Experiment client context factory.

    This factory will attach a new
    :py:class:`baseplate.lib.experiments.Experiments` to an attribute on the
    :py:class:`~baseplate.RequestContext`.

    :param path: Path to the experiment configuration file.
    :param event_logger: The logger to use to log experiment eligibility
        events. If not provided, a
        :py:class:`~baseplate.lib.events.DebugLogger` will be created and
        used.
    :param timeout: How long, in seconds, to block instantiation waiting for
        the watched experiments file to become available (defaults to not
        blocking).
    :param backoff: Retry backoff time for the experiments file watcher.
        Defaults to None, which is mapped to DEFAULT_FILEWATCHER_BACKOFF.

    """

    def __init__(
        self,
        path: str,
        event_logger: Optional[EventLogger] = None,
        timeout: Optional[float] = None,
        backoff: Optional[float] = None,
    ):
        # One watcher is created per factory and handed to every Experiments
        # object it builds; the file contents are parsed with json.load.
        self._filewatcher = FileWatcher(path, json.load, timeout=timeout, backoff=backoff)
        # May be None; Experiments substitutes a DebugLogger in that case.
        self._event_logger = event_logger

    def make_object_for_context(self, name: str, span: Span) -> "Experiments":
        """Build the :py:class:`Experiments` client for one context.

        :param name: Attribute name the client is attached under; forwarded
            as ``context_name``.
        :param span: Span forwarded as ``server_span``.
        :return: A new :py:class:`Experiments` backed by this factory's file
            watcher and event logger.

        """
        return Experiments(
            config_watcher=self._filewatcher,
            server_span=span,
            context_name=name,
            event_logger=self._event_logger,
        )
class Experiments:
    """Access to experiments with automatic refresh when changed.

    This experiments client allows access to the experiments cached on disk by
    the experiment configuration fetcher daemon. It will automatically reload
    the cache when changed. This client also handles logging bucketing events
    to the event pipeline when it is determined that the request is part of an
    active variant.

    """

    def __init__(
        self,
        config_watcher: FileWatcher,
        server_span: Span,
        context_name: str,
        event_logger: Optional[EventLogger] = None,
    ):
        self._config_watcher = config_watcher
        self._span = server_span
        self._context_name = context_name
        # Bucketing ids already logged by this instance, so the same id is
        # not logged twice.
        self._already_bucketed: Set[str] = set()
        # Experiments looked up so far, by name. A None entry records a name
        # whose config was missing or empty.
        self._experiment_cache: Dict[str, Optional[Experiment]] = {}
        self._event_logger = event_logger if event_logger else DebugLogger()

    def _get_config(self, name: str) -> Optional[Dict[str, str]]:
        """Return the raw config entry for ``name``, or None if unavailable.

        None is returned (after logging) when the watched file is not
        available, when ``name`` is not a key in the loaded data, or when the
        loaded data cannot be indexed by ``name``.
        """
        try:
            config_data = self._config_watcher.get_data()
            return config_data[name]
        except WatchedFileNotAvailableError as exc:
            logger.warning("Experiment config unavailable: %s", str(exc))
        except KeyError:
            # NOTE(review): the logging call was lost from this line in the
            # extracted source; ``logger.info`` is assumed -- confirm the level.
            logger.info("Experiment <%r> not found in experiment config", name)
        except TypeError as exc:
            logger.warning("Could not load experiment config: %s", str(exc))
        return None

    def _get_experiment(self, name: str) -> Optional[Experiment]:
        """Return the parsed experiment for ``name``, or None.

        Results are cached per instance, including the "no config" result.
        A config that fails to parse is logged and is *not* cached, so the
        next lookup attempts to parse it again.
        """
        if name not in self._experiment_cache:
            experiment_config = self._get_config(name)
            if not experiment_config:
                experiment = None
            else:
                try:
                    experiment = parse_experiment(experiment_config)
                except Exception as err:
                    logger.error("Invalid configuration for experiment %s: %s", name, err)
                    return None
            self._experiment_cache[name] = experiment
        return self._experiment_cache[name]

    def _log_event(
        self,
        experiment: Experiment,
        variant: Optional[str],
        event_type: EventType,
        inputs: Dict[str, str],
    ) -> None:
        """Send one experiment event to the event logger."""
        self._event_logger.log(
            experiment=experiment,
            variant=variant,
            user_id=inputs.get("user_id"),
            logged_in=inputs.get("logged_in"),
            cookie_created_timestamp=inputs.get("cookie_created_timestamp"),
            app_name=inputs.get("app_name"),
            event_type=event_type,
            inputs=inputs,
            span=self._span,
        )

    def get_all_experiment_names(self) -> Sequence[str]:
        """Return a list of all experiment names in the configuration file.

        The names are the top-level keys of the configuration; the individual
        experiment configurations are not parsed or validated here. Errors
        raised while reading the configuration are not handled by this method.

        :return: List of all experiment names in the configuration file.

        """
        cfg = self._config_watcher.get_data()
        return list(cfg.keys())

    def is_valid_experiment(self, name: str) -> bool:
        """Return true if the provided experiment name is a valid experiment.

        :param name: Name of the experiment you want to check.

        :return: Whether or not a particular experiment is valid.

        """
        return self._get_experiment(name) is not None

    def variant(
        self,
        name: str,
        user: Optional[User] = None,
        bucketing_event_override: Optional[bool] = None,
        **kwargs: str,
    ) -> Optional[str]:
        r"""Return which variant, if any, is active.

        If a variant is active, a bucketing event will be logged to the event
        pipeline unless any one of the following conditions is met:

        1. bucketing_event_override is set to False.
        2. The experiment specified by "name" explicitly disables bucketing
           events.
        3. We have already logged a bucketing event for the value specified by
           ``experiment.get_unique_id(\*\*kwargs)`` within the current
           request.

        Since checking the status of an experiment will fire a bucketing
        event, it is best to only check the variant when you are making the
        decision that will expose the experiment to the user. If you
        absolutely must check the status of an experiment before you are sure
        that the experiment will be exposed to the user, you can use
        `bucketing_event_override` to disable bucketing events for that check.

        :param name: Name of the experiment you want to run.
        :param user: User object for the user you want to check the experiment
            variant for. If you set user, the experiment parameters for that
            user ("user_id", "logged_in", and "user_roles") will be extracted
            and added to the inputs to the call to Experiment.variant. The
            user's event_fields will also be extracted and added to the
            bucketing event if one is logged. It is recommended that you
            provide a value for user rather than setting the user parameters
            manually in ``kwargs``.
        :param bucketing_event_override: Set if you need to override the
            default behavior for sending bucketing events. This parameter
            should be set sparingly as it breaks the assumption that you will
            fire a bucketing event when you first check the state of an
            experiment. If set to False, will never send a bucketing event.
            If set to None, no override will be applied. Set to None by
            default. Note that setting bucketing_event_override to True has no
            effect, it will behave the same as when it is set to None.
        :param kwargs: Arguments that will be passed to experiment.variant to
            determine bucketing, targeting, and overrides. These values will
            also be passed to the logger.

        :return: Variant name if a variant is active, None otherwise.

        """
        experiment = self._get_experiment(name)
        if experiment is None:
            return None

        inputs = dict(kwargs)
        if user:
            inputs.update(user.event_fields())

        variant = experiment.variant(**inputs)
        bucketing_id = experiment.get_unique_id(**inputs)

        # Evaluated left to right with short-circuiting, so the experiment is
        # only asked whether it logs bucketing once every cheaper check passed.
        do_log = (
            bool(bucketing_id)
            and variant is not None
            and bucketing_event_override is not False
            and bucketing_id not in self._already_bucketed
            and experiment.should_log_bucketing()
        )

        if do_log:
            # Already guaranteed by do_log; kept so type checkers narrow the id.
            assert bucketing_id
            self._log_event(experiment, variant, EventType.BUCKET, inputs)
            self._already_bucketed.add(bucketing_id)

        return variant

    def expose(
        self, experiment_name: str, variant_name: str, user: Optional[User] = None, **kwargs: str
    ) -> None:
        """Log an event to indicate that a user has been exposed to an experimental treatment.

        Nothing is logged if ``experiment_name`` does not resolve to a valid
        experiment.

        :param experiment_name: Name of the experiment that was exposed.
        :param variant_name: Name of the variant that was exposed.
        :param user: User object for the user you want to check the experiment
            variant for. If unset, it is expected that user_id and logged_in
            values will be set in the keyword arguments.
        :param kwargs: Additional arguments that will be passed to logger.

        """
        experiment = self._get_experiment(experiment_name)
        if experiment is None:
            return

        inputs = dict(kwargs)
        if user:
            inputs.update(user.event_fields())

        self._log_event(experiment, variant_name, EventType.EXPOSE, inputs)
def experiments_client_from_config(
    app_config: config.RawConfig, event_logger: EventLogger, prefix: str = "experiments."
) -> ExperimentsContextFactory:
    """Configure and return an :py:class:`ExperimentsContextFactory` object.

    The keys useful to :py:func:`experiments_client_from_config` should be
    prefixed, e.g. ``experiments.path``, etc.

    Supported keys:

    ``path`` (optional)
        The path to the experiment configuration file generated by the
        experiment configuration fetcher daemon. Defaults to
        ``/var/local/experiments.json``.
    ``timeout`` (optional)
        The time that we should wait for the file specified by ``path`` to
        exist. When unset, ``None`` is passed to the factory, which does not
        block.
    ``backoff`` (optional)
        Retry backoff time for the experiments file watcher. When unset,
        ``None`` is passed to the factory, which is mapped to
        DEFAULT_FILEWATCHER_BACKOFF.

    :param app_config: The application configuration which should have
        settings for the experiments client.
    :param event_logger: The EventLogger to be used to log bucketing events.
    :param prefix: the prefix used to filter keys (defaults to
        "experiments."). Must end with a ".".

    """
    assert prefix.endswith(".")
    config_prefix = prefix[:-1]

    cfg = config.parse_config(
        app_config,
        {
            config_prefix: {
                "path": config.Optional(config.String, default="/var/local/experiments.json"),
                "timeout": config.Optional(config.Timespan),
                "backoff": config.Optional(config.Timespan),
            }
        },
    )

    options = getattr(cfg, config_prefix)

    # pylint: disable=maybe-no-member
    # Truthiness is used on purpose to match existing behaviour: an unset or
    # zero-length timespan is passed to the factory as None.
    timeout = options.timeout.total_seconds() if options.timeout else None
    backoff = options.backoff.total_seconds() if options.backoff else None

    return ExperimentsContextFactory(options.path, event_logger, timeout=timeout, backoff=backoff)