chili-pepper
chili-pepper

Source code for chili_pepper.app

import builtins
import inspect
import json
import logging
from base64 import b64decode
from copy import deepcopy
from enum import Enum
from threading import Thread

import awacs
import boto3

from chili_pepper.config import Config
from chili_pepper.deployer import Deployer
from chili_pepper.exception import ChiliPepperException

try:
    from typing import List, Optional, Dict
except ImportError:
    # python2.7 doesn't have typing, and I don't want to mess with mypy yet
    pass


[docs]class InvalidFunctionSignature(ChiliPepperException): """Function Signature does not match required specifications Cloud providers require that functions have a specific signature. This exception is raised when a task does not match the required signature. """ pass
[docs]class InvocationError(ChiliPepperException): """Raised when there was a problem invoking the serverless function """ pass
[docs]class MissingArgumentError(ChiliPepperException): """ Raised when there is a missing or incorrect argument in a method """ pass
[docs]class Result: """Task result object Result wraps the information returned when the serverless function is invoked. """ def __init__(self, lambda_function_name, event): # type: (str, dict) -> None """ Args: lambda_function_name: The name of the invoked AWS Lambda function event: The event dictionary to pass to the AWS Lambda function """ self._logger = logging.getLogger(__name__) self._lambda_function_name = lambda_function_name self._event = event self._thread = None self._invoke_response = None
[docs] def start(self): """Start executing the serverless function Invokes the serverless function. For AWS, this invokes the Lambda in a thread, since the only way to get results is to call synchronously. By putting the invoke call in a therad, it will not block the main application thread. Returns: Thread: The thread running the lambda """ if self._thread is None: def lambda_run(): lambda_client = boto3.client("lambda") self._invoke_response = lambda_client.invoke(FunctionName=self._lambda_function_name, Payload=json.dumps(self._event), LogType="Tail") return self._thread = Thread(target=lambda_run) self._thread.start() return self._thread
def _join_invocation(self): """ Ensure the lambda thread has been executed, and joined with the main thread """ if self._thread is None: self.start() self._thread.join()
[docs] def get(self): """Get the response from the serverless execution. This is a potentially blocking call. It will retrieve the return payload from the serverless function. If it is called before the serverless function has finished, ``get`` will block until the serverless function returns. Raises: InvocationError: Raises if something goes retrieving the return payload of the serverless function. Returns: dict: The return payload of the serverless function """ self._join_invocation() # lambda has now been invoked and _invoke_response *should* be populated # TODO error handling # moto returns None for payload in python 3.6 if self._invoke_response["Payload"] is not None: payload = self._invoke_response["Payload"].read().decode("utf8") else: raise InvocationError("No invoke response, even though the AWS lambda function has been invoked.") self._logger.info("Got payload {payload} from thread {thread}".format(payload=payload, thread=self._thread)) return json.loads(payload)
[docs] def get_log_result(self): """ Get the log result from the serverless invocation. This is potentially a blocking call. Returns: str: The last 4 KB of the execution log """ self._join_invocation() if self._invoke_response["LogResult"] is not None: log_result = b64decode(self._invoke_response["LogResult"]).decode("utf8") self._logger.debug("Log result was populated") else: log_result = "" self._logger.debug("Log result was NOT populated") return log_result
[docs]class AppProvider(Enum): """Enum to identify the serverless provider. Currently unused. """ AWS = 1
[docs]class ChiliPepper:
[docs] def create_app(self, app_name, app_provider=AppProvider.AWS, config=None): # type: (str, AppProvider, Optional[Config]) -> App """[summary] Args: app_name ([type]): [description] app_provider ([type], optional): [description]. Defaults to AppProvider.AWS. config ([type], optional): [description]. Defaults to None. Raises: ChiliPepperException: [description] Returns: [type]: [description] """ if config is None: config = Config() if app_provider == AppProvider.AWS: return AwsApp(app_name, config) else: raise ChiliPepperException("Unknown app provider {app_provider}".format(app_provider=app_provider))
[docs]class TaskFunction: """A wrapper around python functions that can be serverlessly deployed and executed by chili-pepper """ def __init__(self, func, environment_variables=None, memory=None, timeout=None, tags=None, activate_tracing=False): # type: (builtins.function, Optional[Dict], Optional[int], Optional[int], Optional[dict], bool) -> None """ Args: func (builtins.function): The python function object environment_variables (dict, optional): Environment variables that will be passed to the serverles function. Defaults to None. memory [int, optional]: Memory value to allocate for the serverless function timeout [int, optional]: Timeout value for the serverless function tags [dict, optional]: Tags to add to the serverless function """ self._func = func self._environment_variables = environment_variables if environment_variables is not None else dict() self._memory = memory self._timeout = timeout self._tags = tags if tags is not None else dict() self._activate_tracing = activate_tracing @property def func(self): # type: () -> builtins.function """ Returns: builtins.function: The python function """ return self._func @property def environment_variables(self): # type: () -> Dict """ Returns: dict: The environment variable overrides for this function """ return self._environment_variables @property def memory(self): # type: () -> Optional[int] """ https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-memorysize Returns: Optional[int]: The memory allocation to grant to this serverless function """ return self._memory @property def timeout(self): # type: () -> Optional[int] """ https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-timeout Returns: Optional[int]: Timeout value for the serverless function """ return self._timeout @property def tags(self): # type() -> Dict """ https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-tags Returns: Dict: Tags for the serverless function """ return self._tags @property def activate_tracing(self): # type() -> bool """ https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-function.html#cfn-lambda-function-tracingconfig This will return ``True`` if and only if ``True`` was passed to the constructor - any other value, even if it is Truthy, will not activate tracing Returns: bool: ``True`` if tracing should be activate, ``False`` otherwise """ if self._activate_tracing is True: return True else: return False def __eq__(self, other): # type: (TaskFunction) -> bool return ( hasattr(other, "func") and self.func == other.func and hasattr(other, "environment_variables") and self.environment_variables == other.environment_variables ) def __ne__(self, other): # type: (TaskFunction) -> bool # need to implement because of python2.7 # https://docs.python.org/2.7/reference/datamodel.html#object.__ne__ return not (self == other) def __str__(self): # type: () -> str # TODO unhardcode the class name return "TaskFunction {my_module}.{my_func_name}".format(my_module=self._func.__module__, my_func_name=self.func.__name__)
[docs]class App: """Cloud-agnostic App class App is the main class for applications that use Chili-Pepper. """ def __init__(self, app_name, config=None): # type: (str, Optional[Config]) -> None """ Args: app_name: The application name config: Optional default config object """ if config is None: config = Config() self._app_name = app_name self.conf = config self._logger = logging.getLogger(__name__) self._task_functions = list() @property def app_name(self): # type: () -> str """ The application name. """ return self._app_name @property def task_functions(self): # type: () -> List[TaskFunction] """ The task functions identified with the ``@app.task`` decorator """ return self._task_functions
[docs] def task(self, environment_variables=None): # type: (Optional[Dict]) -> builtins.func """ The decorator to denote tasks. It must be implemented by cloud-specific App child classes. Args: environment_variables: Environment variables to apply to the task """ raise NotImplementedError()
[docs]class AwsAllowPermission: """ Simple wrapper around an AWS IAM rule allowing permission(s) to resource(s) """ def __init__(self, allow_actions, allow_resources, sid=None): # type: (List[str], List[str], Optional[str]) """ Args: allow_permissions ([List[str]): A list of AWS permissions to allow allow_resources (List[str]): A list of AWS resource arns to grant permmission to sid (Optional[str]): The Sid of the iam statement https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_sid.html """ if len(allow_actions) == 0: raise MissingArgumentError("You must grant access to at least 1 action") if len(allow_resources) == 0: raise MissingArgumentError("You must grant access to at least 1 resource") self._allow_actions = allow_actions self._allow_resources = allow_resources self._sid = sid @property def allow_actions(self): """ Returns: List[str]: Actions to grant permissions """ return self._allow_actions @property def allow_resources(self): """ Returns: List[str]: The resources that should be granted permissions """ return self._allow_resources @property def sid(self): """ Returns: Optional[str]: The sid of this policy statement """ return self._sid
[docs] def statement(self): """ Generate the statement object for these permissions Returns: awsacs.aws.Statement: The statement object granting permissions """ statement_kwargs = { "Effect": awacs.aws.Allow, "Action": [awacs.aws.Action(*action.split(":")) for action in self.allow_actions], "Resource": self.allow_resources, } if self.sid is not None: statement_kwargs["Sid"] = self.sid return awacs.aws.Statement(**statement_kwargs)
[docs]class AwsApp(App): @property def bucket_name(self): # type: () -> str """ The AWS S3 bucket name that holds the lambda deployment packages Returns: str: The bucket name """ return self.conf["aws"]["bucket_name"] @property def runtime(self): # type: () -> str """ The AWS lambda runtime identifier. .. _AWS lambda runtime documentation: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html Returns: str: The runtime identifier """ return self.conf["aws"]["runtime"] # TODO should runtime be set by sys.version_info? @property def kms_key_arn(self): # type: () -> Optional[str] """ The KMS key arn to use, or None to use the default AWS key Returns: Optional[str]: The KMS key arn, or None to use the default key """ kms_key_config_key = "kms_key" if kms_key_config_key in self.conf["aws"]: return self.conf["aws"][kms_key_config_key] else: return None @property def default_tags(self): """ Returns: Dict: The default tags to assign to all resources created when deploying this App """ if "default_tags" in self.conf["aws"] and self.conf["aws"]["default_tags"] is not None: return self.conf["aws"]["default_tags"] else: return dict() @property def allow_policy_permissions(self): """ Extra permissions to allow functions in this app Returns: List[AwsAllowPermission]: The extra permissions that should be granted """ allow_policy_permissions_key = "extra_allow_permissions" if allow_policy_permissions_key in self.conf["aws"] and self.conf["aws"][allow_policy_permissions_key]: allow_permissions = self.conf["aws"][allow_policy_permissions_key] else: allow_permissions = list() if self.kms_key_arn: chili_pepper_kms_key_permission_sid = "ChiliPepperGrantAccessToKmsKey" if chili_pepper_kms_key_permission_sid not in [p.sid for p in allow_permissions]: allow_permissions.append(AwsAllowPermission(["kms:Decrypt"], [self.kms_key_arn], sid=chili_pepper_kms_key_permission_sid)) return allow_permissions @property def subnet_ids(self): """ Subnet IDs for the lambda function https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-function-vpcconfig.html Returns: List[str]: The subnet IDs for the lambda function """ if "subnet_ids" in self.conf["aws"] and self.conf["aws"]["subnet_ids"] is not None: return self.conf["aws"]["subnet_ids"] else: return list() @property def security_group_ids(self): """ Security Group IDs for the lambda function https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-function-vpcconfig.html Returns: List[str]: The security group IDs for the lambda function """ if "security_group_ids" in self.conf["aws"] and self.conf["aws"]["security_group_ids"] is not None: return self.conf["aws"]["security_group_ids"] else: return list()
[docs] def task(self, environment_variables=None, memory=None, timeout=None, tags=None, activate_tracing=False): # type: (Optional[Dict], Optional[int], Optional[int], Optional[dict], bool) -> builtins.func if environment_variables is None: environment_variables = dict() if tags is None: tags = dict() def _decorator(func,): # Ensure that the function signature matches what lambda expects # otherwise it will not be callabale from lambda # https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html # TODO make this cloud-agnostic try: function_signature = inspect.signature(func) function_parameter_list = list(function_signature.parameters.keys()) except AttributeError: # python2.7 has different inspect module function_arg_spec = inspect.getargspec(func) function_parameter_list = list(function_arg_spec.args) if function_parameter_list != ["event", "context"]: raise InvalidFunctionSignature( "Chili-pepper requires that you task functions has 2 parameters - 'event' and 'context' to match what Lambda expects. Your function " + func.__module__ + "." + func.__name__ + " has these parameters: " + str(function_parameter_list) ) # combine the default and passed env vars default_environment_vars = self.conf["default_environment_variables"] task_environment_variables = deepcopy(default_environment_vars if default_environment_vars is not None else dict()) task_environment_variables.update(environment_variables) # combine the default and passed tags task_tags = deepcopy(self.default_tags) task_tags.update(tags) self._task_functions.append( TaskFunction( func, environment_variables=task_environment_variables, memory=memory, timeout=timeout, tags=task_tags, activate_tracing=activate_tracing ) ) def _delay_wrapper(event): # see https://docs.aws.amazon.com/lambda/latest/dg/python-programming-model-handler-types.html # the delay function arguments must be just the event argument # context is added by lambda """ plan of attack 1) Compute or look up the function name 2) call https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.invoke in a separate thread (since invoke only gives you useful feedback if you call it synchronously) 3) return a wrapper of the response, payload, logs, etc """ # TODO make this cloud agnostic, abstracting it depending on the cloud provider deployer = Deployer(self) lambda_function_name = deployer.get_function_id(func) # TODO alias/versioning support? result = Result(lambda_function_name, event) result.start() return result func.delay = _delay_wrapper return func return _decorator