chili-pepper
Table Of Contents
chili-pepper
Table Of Contents

Source code for chili_pepper.app

import builtins
import inspect
import json
import logging
from copy import deepcopy
from enum import Enum
from threading import Thread

import boto3

from chili_pepper.config import Config
from chili_pepper.deployer import Deployer
from chili_pepper.exception import ChiliPepperException

try:
    from typing import List, Optional, Dict
except ImportError:
    # python2.7 doesn't have typing, and I don't want to mess with mypy yet
    pass


[docs]class InvalidFunctionSignature(ChiliPepperException): """Function Signature does not match required specifications Cloud providers require that functions have a specific signature. This exception is raised when a task does not match the required signature. """ pass
[docs]class InvocationError(ChiliPepperException): """Raised when there was a problem invoking the serverless function """ pass
[docs]class Result: """Task result object Result wraps the information returned when the serverless function is invoked. """ def __init__(self, lambda_function_name, event): # type: (str, dict) -> None """ Args: lambda_function_name: The name of the invoked AWS Lambda function event: The event dictionary to pass to the AWS Lambda function """ self._logger = logging.getLogger(__name__) self._lambda_function_name = lambda_function_name self._event = event self._thread = None self._invoke_response = None
[docs] def start(self): """Start executing the serverless function Invokes the serverless function. For AWS, this invokes the Lambda in a thread, since the only way to get results is to call syncronously. By putting the invoke call in a therad, it will not block the main application thread. Returns: Thread: The thread running the lambda """ if self._thread is None: def lambda_run(): lambda_client = boto3.client("lambda") self._invoke_response = lambda_client.invoke(FunctionName=self._lambda_function_name, Payload=json.dumps(self._event)) return self._thread = Thread(target=lambda_run) self._thread.start() return self._thread
[docs] def get(self): """Get the response from the serverless execution. This is a potentially blocking call. It will retrieve the return payload from the serverless function. If it is called before the serverless function has finished, ``get`` will block until the serverless function returns. Raises: InvocationError: Raises if something goes retrieving the return payload of the serverless function. Returns: dict: The return payload of the serverless function """ if self._thread is None: self.start() self._thread.join() # lambda has now been invoked and _invoke_response *should* be populated # TODO error handling, logs from the lambda, etc # moto returns None for payload in python 3.6 if self._invoke_response["Payload"] is not None: payload = self._invoke_response["Payload"].read().decode("utf8") else: raise InvocationError("No invoke response, even though the AWS lambda function has been invoked.") self._logger.info("Got payload {payload} from thread {thread}".format(payload=payload, thread=self._thread)) return json.loads(payload)
[docs]class AppProvider(Enum): """Enum to identify the serverless provider. Currently unused. """ AWS = 1
[docs]class ChiliPepper:
[docs] def create_app(self, app_name, app_provider=AppProvider.AWS, config=None): # type: (str, AppProvider, Optional[Config]) -> App """[summary] Args: app_name ([type]): [description] app_provider ([type], optional): [description]. Defaults to AppProvider.AWS. config ([type], optional): [description]. Defaults to None. Raises: ChiliPepperException: [description] Returns: [type]: [description] """ if config is None: config = Config() if app_provider == AppProvider.AWS: return AwsApp(app_name, config) else: raise ChiliPepperException("Unknown app provider {app_provider}".format(app_provider=app_provider))
[docs]class TaskFunction: """A wrapper around python functions that can be serverlessly deployed and executed by chili-pepper """ def __init__(self, func, environment_variables=None): # type: (builtins.function, Optional[Dict]) -> None """ Args: func (builtins.function): The python function object environment_variables (dict, optional): Environment variables that will be passed to the serverles function. Defaults to None. """ self._func = func self._environment_variables = environment_variables if environment_variables is not None else dict() @property def func(self): # type: () -> builtins.function """ Returns: builtins.function: The python function """ return self._func @property def environment_variables(self): # type: () -> Dict """ Returns: dict: The environment variable overrides for this function """ return self._environment_variables def __eq__(self, other): # type: (TaskFunction) -> bool return ( hasattr(other, "func") and self.func == other.func and hasattr(other, "environment_variables") and self.environment_variables == other.environment_variables ) def __ne__(self, other): # type: (TaskFunction) -> bool # need to implement because of python2.7 # https://docs.python.org/2.7/reference/datamodel.html#object.__ne__ return not (self == other) def __str__(self): # type: () -> str # TODO unhardcode the class name return "TaskFunction {my_module}.{my_func_name}".format(my_module=self._func.__module__, my_func_name=self.func.__name__)
[docs]class App: """Cloud-agnostic App class App is the main class for applications that use Chili-Pepper. """ def __init__(self, app_name, config=None): # type: (str, Optional[Config]) -> None """ Args: app_name: The application name config: Optional default config object """ if config is None: config = Config() self._app_name = app_name self.conf = config self._logger = logging.getLogger(__name__) self._task_functions = list() @property def app_name(self): # type: () -> str """ The application name. """ return self._app_name @property def task_functions(self): # type: () -> List[TaskFunction] """ The task functions identified with the ``@app.task`` decorator """ return self._task_functions
[docs] def task(self, environment_variables=None): # type: (Optional[Dict]) -> builtins.func """ The decorator to denote tasks. It must be implemented by cloud-specific App child classes. Args: environment_variables: Environment variables to apply to the task """ raise NotImplementedError()
[docs]class AwsApp(App): @property def bucket_name(self): # type: () -> str """ The AWS S3 bucket name that holds the lambda deployment packages """ return self.conf["aws"]["bucket_name"] @property def runtime(self): # type: () -> str """ The AWS lambda runtime identifier. .. _AWS lambda runtime documentation: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html """ return self.conf["aws"]["runtime"] # TODO should runtime be set by sys.version_info?
[docs] def task(self, environment_variables=None): # type: (Optional[Dict]) -> builtins.func if environment_variables is None: environment_variables = dict() def _decorator(func,): # Ensure that the function signature matches what lambda expects # otherwise it will not be callabale from lambda # https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html # TODO make this cloud-agnostic try: function_signature = inspect.signature(func) function_parameter_list = list(function_signature.parameters.keys()) except AttributeError: # python2.7 has different inspect module function_arg_spec = inspect.getargspec(func) function_parameter_list = list(function_arg_spec.args) if function_parameter_list != ["event", "context"]: raise InvalidFunctionSignature( "Chili-pepper requires that you task functions has 2 parameters - 'event' and 'context' to match what Lambda expects. Your function " + func.__module__ + "." + func.__name__ + " has these parameters: " + str(function_parameter_list) ) # combine the default and default env vars task_environment_variables = deepcopy(self.conf["default_environment_variables"]) task_environment_variables.update(environment_variables) self._task_functions.append(TaskFunction(func, environment_variables=task_environment_variables)) def _delay_wrapper(event): # see https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html # the delay function arguments must be just the event argument # context is added by lambda """ plan of attack 1) Compute or look up the function name 2) call https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.invoke in a separate thread (since invoke only gives you useful feedback if you call it synchronously) 3) return a wrapper of the response, payload, logs, etc """ # TODO make this cloud agnostic, abstracting it depending on the cloud provider deployer = Deployer(self) lambda_function_name = deployer.get_function_id(func) # TODO alias/versioning support? result = Result(lambda_function_name, event) result.start() return result func.delay = _delay_wrapper return func return _decorator