Source code for watson.console.runner

# -*- coding: utf-8 -*-
import argparse
from collections import OrderedDict
import os
import re
import sys
from watson.common.imports import load_definition_from_string
from watson.common.contextmanagers import suppress
from watson.console import colors, styles


USAGE_REGEX = re.compile('(\w+[\:])(.+?(?=\[))(.*)')


[docs]class Runner(object): """A command line runner that allows new commands to be added and run on demand. Commands can be added either as a fully qualified name, or imported. Example: .. code-block:: python runner = Runner(commands=['module.commands.ACommand']) runner() """ _name = None _commands = None
[docs] def __init__(self, commands=None): self._commands = [] if commands: self.add_commands(commands)
@property def name(self): """Returns the name of the script that runner was executed from. """ return self._name @property def commands(self): """A list of all commands added to the runner. Returns: OrderedDict containing all the commands. """ commands = {} for command in self._commands: if isinstance(command, str): command = load_definition_from_string(command) commands[command.cased_name()] = command return OrderedDict(sorted(commands.items()))
[docs] def add_command(self, command): """Convenience method to add new commands after the runner has been initialized. Args: command (string|class): the command to add """ self._commands.append(command)
[docs] def add_commands(self, commands): """Convenience method to add multiple commands. Args: commands (list|tuple): the commands to add """ for command in commands: self.add_command(command)
[docs] def get_command(self, command_name): """Returns an initialized command from the attached commands. Args: command_name: The command name to retrieve """ if command_name not in self.commands: return None return self.commands[command_name]()
[docs] def update_usage(self, parser, namespace, is_subparser=False): """Updates the usage for the relevant parser. Forces the usage message to include the namespace and color. """ usage = parser.format_usage() r = USAGE_REGEX.search(usage) if not r: return parts = [s for s in r.groups()] if namespace: if is_subparser: method_part = parts[1].split(' ') method_part.insert(-2, namespace) parts[1] = ' '.join(method_part) else: parts.insert(2, '{} '.format(namespace)) parser.usage = colors.header(''.join(parts[1:]).strip())
[docs] def attach_commands(self, parser, namespace): """Register the commands against the parser. Args: parser: The parser to add commands to namespace: The namespace the commands should sit within """ subparsers = parser.add_subparsers() namespaces = {} for command_class in self.commands: command_class = self.get_command(command_class) command_namespace = command_class.cased_name() if command_namespace not in namespaces: namespaces[command_namespace] = [] command_help = command_class.help() # if namespace in self.commands: methods = [cmethod for cmethod in dir(command_class) if hasattr(getattr(command_class, cmethod), 'is_cli_command')] for a_command in methods: command = getattr(command_class, a_command) namespaces[command_namespace].append((a_command, command.__func_doc__)) if command_namespace == namespace: subparser = subparsers.add_parser( a_command, help=command.__func_doc__, description=command.__desc__) for arg, kwargs in command.__args__: subparser.add_argument(arg, **kwargs) subparser.set_defaults( command=( command_class, a_command, command.__args_mapping__)) self.update_usage( subparser, namespace, is_subparser=True) if command_namespace == namespace: # when viewing a command parser.description = command_help self.update_usage(parser, namespace) return if not namespace: # when viewing the namespaces subparsers.add_parser( command_namespace, description=command_help, help=command_help) self.update_usage(parser, namespace) if not namespace or namespace not in namespaces: # Override the default output from argparse to display all the # namespaces and their commands. parser.print_usage() self.write() longest_namespace = max(namespaces, key=len) length = len(longest_namespace) for namespace in sorted(namespaces): commands = namespaces[namespace] self.write(colors.fail(styles.bold(namespace.ljust(length)))) longest_command = max([command[0] for command in commands], key=len) longest_command_length = len(longest_command) for command, help in sorted(commands): command = styles.bold(command.ljust(longest_command_length)) self.write(' {}\t{}'.format(command, help)) self.write() sys.exit(0)
def write(self, message=''): sys.stdout.write(message + '\n')
[docs] def execute(self, args): """Execute the runner and any commands the user has specified. """ if not args: args = sys.argv[:] self._name = os.path.basename(args.pop(0)) execute = True help = '-h' method = help namespace = None # Always show help if invalid command try: namespace, method, *_ = args except: with suppress(Exception): namespace = args[0] execute = False args.append(method) if namespace: args = args[1:] if namespace == help: namespace = None # Add the relevant commands parser = argparse.ArgumentParser() try: self.attach_commands(parser, namespace) except ConsoleError as exc: self._handle_exc(exc) # Parse the input parsed_args = parser.parse_args(args) if execute: instance, method, args = parsed_args.command try: kwargs = {arg_name: getattr(parsed_args, arg_name) for attr, arg_name in args.items()} return getattr(instance, method)(**kwargs) except ConsoleError as exc: self._handle_exc(exc)
def _handle_exc(self, exc): exc_msg = str(exc).strip("'") sys.stderr.write(colors.fail('Error: {0}\n'.format(exc_msg))) sys.exit(1) def __call__(self, args=None): # Convenience to execute() return self.execute(args)
[docs]class ConsoleError(KeyError): """An error that should be raised from within the command. """