355 lines
12 KiB
Python
355 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""accrual.py - Various reports about accruals"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import argparse
|
|
import datetime
|
|
import enum
|
|
import collections
|
|
import re
|
|
import sys
|
|
|
|
from typing import (
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
Iterator,
|
|
Mapping,
|
|
NamedTuple,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
TextIO,
|
|
Tuple,
|
|
)
|
|
from ..beancount_types import (
|
|
Error,
|
|
MetaKey,
|
|
MetaValue,
|
|
Transaction,
|
|
)
|
|
|
|
import rt
|
|
|
|
from beancount.parser import printer as bc_printer
|
|
|
|
from . import core
|
|
from .. import config as configmod
|
|
from .. import data
|
|
from .. import filters
|
|
from .. import rtutil
|
|
|
|
# Postings collected per metadata value. main() builds this by grouping
# postings on their 'invoice' metadata; the key type allows None.
PostGroups = Mapping[Optional[MetaValue], core.RelatedPostings]
# Call signature shared by the report functions registered with ReportType:
# (groups, out_file, err_file, rt_client, rt_wrapper) -> None
ReportFunc = Callable[
    [PostGroups, TextIO, TextIO, Optional[rt.Rt], Optional[rtutil.RT]],
    None
]
# NOTE(review): not referenced anywhere else in the visible code; presumably
# the shape of a record returned by the RT client -- confirm before relying
# on it.
RTObject = Mapping[str, str]
|
|
|
|
class ReportType:
    """Registry that maps report names to the functions generating them."""

    # The primary (first-listed) name of each registered report.
    NAMES: Set[str] = set()
    # Every registered name, aliases included, mapped to its report function.
    BY_NAME: Dict[str, ReportFunc] = {}

    @classmethod
    def register(cls, *names: str) -> Callable[[ReportFunc], ReportFunc]:
        """Return a decorator that registers a report function under names.

        Every name becomes a key in BY_NAME; only the first name is added to
        NAMES. The decorated function is returned unchanged.
        """
        def decorator(func: ReportFunc) -> ReportFunc:
            cls.BY_NAME.update(dict.fromkeys(names, func))
            cls.NAMES.add(names[0])
            return func
        return decorator

    @classmethod
    def by_name(cls, name: str) -> ReportFunc:
        """Look up a report function by name, ignoring case.

        Raises ValueError when no report is registered under that name.
        """
        lookup_key = name.lower()
        if lookup_key not in cls.BY_NAME:
            raise ValueError(f"unknown report type {name!r}") from None
        return cls.BY_NAME[lookup_key]

    @classmethod
    def default_for(cls, groups: PostGroups) -> Tuple[ReportFunc, PostGroups]:
        """Choose the report to run when the user did not name one.

        Returns the chosen report function along with the groups to report
        on: the groups with a nonzero balance, or all of the given groups
        when none has a nonzero balance.
        """
        outstanding = {}
        for key, group in groups.items():
            if not group.balance().is_zero():
                outstanding[key] = group
        # 'outgoing' is only the default for exactly one outstanding group
        # whose postings are all under Liabilities.
        single_payable = len(outstanding) == 1 and all(
            post.account.is_under('Liabilities')
            for group in outstanding.values()
            for post in group
        )
        report_name = 'outgoing' if single_payable else 'balance'
        return cls.BY_NAME[report_name], outstanding or groups
|
|
|
|
|
|
class ReturnFlag(enum.IntFlag):
    """Bit flags describing what went wrong during a run.

    main() ORs these together to build its nonzero exit status.
    """
    LOAD_ERRORS = 1 << 0
    CONSISTENCY_ERRORS = 1 << 1
    REPORT_ERRORS = 1 << 2
    NOTHING_TO_REPORT = 1 << 3
|
|
|
|
|
|
class SearchTerm(NamedTuple):
    """A metadata key paired with a regular expression pattern to match in it."""
    meta_key: MetaKey
    pattern: str

    @classmethod
    def parse(cls, s: str) -> 'SearchTerm':
        """Build a SearchTerm from a command line argument.

        The argument is either ``NAME=TERM`` or a bare ``TERM``. A term that
        rtutil.RT.parse accepts, as written or with an ``rt:`` prefix added,
        is turned into a pattern by rtutil.RT.metadata_regexp. Any other term
        must match as a whole whitespace-delimited word. When no NAME is
        given, the key is ``rt-id`` for an RT reference with no attachment
        and ``invoice`` for everything else.
        """
        key: Optional[str] = None
        raw_link = s
        if re.match(r'^[a-z][-\w]*=', s):
            key, _, raw_link = s.partition('=')

        rt_ids = rtutil.RT.parse(raw_link)
        if rt_ids is None:
            # Retry so that a bare ticket number is treated as an RT link.
            rt_ids = rtutil.RT.parse('rt:' + raw_link)

        if rt_ids is not None:
            ticket_id, attachment_id = rt_ids
            if key is None:
                key = 'invoice' if attachment_id is not None else 'rt-id'
            pattern = rtutil.RT.metadata_regexp(
                ticket_id,
                attachment_id,
                first_link_only=(key == 'rt-id' and attachment_id is None),
            )
        else:
            if key is None:
                key = 'invoice'
            pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
        return cls(key, pattern)
|
|
|
|
|
|
def consistency_check(groups: PostGroups) -> Iterable[Error]:
    """Yield errors for groups whose postings disagree on key metadata.

    For each group, the contract, entity, and purchase-order metadata are
    each expected to have exactly one value across the group. When one does
    not, an Error is yielded for every posting in the group, reporting that
    posting's own value for the metadata.
    """
    checked_keys = ('contract', 'entity', 'purchase-order')
    for key, related in groups.items():
        for checked_meta in checked_keys:
            if len(related.meta_values(checked_meta)) == 1:
                continue
            errmsg = f'inconsistent {checked_meta} for invoice {key}'
            for post in related:
                post_meta = post.meta
                yield Error(
                    post_meta,
                    f'{errmsg}: {post.meta.get(checked_meta)}',
                    post_meta.txn,
                )
|
|
|
|
def _since_last_nonzero(posts: core.RelatedPostings) -> core.RelatedPostings:
    """Return the most recent run of postings that began from a zero balance.

    Postings are accumulated in order. Whenever the accumulated balance is
    zero just before adding the next posting, the accumulation starts over.
    """
    recent = core.RelatedPostings()
    for post in posts:
        # Everything accumulated so far has netted out, so drop it.
        if recent.balance().is_zero():
            recent.clear()
        recent.add(post)
    return recent
|
|
|
|
@ReportType.register('balance', 'bal')
def balance_report(groups: PostGroups,
                   out_file: TextIO,
                   err_file: TextIO=sys.stderr,
                   rt_client: Optional[rt.Rt]=None,
                   rt_wrapper: Optional[rtutil.RT]=None,
) -> None:
    """Write each group's outstanding balance and start date to out_file.

    err_file, rt_client, and rt_wrapper are accepted to satisfy the common
    report function signature; this report does not use them.
    """
    for index, (invoice, related) in enumerate(groups.items()):
        # Separate consecutive entries with a blank line.
        prefix = '\n' if index else ''
        recent = _since_last_nonzero(related)
        balance = recent.balance()
        date_s = recent[0].meta.date.strftime('%Y-%m-%d')
        print(
            f"{prefix}{invoice}:",
            f" {balance} outstanding since {date_s}",
            sep='\n', file=out_file,
        )
|
|
|
|
def _primary_rt_id(related: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
    """Return the parsed form of the single rt-id link shared by the postings.

    Raises ValueError when the postings do not have exactly one rt-id link,
    or when rtutil.RT.parse does not accept that link.
    """
    rt_ids = related.all_meta_links('rt-id')
    rt_ids_count = len(rt_ids)
    if rt_ids_count != 1:
        raise ValueError(f"{rt_ids_count} rt-id links found")
    parsed = rtutil.RT.parse(rt_ids.pop())
    if parsed is not None:
        return parsed
    raise ValueError("rt-id is not a valid RT reference")
|
|
|
|
@ReportType.register('outgoing', 'outgoings', 'out')
def outgoing_report(groups: PostGroups,
                    out_file: TextIO,
                    err_file: TextIO=sys.stderr,
                    rt_client: Optional[rt.Rt]=None,
                    rt_wrapper: Optional[rtutil.RT]=None,
) -> None:
    """Write a payment approval request for each group to out_file.

    Each request combines details from the group's RT ticket with the
    Beancount entries behind the group's postings. A group whose ticket
    cannot be identified or fetched is reported on err_file and skipped.

    Raises ValueError when rt_client or rt_wrapper is None.
    """
    if rt_client is None or rt_wrapper is None:
        raise ValueError("RT client is required but not configured")

    for invoice, related in groups.items():
        related = _since_last_nonzero(related)

        # Find the RT ticket this payment is tracked under.
        try:
            ticket_id, _ = _primary_rt_id(related)
            ticket = rt_client.get_ticket(ticket_id)
        except (ValueError, rt.RtError) as error:
            ticket = None
            errmsg = error.args[0]
        else:
            # Only reported when the lookup came back with None.
            errmsg = f"ticket {ticket_id} not found"
        if ticket is None:
            print("error: can't generate outgoings report for {}"
                  " because no RT ticket available: {}".format(
                      invoice, errmsg,
                  ), file=err_file)
            continue

        # Describe the first requestor on the ticket, if one can be loaded.
        try:
            rt_requestor = rt_client.get_user(ticket['Requestors'][0])
        except (IndexError, rt.RtError):
            rt_requestor = None
        requestor = ''
        requestor_name = ''
        if rt_requestor is not None:
            requestor_name = (
                rt_requestor.get('RealName')
                or ticket.get('CF.{payment-to}')
                or ''
            )
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()

        contract_links = related.all_meta_links('contract')
        if not contract_links:
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
        else:
            contract_urls = rt_wrapper.iter_urls(
                contract_links, '<{}>', '{}', '<BROKEN RT LINK: {}>',
            )
            contract_s = ' , '.join(contract_urls)

        # Keep only string-valued project metadata.
        projects = []
        for value in related.meta_values('project'):
            if isinstance(value, str):
                projects.append(value)

        print(
            "PAYMENT FOR APPROVAL:",
            f"REQUESTOR: {requestor}",
            f"TOTAL TO PAY: {-related.balance()}",
            f"AGREEMENT: {contract_s}",
            f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}",
            f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}",
            f"PROJECT: {', '.join(projects)}",
            "\nBEANCOUNT ENTRIES:\n",
            sep='\n', file=out_file,
        )

        # Print each transaction once, even when consecutive postings
        # belong to the same transaction.
        last_txn: Optional[Transaction] = None
        for post in related:
            txn = post.meta.txn
            if txn is last_txn:
                continue
            last_txn = txn
            bc_printer.print_entry(rt_wrapper.txn_with_urls(txn), file=out_file)
|
|
|
|
def filter_search(postings: Iterable[data.Posting],
                  search_terms: Iterable[SearchTerm],
) -> Iterable[data.Posting]:
    """Lazily narrow postings to accruals that match every search term.

    Only postings under Assets:Receivable or Liabilities:Payable are kept.
    Each search term then adds one filters.filter_meta_match pass using the
    term's metadata key and compiled pattern.
    """
    matches: Iterable[data.Posting] = filter(
        lambda post: post.account.is_under(
            'Assets:Receivable', 'Liabilities:Payable',
        ),
        postings,
    )
    for meta_key, pattern in search_terms:
        regexp = re.compile(pattern)
        matches = filters.filter_meta_match(matches, meta_key, regexp)
    return matches
|
|
|
|
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse command line arguments for the accrual report tool.

    Besides the declared options, the returned namespace carries a
    ``search_terms`` list holding a SearchTerm for each ``search`` argument.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument(
        '--report-type', '-t',
        metavar='NAME',
        type=ReportType.by_name,
        help="""The type of report to generate, either `balance` or `outgoing`.
If not specified, the default is `outgoing` for search criteria that return a
single outstanding payable, and `balance` any other time.
""")
    argparser.add_argument(
        '--since',
        metavar='YEAR',
        type=int,
        default=-1,
        help="""How far back to search the books for related transactions.
You can either specify a fiscal year, or a negative offset from the current
fiscal year, to start loading entries from. The default is -1 (start from the
previous fiscal year).
""")
    argparser.add_argument(
        'search',
        nargs='*',
        help="""Report on accruals that match this criteria. The format is
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
metadata to match. A single ticket number is a shortcut for
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
""")
    namespace = argparser.parse_args(arglist)
    namespace.search_terms = list(map(SearchTerm.parse, namespace.search))
    return namespace
|
|
|
|
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run the accrual report tool and return its exit status.

    Loads the books, selects the postings matching the search terms, groups
    them by invoice, prints any load and consistency errors to stderr, and
    then runs the requested (or default) report.

    Returns 0 when nothing went wrong; otherwise 16 plus the combination of
    ReturnFlag values describing the problems.
    """
    args = parse_arguments(arglist)
    if config is None:
        config = configmod.Config()
        config.load_file()

    books_loader = config.books_loader()
    if books_loader is None:
        # No books configured: report that as a load error attributed to
        # the configuration file.
        entries = []
        source = {
            'filename': str(config.config_file_path()),
            'lineno': 1,
        }
        load_errors = [Error(source, "no books to load in configuration", None)]
    else:
        entries, load_errors, _ = books_loader.load_fy_range(args.since)

    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
    groups = core.RelatedPostings.group_by_meta(postings, 'invoice')

    returncode = 0
    error_sources = (
        (load_errors, ReturnFlag.LOAD_ERRORS),
        (consistency_check(groups), ReturnFlag.CONSISTENCY_ERRORS),
    )
    for errors, flag in error_sources:
        for error in errors:
            bc_printer.print_error(error, file=stderr)
            returncode |= flag

    report_func = args.report_type
    if report_func is None:
        report_func, groups = ReportType.default_for(groups)

    if groups:
        try:
            report_func(
                groups,
                stdout,
                stderr,
                config.rt_client(),
                config.rt_wrapper(),
            )
        except ValueError as exc:
            print("error: unable to generate {}: {}".format(
                report_func.__name__.replace('_', ' '),
                exc.args[0],
            ), file=stderr)
            returncode |= ReturnFlag.REPORT_ERRORS
    else:
        print("warning: no matching entries found to report", file=stderr)
        returncode |= ReturnFlag.NOTHING_TO_REPORT

    if returncode == 0:
        return 0
    return 16 + returncode
|
|
|
|
if __name__ == '__main__':
    # Use sys.exit rather than the exit() builtin: exit() is injected by the
    # site module for interactive use and is not guaranteed to exist (for
    # example under `python -S`).
    sys.exit(main())
|