Source code for keg.cli

from collections import defaultdict
from contextlib import contextmanager
from itertools import chain
import urllib

import click
import flask
import flask.cli

from keg import current_app
from keg.extensions import gettext as _


try:
    import dotenv
except ImportError:
    dotenv = None

try:
    from flask.helpers import get_load_dotenv
except ImportError:
    def get_load_dotenv(default=False):
        return False


class KegAppGroup(flask.cli.AppGroup):
    def __init__(self, create_app, add_default_commands=True, load_dotenv=True, *args, **kwargs):
        self.create_app = create_app
        self.load_dotenv = load_dotenv

        flask.cli.AppGroup.__init__(self, *args, **kwargs)
        if add_default_commands:
            self.add_command(dev_command)

        self._loaded_plugin_commands = False

    def _load_plugin_commands(self):
        if self._loaded_plugin_commands:
            return
        entry_point_iterables = []

        # older Flask
        try:
            import pkg_resources
            entry_point_iterables.extend([
                pkg_resources.iter_entry_points('flask.commands'),
                pkg_resources.iter_entry_points('keg.commands'),
            ])
        except ImportError:
            return

        # newer Flask
        try:
            # uses importlib, but python has some API variations. Let flask handle that.
            from flask.cli import metadata
            entry_point_iterables.extend([
                metadata.entry_points(group="flask.commands"),
                metadata.entry_points(group="keg.commands"),
            ])
        except ImportError:
            pass

        for ep in chain.from_iterable(entry_point_iterables):
            self.add_command(ep.load(), ep.name)
        self._loaded_plugin_commands = True

    def _load_app(self, ctx):
        self._load_plugin_commands()

        info = ctx.ensure_object(flask.cli.ScriptInfo)
        info.load_app()

    def list_commands(self, ctx):
        self._load_app(ctx)
        rv = set(click.Group.list_commands(self, ctx))
        return sorted(rv)

    def get_command(self, ctx, name):
        self._load_app(ctx)
        return click.Group.get_command(self, ctx, name)

    def main(self, *args, **kwargs):
        if get_load_dotenv(self.load_dotenv):
            flask.cli.load_dotenv()

        obj = kwargs.get('obj')
        if obj is None:
            obj = flask.cli.ScriptInfo(create_app=self.create_app)
        kwargs['obj'] = obj
        # TODO: figure out if we want to use this next line.
        #kwargs.setdefault('auto_envvar_prefix', 'FLASK')
        return flask.cli.AppGroup.main(self, *args, **kwargs)


@click.group('develop', help=_('Developer info and utils.'))
def dev_command():
    pass


dev_command.add_command(flask.cli.run_command)
dev_command.add_command(flask.cli.shell_command)


@dev_command.command('routes', short_help=_('List the routes defined for this app.'))
@flask.cli.with_appcontext
def routes_command():
    output = []
    endpoint_len = 0
    methods_len = 0

    # calculate how wide the output columns need to be
    for rule in flask.current_app.url_map.iter_rules():
        methods = ','.join(rule.methods)
        endpoint_len = max(endpoint_len, len(rule.endpoint))
        methods_len = max(methods_len, len(methods))

    # generate the output
    for rule in flask.current_app.url_map.iter_rules():
        methods = ','.join(rule.methods)
        line = urllib.parse.unquote("{}   {}   {}".format(
            rule.endpoint.ljust(endpoint_len), methods.ljust(methods_len), rule))
        output.append(line)

    for line in sorted(output):
        click.echo(line)


@dev_command.command('templates', short_help=_('Show paths searched for a template.'))
@flask.cli.with_appcontext
def templates_command():
    jinja_loader = flask.current_app.jinja_env.loader
    paths = defaultdict(list)
    for template in jinja_loader.list_templates():
        for loader, template in jinja_loader._iter_loaders(template):
            for dpath in loader.searchpath:
                paths[dpath].append(template)
    for dpath in sorted(paths.keys()):
        template_paths = sorted(paths[dpath])
        click.echo(dpath)
        for tpath in template_paths:
            click.echo('    {}'.format(tpath))


@dev_command.command('config', short_help=_('List info related to config files, profiles, and'
                     ' values.'))
@flask.cli.with_appcontext
def config_command():
    app = flask.current_app
    config = app.config
    keys = list(config.keys())
    keys.sort()

    click.echo(_('Default config objects:'))
    for val in config.default_config_locations_parsed():
        click.echo('    {}'.format(val))

    click.echo(_('Config file locations:'))
    for val in config.config_file_paths():
        click.echo('    {}'.format(val))

    if config.config_paths_unreadable:
        click.echo(_('Could not access the following config paths:'))
        for path, exc in config.config_paths_unreadable:
            click.echo('    {}: {}'.format(path, str(exc)))

    click.echo(_('Config objects used:'))
    for val in config.configs_found:
        click.echo('    {}'.format(val))

    click.echo(_('Resulting app config (including Flask defaults):'))
    for key in keys:
        click.echo('    {} = {}'.format(key, config[key]))


class DatabaseGroup(click.MultiCommand):

    def list_commands(self, ctx):
        return ['clear', 'init']

    def get_command(self, ctx, name):
        if name == 'init':
            return database_init
        if name == 'clear':
            return database_clear


@dev_command.command('db', cls=DatabaseGroup, invoke_without_command=True,
                     help=_('Lists database related sub-commands.'))
@flask.cli.with_appcontext
@click.pass_context
def database_group(ctx):
    # only take action if no subcommand is involved.
    if ctx.invoked_subcommand is None:
        if not current_app.db_enabled:
            click.echo(_('Database not enabled for this app.  No subcommands available.'))
        else:
            # Database enabled, but no subcommand was given, therefore we want to just show
            # the help message, which would be the default behavior if we had not used the
            # invoke_Without_command option.
            click.echo(ctx.get_help())
            ctx.exit()


@click.command('init', short_help=_('Create all db objects, send related events.'))
@click.option('--clear-first', default=False, is_flag=True,
              help=_('Clear DB of all data and drop all objects before init.'))
@click.option('--yes', default=False, is_flag=True, help="Force confirmation")
@flask.cli.with_appcontext
def database_init(clear_first, yes):
    prompt = clear_first and not yes
    if prompt and not click.confirm(_('Are you sure? You will delete all the data!')):
        click.echo('Database untouched')
        return

    if clear_first:
        current_app.db_manager.db_init_with_clear()
        click.echo(_('Database cleared and initialized'))
    else:
        current_app.db_manager.db_init()
        click.echo(_('Database initialized'))


@click.command('clear', short_help=_('Clear DB of all data and drop all objects.'))
@click.option('--yes', default=False, is_flag=True, help="Force confirmation")
@flask.cli.with_appcontext
def database_clear(yes):
    if yes or click.confirm(_('Are you sure? You will delete all the data!')):
        current_app.db_manager.db_clear()
        click.echo(_('Database cleared'))
    else:
        click.echo(_('Database untouched'))


class CLILoader(object):
    """
        This loader takes care of the complexity of click object setup and instantiation in the
        correct order so that application level CLI options are available before the Keg app is
        instantiated (so the options can be used to configure the app).

        The order of events is:

        - instantiate KegAppGroup
        - KegAppGroup.main() is called
            1. the ScriptInfo object is instantiated
            2. normal click behavior starts, including argument parsing
                - arguments are always parsed due to invoke_without_command=True
            3. during argument parsing, option callbacks are excuted which will result in at least
               the profile name being saved in ScriptInfo().data
            4. Normal click .main() behavior will continue which could include processing commands
               decorated with flask.cli.with_appcontext() or calls to KegAppGroup.list_commands()
               or KegAppGroup.get_command().
                - ScriptInfo.init_app() will be called during any of these operations
                - ScriptInfo.init_app() will call self.create_app() below with the ScriptInfo
                  instance
    """
    def __init__(self, appcls):
        self.appcls = appcls
        # Don't store instance-level vars here.  This object is only instantiated once per Keg
        # sub-class but can be used across multiple app instance creations.  So, anything app
        # instance specific should go on the ScriptInfo instance (see self.options_callback()).

    def create_group(self):
        """ Create the top most click Group instance which is the entry point for any Keg app
            being called in a CLI context.

            The return value of this context gets set on Keg.cli
        """

        return KegAppGroup(
            self.create_app,
            params=self.create_script_options(),
            callback=self.main_callback,
            invoke_without_command=True,
        )

    def create_app(self, script_info=False):
        """ Instantiate our app, sending CLI option values through as needed.

            `script_info:` required for Flask 1.X create_app signature, but not valid in Flask 2.1.
            So keep it as an arg, but don't use it to get the object.  See #163.
        """
        script_info = click.get_current_context().obj
        init_kwargs = self.option_processor(script_info.data)
        return self.appcls().init(**init_kwargs)

    def option_processor(self, cli_options):
        """
            Turn cli_options, which is the result of all parsed options in create_script_options()
            into a dict which will be given as kwargs to the app's .init() method.
        """
        retval = dict(config_profile=cli_options.get('profile', None), config={})
        if cli_options.get('quiet', False):
            retval['config']['KEG_LOG_STREAM_ENABLED'] = False
        return retval

    def create_script_options(self):
        """ Create app level options, ideally that are used to configure the app itself.  """
        return [
            click.Option(['--profile'], is_eager=True, default=None, callback=self.options_callback,
                         help=_('Name of the configuration profile to use.'),),
            click.Option(['--quiet'], is_eager=True, is_flag=True, default=False,
                         callback=self.options_callback,
                         help=_('Set default logging level to logging.WARNING.')),
            click.Option(['--help-all'], is_eager=True, is_flag=True, expose_value=False,
                         callback=self.help_all_callback,
                         help=_('Show all commands with subcommands.')),
        ]

    def options_callback(self, ctx, param, value):
        """ This method is called after argument parsing, after ScriptInfo instantiation but before
            create_app() is called.  It's the only way to get the options into ScriptInfo.data
            before the Keg app instance is instantiated.
        """
        si = ctx.ensure_object(flask.cli.ScriptInfo)
        si.data[param.name] = value

    def help_all_callback(self, ctx, param, value):
        if not value or ctx.resilient_parsing:
            return
        formatter = ctx.make_formatter()

        ctx.command.format_usage(ctx, formatter)
        ctx.command.format_help_text(ctx, formatter)
        self.format_options(ctx.command, ctx, formatter)
        self.format_commands(ctx.command, ctx, formatter)
        ctx.command.format_epilog(ctx, formatter)

        click.echo(formatter.getvalue().rstrip('\n'))
        ctx.exit()

    def format_options(self, command, ctx, formatter):
        opts = []
        for param in command.get_params(ctx):
            if param.name == 'help':
                opts.append(('--help', _('Show help message.')))
            elif param.name == 'help_all':
                opts.append(('--help-all', _('Show this message and exit.')))
            else:
                rv = param.get_help_record(ctx)
                if rv is not None:
                    opts.append(rv)

        if opts:
            with formatter.section('Options'):
                formatter.write_dl(opts)

    def format_commands(self, command, ctx, formatter):
        if hasattr(command, 'list_commands'):
            subcommands = command.list_commands(ctx)

            if len(subcommands):
                # allow for 3 times the default spacing
                limit = formatter.width - 6 - max(len(subcommand) for subcommand in subcommands)

                rows = []
                commands = []
                for subcommand in subcommands:
                    cmd = command.get_command(ctx, subcommand)
                    if cmd is None:
                        continue
                    if cmd.hidden:
                        continue
                    commands.append(cmd)
                    help = cmd.get_short_help_str(limit)
                    rows.append((subcommand, help))

                with self.compact_section(formatter, 'Commands'):
                    self.write_dl_with_subcommands(ctx, formatter, rows, commands)

    def write_dl_with_subcommands(self, ctx, formatter, rows, commands, col_max=30, col_spacing=2):
        rows = list(rows)
        widths = click.formatting.measure_table(rows)
        if len(widths) != 2:
            raise TypeError('Expected two columns for definition list')

        first_col = min(widths[0], col_max) + col_spacing

        for (first, second), command in zip(
            click.formatting.iter_rows(rows, len(widths)), commands
        ):
            formatter.write('%*s%s' % (formatter.current_indent, '', first))
            if not second:
                formatter.write('\n')
                continue
            if click.formatting.term_len(first) <= first_col - col_spacing:
                formatter.write(' ' * (first_col - click.formatting.term_len(first)))
            else:
                formatter.write('\n')
                formatter.write(' ' * (first_col + formatter.current_indent))

            text_width = max(formatter.width - first_col - 2, 10)
            lines = iter(click.formatting.wrap_text(second, text_width).splitlines())
            if lines:
                formatter.write(next(lines) + '\n')
                for i, line in enumerate(lines):
                    formatter.write('%*s%s\n' % (
                        first_col + formatter.current_indent, '', line))
            else:
                formatter.write('\n')

            # Add subcommands under current command (recursive)
            with formatter.indentation():
                self.format_commands(command, ctx, formatter)

    @contextmanager
    def compact_section(self, formatter, name):
        formatter.write_heading(name)
        formatter.indent()
        try:
            yield
        finally:
            formatter.dedent()

    def main_callback(self, **kwargs):
        """
            Default Click behavior is to call the help method if no arguments are given to the
            top-most command.  That's good UX, but the way Click impliments it, the help is
            shown before argument parsing takes place.  That's bad, because it means our
            options_callback() is never called and the config profile isn't available when the
            help command calls KegAppGroup.list_commands().

            So, we force Click to always parse the args using `invoke_without_command=True` above,
            but when we do that, Click turns off the automatic display of help.  So, we just
            impliment help-like behavior in this method, which gives us the same net result
            as default Click behavior.
        """
        ctx = click.get_current_context()

        if ctx.invoked_subcommand is not None:
            # A subcommand is present, so arguments were passed.
            return

        click.echo(ctx.get_help(), color=ctx.color)
        ctx.exit()