Source code for keg.web

import inspect
import sys
from warnings import warn

from blazeutils.strings import case_cw2us, case_cw2dash
import flask
from flask import current_app, request
from flask.views import MethodView, http_method_funcs
try:
    from flask.views import MethodViewType
except ImportError:
    MethodViewType = None
from jinja2 import TemplateNotFound
from werkzeug.datastructures import MultiDict

from keg.extensions import lazy_gettext as _
from keg.utils import validate_arguments, ArgumentValidationError


class ImmediateResponse(Exception):
    def __init__(self, response):
        self.response = response


def handle_immediate_response(exc):
    return exc.response


def flash_abort(message):
    flask.flash(message, 'error')
    raise ImmediateResponse


[docs]def redirect(endpoint, *args, **kwargs): if endpoint.upper() == endpoint: keg_endpoints = flask.current_app.config['KEG_ENDPOINTS'] if endpoint.lower() in keg_endpoints: endpoint = keg_endpoints[endpoint.lower()] resp = flask.redirect(flask.url_for(endpoint, *args, **kwargs)) raise ImmediateResponse(resp)
class ViewArgumentError(Exception): pass def _werkzeug_multi_dict_conv(md): ''' Werzeug Multi-Dicts are either flat or lists, but we want a single value if only one value or a list if multiple values ''' retval = {} for key, value in md.to_dict(flat=False).items(): if len(value) == 1: retval[key] = value[0] else: retval[key] = value return retval def _call_with_expected_args(view, calling_args, method, method_is_bound=True): """ handle argument conversion to what the method accepts """ if isinstance(method, str): if not hasattr(view, method): return method = getattr(view, method) try: # validate_arguments is made for a function, not a class method # so we need to "trick" it by sending self here, but then # removing it before the bound method is called below pos_args = (view,) if method_is_bound else tuple() args, kwargs = validate_arguments(method, pos_args, calling_args.copy()) except ArgumentValidationError as e: msg = _('Argument mismatch occured: method={method}, missing={missing}, ' 'extra_keys={extra_keys}, extra_pos={extra_pos}.' ' Arguments available: {calling_args}', method=method, missing=e.missing, extra_keys=e.extra, extra_pos=e.extra_positional, calling_args=calling_args) raise ViewArgumentError(msg) if method_is_bound: # remove "self" from args since its a bound method args = args[1:] return method(*args, **kwargs) class _OldViewMeta(MethodViewType or object): def __init__(cls, name, bases, d): MethodViewType.__init__(cls, name, bases, d) # Assuming child views will always have a blueprint OR they are intended to be used like # abstract classes and will never be routed to directly. Apps/libs will need to call # assign_blueprint directly if a blueprint is not set in the view class definition, but # the view is intended to receive routes. if cls.blueprint is not None: cls.assign_blueprint(cls.blueprint) _ViewMeta = _OldViewMeta if MethodViewType is not None else type
[docs]class BaseView(MethodView, metaclass=_ViewMeta): """ Base class for all Keg views to inherit from. `BaseView` automatically calculates and installs routing, templating, and responding methods for HTTP verb named functions. .. literalinclude:: /examples/BaseView.py """ blueprint = None url = None template_name = None _index_method = None require_authentication = False auto_assign = tuple() # names of qs arguments that should be merged w/ URL arguments and passed to view methods expected_qs_args = [] def __init_subclass__(cls, **kwargs): """Flask before 2.2.0 used a metaclass to perform view setup, but this changed to using `init_subclass`. If the old way is enabled, no need to do anything but call the super here.""" super().__init_subclass__(**kwargs) if MethodViewType is None and cls.blueprint is not None: cls.assign_blueprint(cls.blueprint) def __init__(self, responding_method=None): self.responding_method = responding_method
[docs] def calc_responding_method(self): if self.responding_method: method_name = self.responding_method # sometimes the real method name is based on the name of the method combined with the # HTTP method/verb. method_name_verb = '{}_{}'.format(method_name, request.method.lower()) if not hasattr(self, method_name) and hasattr(self, method_name_verb): method_name = method_name_verb else: method_name = request.method.lower() # If the request method is HEAD and we don't have a handler for it # retry with GET. if request.method == 'HEAD' and not hasattr(self, 'head'): method_name = 'get' method_obj = getattr(self, method_name, None) assert method_obj is not None, _('Unimplemented method {name}', name=method_name) return method_obj
[docs] def dispatch_request(self, **kwargs): self.template_args = {} calling_args = self.process_calling_args(kwargs) self._calling_args = calling_args _call_with_expected_args(self, calling_args, 'pre_auth') _call_with_expected_args(self, calling_args, 'check_auth') _call_with_expected_args(self, calling_args, 'pre_loaders') self.call_loaders(calling_args) _call_with_expected_args(self, calling_args, 'pre_method') method_obj = self.calc_responding_method() response = _call_with_expected_args(self, calling_args, method_obj) # The following condition is intended to guard against None or no return # from the responding method. But, an empty string is a valid response that # would fail the falsy test, so check that one specifically. if not response and response != '': self.process_auto_assign() _call_with_expected_args(self, calling_args, 'pre_render') response = self.render() calling_args['_response'] = response pre_response = _call_with_expected_args(self, calling_args, 'pre_response') if pre_response is not None: return pre_response return response
[docs] def process_calling_args(self, urlargs): # start with query string arguments that are expected args = MultiDict() if self.expected_qs_args: for k in request.args.keys(): if k in self.expected_qs_args: args.setlist(k, request.args.getlist(k)) # add URL arguments, replacing GET arguments if they are there. URL # arguments get precedence and we don't want to just .update() # because that would allow arbitrary get arguments to affect the # values of the URL arguments for k, v in urlargs.items(): args[k] = v return _werkzeug_multi_dict_conv(args)
[docs] def call_loaders(self, calling_args): for attr_name, method_obj in inspect.getmembers(self, inspect.ismethod): if not attr_name.endswith('_loader'): continue arg_key = attr_name[:-7] retval = _call_with_expected_args(self, calling_args, method_obj) if retval is None: flask.abort(404) # have to add it to internal variable b/c calling_args is a copy, not the actual object self._calling_args[arg_key] = retval
[docs] def check_auth(self): pass
# if self.require_authentication and not current_user.is_authenticated(): # flask.abort(401)
[docs] def process_auto_assign(self): for key in self.auto_assign: self.assign(key, getattr(self, key))
[docs] def calc_class_fname(self, use_us=False): if use_us: return case_cw2us(self.__class__.__name__) return case_cw2dash(self.__class__.__name__)
[docs] def calc_template_name(self, use_us=False): if self.template_name is not None: return self.template_name template_path = '{}.html'.format(self.calc_class_fname(use_us=use_us)) blueprint_name = request.blueprint if blueprint_name: template_path = '{}/{}'.format(blueprint_name, template_path) # This block and `use_us` usage can be removed when we drop # template names generated with underscores. jinja_env = current_app.jinja_env try: jinja_env.get_template(template_path) except TemplateNotFound: raise_original_exception = False if not use_us: # Fall back to underscore-named templates for now, but warn if # that succeeds try: template_path = self.calc_template_name(use_us=True) warn( 'Templates named by underscore-notated class names are ' 'deprecated and will not be supported. Rename the template ' 'files using dashes.', DeprecationWarning, stacklevel=2 ) except TemplateNotFound: # Neither template exists. Raise the exception from the dashed name raise_original_exception = True if use_us or raise_original_exception: raise return template_path
[docs] def assign(self, key, value): self.template_args[key] = value
[docs] def render(self): return flask.render_template(self.calc_template_name(), **self.template_args)
[docs] @classmethod def calc_url(cls, use_blueprint=True): # calc_url will generally return the full url to reach a view. Turn use_blueprint off when # setting up the blueprint itself (see init_routes and init_blueprint) prefix = (cls.blueprint.url_prefix or '') if cls.blueprint and use_blueprint else '' if cls.url is not None: return prefix + cls.url return prefix + '/' + case_cw2dash(cls.__name__)
[docs] @classmethod def calc_endpoint(cls, use_blueprint=True): # calc_endpoint will generally return the full endpoint for a view. Turn use_blueprint off # when setting up the blueprint itself (see init_routes and init_blueprint) prefix = (cls.blueprint.name + '.') if cls.blueprint and use_blueprint else '' return prefix + case_cw2dash(cls.__name__)
[docs] @classmethod def init_routes(cls): cls.url_rules = [] cls.view_funcs = {} def method_with_rules(obj): # isroutine() matches functions (PY3) and unbound methods (PY2). return inspect.isroutine(obj) and hasattr(obj, '_keg_rules') for method_name, method_obj in inspect.getmembers(cls, predicate=method_with_rules): for rule, options in method_obj._keg_rules: method_route = MethodRoute(method_name, rule, options, cls.calc_url(use_blueprint=False), cls.calc_endpoint(use_blueprint=False)) mr_options = method_route.options() if method_route.endpoint not in cls.view_funcs: view_func = cls.as_view(method_route.view_func_name, method_route.sanitized_method_name('_')) cls.view_funcs[method_route.endpoint] = view_func mr_options['view_func'] = cls.view_funcs[method_route.endpoint] cls.blueprint.add_url_rule(method_route.rule(), **mr_options) if method_name in http_method_funcs and cls.methods: # A @route() is being used on a method with the same name as an HTTP verb. # Since this method is being explicitly routed, the automatic rule that would # be created due to MethodView logic should not apply. cls.methods.remove(method_name.upper())
[docs] @classmethod def init_blueprint(cls, rules): endpoint = cls.calc_endpoint(use_blueprint=False) class_url = cls.calc_url(use_blueprint=False) if cls.methods: # Flask assigns the endpoint to the __name__ atribute of the function it creates and # therefore the endpoint must be a `str`. str_endpoint = str(endpoint) view_func = cls.as_view(str_endpoint) for rule, options in rules: if rule and rule.startswith('/'): class_url = rule elif not rule: rule = class_url else: rule = '{}/{}'.format(class_url, rule) cls.blueprint.add_url_rule(rule, endpoint=endpoint, view_func=view_func, **options) if not rules: cls.blueprint.add_url_rule(class_url, endpoint=endpoint, view_func=view_func) for rule, options in cls.url_rules: cls.blueprint.add_url_rule(rule, **options)
[docs] @classmethod def assign_blueprint(cls, blueprint): if not blueprint: raise Exception('blueprint {} could not be assigned'.format(blueprint)) cls.blueprint = blueprint rules = [] if hasattr(cls, '_rules'): rules = cls._rules del cls._rules cls.init_routes() cls.init_blueprint(rules)
class MethodRoute(object): def __init__(self, method_name, rule, options, parent_url, parent_endpoint): self.method_name = method_name self._rule = rule # We use destructive operations on options, so make a copy. self._options = options.copy() self.parent_url = parent_url self.parent_endpoint = parent_endpoint def sanitized_method_name(self, separator='-'): method_identifier = self.method_name.replace('_', separator) # method names can be given like foo_get() and foo_post() which indicate the HTTP verb # used but shouldn't affect the URL. parts = method_identifier.split(separator) method_name_suffix = parts[-1] if len(parts) > 1 and method_name_suffix in http_method_funcs: last_position = (len(method_name_suffix) + 1) * -1 return method_identifier[:last_position] return method_identifier @property def endpoint(self): return '{}:{}'.format(self.parent_endpoint, self.sanitized_method_name()) @property def view_func_name(self): return str('{}_{}'.format(self.parent_endpoint, self.sanitized_method_name('_'))) def rule(self): method_url = '{}/{}'.format(self.parent_url, self.sanitized_method_name()) # Handle no rule given by @route(). if not self._rule: return method_url # Handle a relative rule given by @route(). if not self._rule.startswith('/'): return '{}/{}'.format(method_url, self._rule) return self._rule def options(self): self._options['endpoint'] = self.endpoint return self._options def _methods_calc(get, post, post_only, methods): if methods is None: methods = set() else: methods = set(methods) if get and not post_only: methods.add('GET') if post or post_only: methods.add('POST') return methods def route(rule=None, get=True, post=False, post_only=False, methods=None, **options): options['methods'] = _methods_calc(get, post, post_only, methods) def wrapper(func): if not hasattr(func, '_keg_rules'): func._keg_rules = [] func._keg_rules.append((rule, options)) return func return wrapper def rule(rule=None, get=True, post=False, post_only=False, methods=None, **options): parent_locals = sys._getframe(1).f_locals rules = parent_locals.setdefault('_rules', []) options['methods'] = _methods_calc(get, post, post_only, methods) rules.append((rule, options))