"""Classes to parse command line arguments.
"""
__author__ = 'Paul Landes'
from typing import Tuple, List, Any, Dict, Iterable, Sequence
from dataclasses import dataclass, field
from enum import Enum
import logging
import sys
from itertools import chain
from pathlib import Path
from io import TextIOBase
from optparse import OptionParser
from frozendict import frozendict
from zensols.persist import persisted, PersistableContainer, Deallocatable
from zensols.config import Dictable
from . import (
ApplicationError, OptionMetaData, PositionalMetaData, ActionMetaData,
UsageConfig, UsageActionOptionParser,
)
logger = logging.getLogger(__name__)
[docs]
class CommandLineError(ApplicationError):
"""Raised when command line parameters can not be parsed.
"""
pass
[docs]
class CommandLineConfigError(Exception):
"""Programmer error for command line parser configuration errors.
"""
pass
[docs]
@dataclass
class CommandAction(Dictable):
"""The output of the :class:`.CommandLineParser` for each parsed action.
"""
_DICTABLE_WRITABLE_DESCENDANTS = True
meta_data: ActionMetaData = field()
"""The action parsed from the command line."""
options: Dict[str, Any] = field()
"""The options given as switches."""
positional: Tuple[str, ...] = field()
"""The positional arguments parsed."""
@property
def name(self) -> str:
"""The name of the action."""
return self.meta_data.name
def __str__(self) -> str:
return f'{self.meta_data.name}: {self.options}/{self.positional}'
[docs]
@dataclass
class CommandActionSet(Deallocatable, Dictable):
"""The actions that are parsed by :class:`.CommandLineParser` as the output of
the parse phase. This is indexable by command action name and iterable
across all actions. Properties :obj:`first_pass_actions` and
:obj:`second_pass_action` give access to the split from the respective
types of actions.
"""
_DICTABLE_WRITABLE_DESCENDANTS = True
actions: Tuple[CommandAction, ...] = field()
"""The actions parsed. The first N actions are first pass where as the last is
the second pass action.
"""
@property
def first_pass_actions(self) -> Iterable[CommandAction]:
"""All first pass actions."""
return self.actions[0:-1]
@property
def second_pass_action(self) -> CommandAction:
"""The single second pass action."""
return self.actions[-1]
@property
def by_name(self) -> Dict[str, CommandAction]:
"""Command actions by name keys."""
return {a.name: a for a in self.actions}
[docs]
def deallocate(self):
super().deallocate()
self._try_deallocate(self.actions)
def __getitem__(self, name: str) -> CommandAction:
return self.by_name[name]
def __iter__(self) -> Iterable[CommandAction]:
return iter(self.actions)
def __len__(self) -> int:
return len(self.actions)
[docs]
@dataclass
class CommandLineConfig(PersistableContainer, Dictable):
"""Given to configure the :class:`.CommandLineParser`.
"""
actions: Tuple[ActionMetaData, ...] = field()
"""The action meta data used to parse and print help."""
@property
@persisted('_first_pass_actions')
def first_pass_actions(self) -> Tuple[ActionMetaData, ...]:
return tuple(filter(lambda a: a.first_pass, self.actions))
@property
@persisted('_second_pass_actions')
def second_pass_actions(self) -> Tuple[ActionMetaData, ...]:
return tuple(filter(lambda a: not a.first_pass, self.actions))
@property
@persisted('_actions_by_name')
def actions_by_name(self) -> Dict[str, ActionMetaData]:
return frozendict({a.name: a for a in self.actions})
@property
@persisted('_first_pass_options')
def first_pass_options(self) -> Tuple[OptionMetaData, ...]:
return tuple(chain.from_iterable(
map(lambda a: a.options, self.first_pass_actions)))
@property
@persisted('_first_pass_by_option')
def first_pass_by_option(self) -> Dict[str, ActionMetaData]:
actions = {}
for action in self.first_pass_actions:
for k, v in action.options_by_dest.items():
if k in actions:
raise CommandLineConfigError(
f"First pass duplicate option in '{action.name}': {k}")
actions[k] = action
return actions
[docs]
def deallocate(self):
super().deallocate()
self._try_deallocate(self.actions)
[docs]
@dataclass
class CommandLineParser(Deallocatable, Dictable):
"""Parse the command line. The parser iterates twice over the command line:
1. The first pass parses only *first pass* actions
(:obj:`.ActionMetaData.first_pass`). This step also is used to
discover the mnemonic/name of the single second pass action.
2. The second pass parse parses only a single action that is given on
the command line.
The name is given as a mnemonic of the action, unless there is only one
*second pass action* given, in which case all options and usage are given
at the top level and a mnemonic is not needed nor parsed.
:see :obj:`.ActionMetaData.first_pass`
"""
config: CommandLineConfig = field()
"""Configures the command line parser with the action meta data."""
version: str = field(default='v0')
"""The version of the application, which is used in the help and the
``--version`` switch.
"""
default_action: str = field(default=None)
"""The default mnemonic use when the user does not supply one."""
force_default: str = field(default=False)
"""Choice of action becomes ambiguous when the positional arguments are
given and the action name matches the argument. An error is raised when the
application is configured in this way.
When this attribute is ``True``, the command parsing will insert the default
action when the first non-option isn't found as an action. However, this
leads to the mentioned ambiguouity and is inefficient.
"""
application_doc: str = field(default=None)
"""The program documentation to use when it can not be deduced from the
action.
"""
usage_config: UsageConfig = field(default_factory=UsageConfig)
"""Configuraiton information for the command line help."""
def __post_init__(self):
if len(self.config.actions) == 0:
raise CommandLineConfigError(
'Must create parser with at least one action')
def _create_parser(self, actions: Tuple[ActionMetaData, ...]) -> \
OptionParser:
return UsageActionOptionParser(
actions=actions,
options=self.config.first_pass_options,
doc=self.application_doc,
default_action=self.default_action,
usage_config=self.usage_config,
version=('%prog ' + str(self.version)))
def _configure_parser(self, parser: OptionParser,
options: Iterable[OptionMetaData]):
opt_names = {}
for opt in options:
prev: OptionMetaData = opt_names.get(opt.long_name)
if prev is not None:
raise CommandLineConfigError(
f"Duplicate option: '{prev.long_name}': " +
f"<{prev}> -> <{opt}>")
opt_names[opt.long_name] = opt
op_opt = opt.create_option()
parser.add_option(op_opt)
def _get_first_pass_parser(self, add_all_opts: bool) -> \
UsageActionOptionParser:
opts = list(self.config.first_pass_options)
sp_actions = self.config.second_pass_actions
if len(sp_actions) == 1:
sp_actions = (sp_actions[0],)
opts.extend(sp_actions[0].options)
elif add_all_opts:
opts.extend(chain.from_iterable(
map(lambda a: a.options, sp_actions)))
parser = self._create_parser(sp_actions)
self._configure_parser(parser, set(opts))
return parser
def _get_second_pass_parser(self, action_meta: ActionMetaData) -> \
UsageActionOptionParser:
opts = list(self.config.first_pass_options)
opts.extend(action_meta.options)
parser = self._create_parser(self.config.second_pass_actions)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"creating parser for action: '{action_meta.name}' " +
f'opts: {action_meta.options}')
self._configure_parser(parser, opts)
return parser
[docs]
def write_help(self, writer: TextIOBase = None,
include_actions: bool = True,
action_metas: Sequence[ActionMetaData] = None,
action_format: str = 'long'):
"""Write the usage information and help text.
:param writer: the data sync, or :obj:`sys.stdout` if ``None``
:param include_actions: if ``True`` write each actions' usage as well
:param actions: the list of actions to output, or ``None`` for all
:param action_format: the action format, either ``short`` or ``long``
"""
writer = sys.stdout if writer is None else writer
parser: UsageActionOptionParser = self._get_first_pass_parser(False)
parser.print_help(writer, include_actions, action_metas, action_format)
[docs]
def error(self, msg: str):
"""Print a usage with the error message and exit the program as fail.
"""
parser: UsageActionOptionParser = self._get_first_pass_parser(False)
parser.error(msg)
def _parse_type(self, s: str, t: type, name: str) -> Any:
tpe = None
if issubclass(t, Enum):
tpe = t.__members__.get(s)
choices = ', '.join(map(lambda e: f"'{e.name}'", t))
if tpe is None:
raise CommandLineError(
f"No choice '{s}' for '{name}' (choose from {choices})")
else:
if not isinstance(s, (str, int, float, bool, Path)):
raise CommandLineConfigError(f'Unknown parse type: {s}: {t}')
try:
tpe = t(s)
except ValueError as e:
raise CommandLineError(f'Expecting type {t.__name__}: {e}')
return tpe
def _parse_options(self, action_meta: ActionMetaData,
op_args: Dict[str, Any]):
opts: Dict[str, OptionMetaData] = action_meta.options_by_dest
parsed = {}
for k, v in op_args.items():
opt: OptionMetaData = opts.get(k)
if v is not None and opt is not None:
v = self._parse_type(v, opt.dtype, opt.long_option)
parsed[k] = v
return parsed
def _parse_positional(self, metas: List[PositionalMetaData],
vals: List[str]) -> Tuple[Any, ...]:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'parsing positional args: {metas} <--> {vals}')
return tuple(
map(lambda x: self._parse_type(x[0], x[1].dtype, x[1].name),
zip(vals, metas)))
def _get_help_action(self, args: List[str]) -> \
Tuple[List[ActionMetaData], List[str]]:
goods: List[str] = []
bads: List[str] = []
arg: str
for arg in args:
action_meta: ActionMetaData = self.config.actions_by_name.get(arg)
if action_meta is None:
bads.append(arg)
else:
goods.append(action_meta)
return goods, bads
def _parse_first_pass(self, args: List[str],
actions: List[CommandAction]) -> \
Tuple[bool, str, Dict[str, Any], Dict[str, Any], Tuple[str, ...]]:
second_pass = False
fp_opts = set(map(lambda o: o.dest, self.config.first_pass_options))
# first fish out the action name (if given) as a positional parameter
parser: OptionParser = self._get_first_pass_parser(True)
(options, op_args) = parser.parse_args(args)
# make assoc array options in to a dict
options = vars(options)
if options['help'] is True:
goods: List[ActionMetaData]
bads: List[str]
goods, bads = self._get_help_action(op_args)
if len(bads) > 0:
raise CommandLineError(
f"No such action{'s' if len(bads) > 1 else ''}: " +
', '.join(bads))
elif len(goods) > 0:
self.write_help(include_actions=True, action_metas=goods)
else:
action_format: str = 'short' if '-h' in args else 'long'
self.write_help(include_actions=True,
action_format=action_format)
sys.exit(0)
else:
del options['help']
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'first pass: {options}:{op_args}')
# find first pass actions (i.e. whine log level '-w' settings)
added_first_pass = set()
fp_ops: Dict[str, ActionMetaData] = self.config.first_pass_by_option
for k, v in options.items():
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'looking for first pass option: {k} ' +
f'in {tuple(fp_ops.keys())}')
fp_action_meta = fp_ops.get(k)
if (fp_action_meta is not None) and \
(fp_action_meta.name not in added_first_pass):
aos = {k: options[k] for k in (set(options.keys()) & fp_opts)}
aos = self._parse_options(fp_action_meta, aos)
action = CommandAction(fp_action_meta, aos, ())
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'adding first pass action: {action}')
actions.append(action)
added_first_pass.add(fp_action_meta.name)
# if only one option for second pass actions are given, the user need
# not give the action mnemonic/name, instead, just add all its options
# to the top level
if len(self.config.second_pass_actions) == 1:
action_name = self.config.second_pass_actions[0].name
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'using singleton fp action: {action_name} ' +
f'with options {options}')
elif len(op_args) == 0:
if self.default_action is None:
# no positional arguments mean we don't know which action to
# use
raise CommandLineError('No action given')
else:
action_name = self.default_action
op_args = []
args = [action_name] + args
second_pass = True
else:
# otherwise, use the first positional parameter as the mnemonic and
# the remainder as positional parameters for that action
action_name = op_args[0]
op_args = op_args[1:]
second_pass = True
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'need second pass for {action_name}, ' +
f'option args: {op_args}')
return second_pass, action_name, fp_opts, options, op_args, args
def _parse_second_pass(self, action_name: str, second_pass: bool,
args: List[str], options: Dict[str, Any],
op_args: Tuple[str, ...]):
# now that we have parsed the action name, get the meta data
action_meta: ActionMetaData = \
self.config.actions_by_name.get(action_name)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"action '{action_name}' found: {action_meta}")
if action_meta is None:
raise CommandLineError(f'No such action: {action_name}')
pos_arg_diff = len(op_args) - len(action_meta.positional)
single_sp = None
if len(self.config.second_pass_actions) == 1:
single_sp = self.config.second_pass_actions[0].name
unnecessary_mnemonic = pos_arg_diff == 1 and single_sp == op_args[0]
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'positional arg difference: {pos_arg_diff}, ' +
f'single second pass mnemonic: {single_sp}, ' +
f'unnecessary_mnemonic: {unnecessary_mnemonic}')
if unnecessary_mnemonic:
raise CommandLineError(
f"Action '{action_meta.name}' expects " +
f"{len(action_meta.positional)} argument(s), but " +
f"'{single_sp}' is counted as a positional argument " +
'and should be omitted')
if pos_arg_diff != 0:
raise CommandLineError(
f"Action '{action_meta.name}' expects " +
f"{len(action_meta.positional)} " +
f"argument(s) but got {len(op_args)}: {', '.join(op_args)}")
# if there is more than one second pass action, we must re-parse using
# the specific options and positional argument for that action
if second_pass:
parser: OptionParser = self._get_second_pass_parser(action_meta)
(options, op_args) = parser.parse_args(args)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'second pass opts: {options}:{op_args}')
# sanity check to match parsed mnemonic and action name
assert (op_args[0] == action_meta.name)
# remove the action name
op_args = op_args[1:]
options = vars(options)
del options['help']
options = self._parse_options(action_meta, options)
return action_meta, options, op_args
def _validate_setup(self):
"""Make sure we don't have a default action with positional args."""
if self.default_action is not None and not self.force_default:
action_meta: ActionMetaData
for action_meta in self.config.second_pass_actions:
if len(action_meta.positional) > 0:
raise CommandLineConfigError(
'No positional arguments allowed when default ' +
f"action '{self.default_action}' " +
f'given for method {action_meta.name}')
[docs]
def deallocate(self):
super().deallocate()
self._try_deallocate(self.config)
[docs]
def parse(self, args: List[str]) -> CommandActionSet:
"""Parse command line arguments.
:param args: the arguments given on the command line; which is usually
``sys.args[1:]``
"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'parsing: {args}')
# action instances
actions: List[CommandAction] = []
# some top level sanity checks
self._validate_setup()
# first pass parse
second_pass, action_name, fp_opts, options, op_args, args = \
self._parse_first_pass(args, actions)
if self.default_action is not None and \
self.force_default and\
action_name not in self.config.actions_by_name:
args = [self.default_action] + args
second_pass, action_name, fp_opts, options, op_args, args = \
self._parse_first_pass(args, actions)
# second pass parse
action_meta, options, op_args = self._parse_second_pass(
action_name, second_pass, args, options, op_args)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'removing first pass options: {fp_opts} ' +
f'from {options}')
# the second pass action should _not_ get the first pass options
options = {k: options[k] for k in (set(options.keys()) - fp_opts)}
# parse positional arguments much like the OptionParser did options
pos_args = self._parse_positional(action_meta.positional, op_args)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'creating action with {options} {pos_args}')
# create and add the second pass action
action_inst = CommandAction(action_meta, options, pos_args)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f'adding action: {action_inst}')
actions.append(action_inst)
return CommandActionSet(tuple(actions))