
Most of these account for the fact that mypy now reports that Hashable is not an allowed return type for sort key functions. That, plus the new ignore for the regression in config.py.
534 lines
18 KiB
Python
534 lines
18 KiB
Python
"""cliutil - Utilities for CLI tools"""
|
|
PKGNAME = 'conservancy_beancount'
|
|
LICENSE = """
|
|
Copyright © 2020, 2021 Brett Smith and other contributors
|
|
|
|
This program is free software: you can redistribute it and/or modify it.
|
|
Refer to the LICENSE.txt that came with the software for details.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE."""
|
|
|
|
import argparse
|
|
import datetime
|
|
import enum
|
|
import io
|
|
import logging
|
|
import operator
|
|
import os
|
|
import pkg_resources
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import types
|
|
|
|
from pathlib import Path
|
|
|
|
import rt.exceptions as rt_error
|
|
import yaml
|
|
|
|
from . import data
|
|
from . import errors
|
|
from . import filters
|
|
from . import rtutil
|
|
|
|
from typing import (
|
|
cast,
|
|
Any,
|
|
BinaryIO,
|
|
Callable,
|
|
Container,
|
|
Generic,
|
|
IO,
|
|
Iterable,
|
|
Iterator,
|
|
List,
|
|
NamedTuple,
|
|
NoReturn,
|
|
Optional,
|
|
Sequence,
|
|
TextIO,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
from .beancount_types import (
|
|
MetaKey,
|
|
Sortable,
|
|
)
|
|
|
|
ET = TypeVar('ET', bound=enum.Enum)
|
|
OutputFile = Union[int, IO]
|
|
|
|
CPU_COUNT = len(os.sched_getaffinity(0))
|
|
STDSTREAM_PATH = Path('-')
|
|
VERSION = pkg_resources.require(PKGNAME)[0].version
|
|
|
|
class EnumArgument(Generic[ET]):
|
|
"""Wrapper class to use an enum as argument values
|
|
|
|
Use this class when the user can choose one of some arbitrary enum names
|
|
as an argument. It will let user abbreviate and use any case, and will
|
|
return the correct value as long as it's unambiguous. Typical usage
|
|
looks like::
|
|
|
|
enum_arg = EnumArgument(Enum)
|
|
arg_parser.add_argument(
|
|
'--choice',
|
|
type=enum_arg.enum_type, # or .value_type
|
|
help=f"Choices are {enum_arg.choices_str()}",
|
|
…
|
|
)
|
|
"""
|
|
# I originally wrote this as a mixin class, to eliminate the need for the
|
|
# explicit wrapping in the example above. But Python 3.6 doesn't really
|
|
# support mixins with Enums; see <https://bugs.python.org/issue29577>.
|
|
# This functionality could be moved to a mixin when we drop support for
|
|
# Python 3.6.
|
|
|
|
def __init__(self, base: Type[ET]) -> None:
|
|
self.base = base
|
|
|
|
def enum_type(self, arg: str) -> ET:
|
|
"""Return a single enum whose name matches the user argument"""
|
|
regexp = re.compile(re.escape(arg), re.IGNORECASE)
|
|
matches = frozenset(
|
|
choice
|
|
for name, choice in self.base.__members__.items()
|
|
if regexp.match(name)
|
|
)
|
|
count = len(matches)
|
|
if count == 1:
|
|
return next(iter(matches))
|
|
elif count:
|
|
names = ', '.join(repr(choice.name) for choice in matches)
|
|
raise ValueError(f"ambiguous argument {arg!r}: matches {names}")
|
|
else:
|
|
raise ValueError(f"unknown argument {arg!r}")
|
|
|
|
def value_type(self, arg: str) -> Any:
|
|
return self.enum_type(arg).value
|
|
|
|
def choices_str(self, sep: str=', ', fmt: str='{!r}') -> str:
|
|
"""Return a user-formatted string of enum names"""
|
|
sortkey: Callable[[ET], Sortable] = getattr(
|
|
self.base, '_choices_sortkey', self._choices_sortkey,
|
|
)
|
|
return sep.join(
|
|
fmt.format(choice.name.lower())
|
|
for choice in sorted(self.base, key=sortkey)
|
|
)
|
|
|
|
def _choices_sortkey(self, choice: ET) -> Sortable:
|
|
return choice.name
|
|
|
|
|
|
class ExceptHook:
|
|
def __init__(self, logger: Optional[logging.Logger]=None) -> None:
|
|
if logger is None:
|
|
logger = logging.getLogger()
|
|
self.logger = logger
|
|
|
|
def __call__(self,
|
|
exc_type: Type[BaseException],
|
|
exc_value: BaseException,
|
|
exc_tb: types.TracebackType,
|
|
) -> NoReturn:
|
|
error_type = type(exc_value).__name__
|
|
msg = ": ".join(str(arg) for arg in exc_value.args)
|
|
if isinstance(exc_value, KeyboardInterrupt):
|
|
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
|
os.kill(0, signal.SIGINT)
|
|
signal.pause()
|
|
elif isinstance(exc_value, (
|
|
rt_error.AuthorizationError,
|
|
rt_error.NotAllowed,
|
|
)):
|
|
exitcode = os.EX_NOPERM
|
|
error_type = "RT access denied"
|
|
elif isinstance(exc_value, rt_error.ConnectionError):
|
|
exitcode = os.EX_TEMPFAIL
|
|
error_type = "RT connection error"
|
|
elif isinstance(exc_value, rt_error.RtError):
|
|
exitcode = os.EX_UNAVAILABLE
|
|
error_type = f"RT {error_type}"
|
|
elif isinstance(exc_value, errors.RewriteRuleError):
|
|
exitcode = ExitCode.RewriteRulesError
|
|
msg = str(exc_value)
|
|
if exc_value.source is not None:
|
|
msg += f"\n\n source: {yaml.safe_dump(exc_value.source)}"
|
|
elif isinstance(exc_value, OSError):
|
|
if exc_value.filename is None:
|
|
exitcode = os.EX_OSERR
|
|
error_type = "OS error"
|
|
msg = exc_value.strerror
|
|
else:
|
|
# There are more specific exit codes for input problems vs.
|
|
# output problems, but without knowing how the file was
|
|
# intended to be used, we can't use them.
|
|
exitcode = os.EX_IOERR
|
|
error_type = "I/O error"
|
|
msg = f"{exc_value.filename}: {exc_value.strerror}"
|
|
else:
|
|
exitcode = os.EX_SOFTWARE
|
|
error_type = f"internal {error_type}"
|
|
self.logger.critical("%s%s%s", error_type, ": " if msg else "", msg)
|
|
self.logger.debug(
|
|
''.join(traceback.format_exception(exc_type, exc_value, exc_tb)),
|
|
)
|
|
raise SystemExit(exitcode)
|
|
|
|
|
|
class ExitCode(enum.IntEnum):
|
|
# BSD exit codes commonly used
|
|
NoConfiguration = os.EX_CONFIG
|
|
NoConfig = NoConfiguration
|
|
NoDataFiltered = os.EX_DATAERR
|
|
NoDataLoaded = os.EX_NOINPUT
|
|
OK = os.EX_OK
|
|
Ok = OK
|
|
RewriteRulesError = os.EX_DATAERR
|
|
|
|
# Our own exit codes, working down from that range
|
|
BeancountErrors = 63
|
|
|
|
|
|
class ExtendAction(argparse.Action):
|
|
"""argparse action to let a user build a list from a string
|
|
|
|
This is a fancier version of argparse's built-in ``action='append'``.
|
|
The user's input is turned into a list of strings, split by a regexp
|
|
pattern you provide. Typical usage looks like::
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument(
|
|
'--option', ...,
|
|
action=ExtendAction,
|
|
const=regexp_pattern, # default is '\\s*,\\s*'
|
|
...,
|
|
)
|
|
"""
|
|
DEFAULT_PATTERN = r'\s*,\s*'
|
|
|
|
def __call__(self,
|
|
parser: argparse.ArgumentParser,
|
|
namespace: argparse.Namespace,
|
|
values: Union[Sequence[Any], str, None]=None,
|
|
option_string: Optional[str]=None,
|
|
) -> None:
|
|
pattern: str = self.const or self.DEFAULT_PATTERN
|
|
value: Optional[List[str]] = getattr(namespace, self.dest, None)
|
|
if value is None:
|
|
value = []
|
|
setattr(namespace, self.dest, value)
|
|
if values is None:
|
|
values = []
|
|
elif isinstance(values, str):
|
|
values = [values]
|
|
for s in values:
|
|
value.extend(re.split(pattern, s))
|
|
|
|
|
|
class InfoAction(argparse.Action):
|
|
def __call__(self,
|
|
parser: argparse.ArgumentParser,
|
|
namespace: argparse.Namespace,
|
|
values: Union[Sequence[Any], str, None]=None,
|
|
option_string: Optional[str]=None,
|
|
) -> NoReturn:
|
|
if isinstance(self.const, str):
|
|
info = self.const
|
|
exitcode = 0
|
|
else:
|
|
info, exitcode = self.const
|
|
print(info)
|
|
raise SystemExit(exitcode)
|
|
|
|
|
|
class LogLevel(enum.IntEnum):
|
|
DEBUG = logging.DEBUG
|
|
INFO = logging.INFO
|
|
WARNING = logging.WARNING
|
|
ERROR = logging.ERROR
|
|
CRITICAL = logging.CRITICAL
|
|
WARN = WARNING
|
|
ERR = ERROR
|
|
CRIT = CRITICAL
|
|
|
|
def _choices_sortkey(self) -> Sortable:
|
|
return self.value
|
|
|
|
|
|
class SearchTerm(NamedTuple):
|
|
"""NamedTuple representing a user's metadata filter
|
|
|
|
SearchTerm knows how to parse and store posting metadata filters provided
|
|
by the user in `key=value` format. Reporting tools can use this to filter
|
|
postings that match the user's criteria, to report on subsets of the books.
|
|
|
|
Typical usage looks like::
|
|
|
|
argument_parser.add_argument(
|
|
'search_terms',
|
|
type=SearchTerm.arg_parser(),
|
|
…,
|
|
)
|
|
|
|
args = argument_parser.parse_args(…)
|
|
for query in args.search_terms:
|
|
postings = query.filter_postings(postings)
|
|
"""
|
|
meta_key: MetaKey
|
|
pattern: str
|
|
|
|
@classmethod
|
|
def arg_parser(cls,
|
|
default_key: Optional[str]=None,
|
|
ticket_default_key: Optional[str]=None,
|
|
) -> Callable[[str], 'SearchTerm']:
|
|
"""Build a SearchTerm parser
|
|
|
|
This method returns a function that can parse strings in ``key=value``
|
|
format and return a corresponding SearchTerm.
|
|
|
|
If you specify a default key, then strings that just specify a ``value``
|
|
will be parsed as if they said ``default_key=value``. Otherwise,
|
|
parsing strings without a metadata key will raise a ValueError.
|
|
|
|
If you specify a default key ticket links, then values in the format
|
|
``number``, ``rt:number``, or ``rt://ticket/number`` will be parsed as
|
|
if they said ``ticket_default_key=value``.
|
|
"""
|
|
if ticket_default_key is None:
|
|
ticket_default_key = default_key
|
|
def parse_search_term(arg: str) -> 'SearchTerm':
|
|
key: Optional[str] = None
|
|
if re.match(r'^[a-z][-\w]*=', arg):
|
|
key, _, raw_link = arg.partition('=')
|
|
else:
|
|
raw_link = arg
|
|
rt_ids = rtutil.RT.parse(raw_link)
|
|
if rt_ids is None:
|
|
rt_ids = rtutil.RT.parse('rt:' + raw_link)
|
|
if rt_ids is None:
|
|
if key is None:
|
|
key = default_key
|
|
pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
|
|
else:
|
|
ticket_id, attachment_id = rt_ids
|
|
if key is None:
|
|
if attachment_id is None:
|
|
key = ticket_default_key
|
|
else:
|
|
key = default_key
|
|
pattern = rtutil.RT.metadata_regexp(
|
|
ticket_id,
|
|
attachment_id,
|
|
first_link_only=key == 'rt-id' and attachment_id is None,
|
|
)
|
|
if key is None:
|
|
raise ValueError(f"invalid search term {arg!r}: no metadata key")
|
|
return cls(key, pattern)
|
|
return parse_search_term
|
|
|
|
def filter_postings(self, postings: Iterable[data.Posting]) -> Iterator[data.Posting]:
|
|
return filters.filter_meta_match(
|
|
postings, self.meta_key, re.compile(self.pattern),
|
|
)
|
|
|
|
def add_jobs_argument(parser: argparse.ArgumentParser) -> argparse.Action:
|
|
return parser.add_argument(
|
|
'--jobs', '-j',
|
|
metavar='NUM',
|
|
type=jobs_arg,
|
|
default=CPU_COUNT,
|
|
help="""Maximum number of processes to run concurrently.
|
|
Can specify a positive integer or a percentage of CPU cores. Default all cores.
|
|
""")
|
|
|
|
def add_loglevel_argument(parser: argparse.ArgumentParser,
|
|
default: LogLevel=LogLevel.INFO) -> argparse.Action:
|
|
arg_enum = EnumArgument(LogLevel)
|
|
return parser.add_argument(
|
|
'--loglevel',
|
|
metavar='LEVEL',
|
|
default=default.value,
|
|
type=arg_enum.value_type,
|
|
help="Show logs at this level and above."
|
|
f" Specify one of {arg_enum.choices_str()}."
|
|
f" Default {default.name.lower()!r}.",
|
|
)
|
|
|
|
def add_rewrite_rules_argument(parser: argparse.ArgumentParser) -> argparse.Action:
|
|
return parser.add_argument(
|
|
'--rewrite-rules', '--rewrites', '-r',
|
|
action='append',
|
|
default=[],
|
|
metavar='PATH',
|
|
type=Path,
|
|
help="""Use rewrite rules from the given YAML file. You can specify
|
|
this option multiple times to load multiple sets of rewrite rules in order.
|
|
""")
|
|
|
|
def add_version_argument(parser: argparse.ArgumentParser) -> argparse.Action:
|
|
progname = parser.prog or sys.argv[0]
|
|
return parser.add_argument(
|
|
'--version', '--copyright', '--license',
|
|
action=InfoAction,
|
|
nargs=0,
|
|
const=f"{progname} version {VERSION}\n{LICENSE}",
|
|
help="Show program version and license information",
|
|
)
|
|
|
|
def can_run(
|
|
cmd: Sequence[str],
|
|
stdout: Optional[int]=subprocess.DEVNULL,
|
|
stderr: Optional[int]=None,
|
|
ok_returncodes: Container[int]=frozenset([0]),
|
|
) -> bool:
|
|
try:
|
|
with subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr) as proc:
|
|
# Typing says this can be None, but I don't think that's true
|
|
# given that we passed stdin=PIPE.
|
|
proc.stdin.close() # type:ignore[union-attr]
|
|
except (OSError, subprocess.SubprocessError):
|
|
return False
|
|
else:
|
|
return proc.returncode in ok_returncodes
|
|
|
|
def date_arg(arg: str) -> datetime.date:
|
|
return datetime.datetime.strptime(arg, '%Y-%m-%d').date()
|
|
|
|
def diff_year(date: datetime.date, diff: int) -> datetime.date:
|
|
new_year = date.year + diff
|
|
try:
|
|
return date.replace(year=new_year)
|
|
except ValueError:
|
|
# The original date is Feb 29, which doesn't exist in the new year.
|
|
if diff < 0:
|
|
return datetime.date(new_year, 2, 28)
|
|
else:
|
|
return datetime.date(new_year, 3, 1)
|
|
|
|
def year_or_date_arg(arg: str) -> Union[int, datetime.date]:
|
|
"""Get either a date or a year (int) from an argument string
|
|
|
|
This is a useful argument type for arguments that will be passed into
|
|
Books loader methods which can accept either a fiscal year or a full date.
|
|
"""
|
|
try:
|
|
year = int(arg, 10)
|
|
except ValueError:
|
|
ok = False
|
|
else:
|
|
ok = datetime.MINYEAR <= year <= datetime.MAXYEAR
|
|
if ok:
|
|
return year
|
|
else:
|
|
return date_arg(arg)
|
|
|
|
def jobs_arg(arg: str) -> int:
|
|
if arg.endswith('%'):
|
|
arg_n = round(CPU_COUNT * 100 / int(arg[:-1]))
|
|
else:
|
|
arg_n = int(arg)
|
|
if arg_n < 1:
|
|
raise ValueError("--jobs argument must be a positive integer or percentage")
|
|
else:
|
|
return arg_n
|
|
|
|
def make_entry_point(mod_name: str, prog_name: str=sys.argv[0]) -> Callable[[], int]:
|
|
"""Create an entry_point function for a tool
|
|
|
|
The returned function is suitable for use as an entry_point in setup.py.
|
|
It sets up the root logger and excepthook, then calls the module's main
|
|
function.
|
|
"""
|
|
def entry_point(): # type:ignore
|
|
prog_mod = sys.modules[mod_name]
|
|
setup_logger()
|
|
prog_mod.logger = logging.getLogger(prog_name)
|
|
sys.excepthook = ExceptHook(prog_mod.logger)
|
|
return prog_mod.main()
|
|
return entry_point
|
|
|
|
def setup_logger(logger: Union[str, logging.Logger]='',
|
|
stream: TextIO=sys.stderr,
|
|
fmt: str='%(name)s: %(levelname)s: %(message)s',
|
|
) -> logging.Logger:
|
|
"""Set up a logger with a StreamHandler with the given format"""
|
|
if isinstance(logger, str):
|
|
logger = logging.getLogger(logger)
|
|
formatter = logging.Formatter(fmt)
|
|
handler = logging.StreamHandler(stream)
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
return logger
|
|
|
|
def set_loglevel(logger: logging.Logger, loglevel: int=logging.INFO) -> None:
|
|
"""Set the loglevel for a tool or module
|
|
|
|
If the given logger is not under a hierarchy, this function sets the
|
|
loglevel for the root logger, along with some specific levels for libraries
|
|
used by reporting tools. Otherwise, it's the same as
|
|
``logger.setLevel(loglevel)``.
|
|
"""
|
|
if '.' not in logger.name:
|
|
logger = logging.getLogger()
|
|
# pdfminer logs debug-like messages at the info level
|
|
logging.getLogger('pdfminer').setLevel(max(loglevel, logging.WARNING))
|
|
# At the debug level, the rt module logs the full body of every
|
|
# request and response. That's too much.
|
|
logging.getLogger('rt.rt').setLevel(max(loglevel, logging.INFO))
|
|
logger.setLevel(loglevel)
|
|
|
|
def bytes_output(path: Optional[Path]=None,
|
|
default: OutputFile=sys.stdout,
|
|
mode: str='w',
|
|
) -> BinaryIO:
|
|
"""Get a file-like object suitable for binary output
|
|
|
|
If ``path`` is ``None`` or ``-``, returns a file-like object backed by
|
|
``default``. If ``default`` is a file descriptor or text IO object, this
|
|
method returns a file-like object that writes to the same place.
|
|
|
|
Otherwise, returns ``path.open(mode)``.
|
|
"""
|
|
mode = f'{mode}b'
|
|
if path is None or path == STDSTREAM_PATH:
|
|
if isinstance(default, int):
|
|
retval = open(default, mode)
|
|
elif isinstance(default, TextIO):
|
|
retval = default.buffer
|
|
else:
|
|
retval = default
|
|
else:
|
|
retval = path.open(mode)
|
|
return cast(BinaryIO, retval)
|
|
|
|
def text_output(path: Optional[Path]=None,
|
|
default: OutputFile=sys.stdout,
|
|
mode: str='w',
|
|
encoding: Optional[str]=None,
|
|
) -> TextIO:
|
|
"""Get a file-like object suitable for text output
|
|
|
|
If ``path`` is ``None`` or ``-``, returns a file-like object backed by
|
|
``default``. If ``default`` is a file descriptor or binary IO object, this
|
|
method returns a file-like object that writes to the same place.
|
|
|
|
Otherwise, returns ``path.open(mode)``.
|
|
"""
|
|
if path is None or path == STDSTREAM_PATH:
|
|
if isinstance(default, int):
|
|
retval = open(default, mode, encoding=encoding)
|
|
elif isinstance(default, BinaryIO):
|
|
retval = io.TextIOWrapper(default, encoding=encoding)
|
|
else:
|
|
retval = default
|
|
else:
|
|
retval = path.open(mode, encoding=encoding)
|
|
return cast(TextIO, retval)
|