accrual: Refactor reports into classes.

Preparation for introducing the aging report. This helps us
distinguish each report's setup requirements (different __init__ arguments).
This commit is contained in:
Brett Smith 2020-05-28 15:49:43 -04:00
parent 9223940213
commit 8b2683d962
2 changed files with 171 additions and 164 deletions

View file

@ -96,10 +96,6 @@ from .. import filters
from .. import rtutil
PostGroups = Mapping[Optional[MetaValue], core.RelatedPostings]
ReportFunc = Callable[
[PostGroups, TextIO, TextIO, Optional[rt.Rt], Optional[rtutil.RT]],
None
]
RTObject = Mapping[str, str]
class Account(NamedTuple):
@ -132,37 +128,161 @@ class AccrualAccount(enum.Enum):
}
class ReportType:
NAMES: Set[str] = set()
BY_NAME: Dict[str, ReportFunc] = {}
class BaseReport:
def __init__(self, out_file: TextIO, err_file: TextIO) -> None:
self.out_file = out_file
self.err_file = err_file
@classmethod
def register(cls, *names: str) -> Callable[[ReportFunc], ReportFunc]:
def register_wrapper(func: ReportFunc) -> ReportFunc:
for name in names:
cls.BY_NAME[name] = func
cls.NAMES.add(names[0])
return func
return register_wrapper
def _since_last_nonzero(self, posts: core.RelatedPostings) -> core.RelatedPostings:
retval = core.RelatedPostings()
for post in posts:
if retval.balance().is_zero():
retval.clear()
retval.add(post)
return retval
@classmethod
def by_name(cls, name: str) -> ReportFunc:
def _report(self,
invoice: str,
posts: core.RelatedPostings,
index: int,
) -> Iterable[str]:
raise NotImplementedError("BaseReport._report")
def run(self, groups: PostGroups) -> None:
for index, invoice in enumerate(groups):
for line in self._report(str(invoice), groups[invoice], index):
print(line, file=self.out_file)
class BalanceReport(BaseReport):
def _report(self,
invoice: str,
posts: core.RelatedPostings,
index: int,
) -> Iterable[str]:
posts = self._since_last_nonzero(posts)
balance = posts.balance()
date_s = posts[0].meta.date.strftime('%Y-%m-%d')
if index:
yield ""
yield f"{invoice}:"
yield f" {balance} outstanding since {date_s}"
class OutgoingReport(BaseReport):
def __init__(self, rt_client: rt.Rt, out_file: TextIO, err_file: TextIO) -> None:
self.rt_client = rt_client
self.rt_wrapper = rtutil.RT(rt_client)
self.out_file = out_file
self.err_file = err_file
def _primary_rt_id(self, posts: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
rt_ids = posts.all_meta_links('rt-id')
rt_ids_count = len(rt_ids)
if rt_ids_count != 1:
raise ValueError(f"{rt_ids_count} rt-id links found")
parsed = rtutil.RT.parse(rt_ids.pop())
if parsed is None:
raise ValueError("rt-id is not a valid RT reference")
else:
return parsed
def _report(self,
invoice: str,
posts: core.RelatedPostings,
index: int,
) -> Iterable[str]:
posts = self._since_last_nonzero(posts)
try:
return cls.BY_NAME[name.lower()]
ticket_id, _ = self._primary_rt_id(posts)
ticket = self.rt_client.get_ticket(ticket_id)
# Note we only use this when ticket is None.
errmsg = f"ticket {ticket_id} not found"
except (ValueError, rt.RtError) as error:
ticket = None
errmsg = error.args[0]
if ticket is None:
print("error: can't generate outgoings report for {}"
" because no RT ticket available: {}".format(
invoice, errmsg,
), file=self.err_file)
return
try:
rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
except (IndexError, rt.RtError):
rt_requestor = None
if rt_requestor is None:
requestor = ''
requestor_name = ''
else:
requestor_name = (
rt_requestor.get('RealName')
or ticket.get('CF.{payment-to}')
or ''
)
requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
raw_balance = -posts.balance()
cost_balance = -posts.balance_at_cost()
cost_balance_s = cost_balance.format(None)
if raw_balance == cost_balance:
balance_s = cost_balance_s
else:
balance_s = f'{raw_balance} ({cost_balance_s})'
contract_links = posts.all_meta_links('contract')
if contract_links:
contract_s = ' , '.join(self.rt_wrapper.iter_urls(
contract_links, missing_fmt='<BROKEN RT LINK: {}>',
))
else:
contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
projects = [v for v in posts.meta_values('project')
if isinstance(v, str)]
yield "PAYMENT FOR APPROVAL:"
yield f"REQUESTOR: {requestor}"
yield f"TOTAL TO PAY: {balance_s}"
yield f"AGREEMENT: {contract_s}"
yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
yield f"PROJECT: {', '.join(projects)}"
yield "\nBEANCOUNT ENTRIES:\n"
last_txn: Optional[Transaction] = None
for post in posts:
txn = post.meta.txn
if txn is not last_txn:
last_txn = txn
txn = self.rt_wrapper.txn_with_urls(txn, '{}')
yield bc_printer.format_entry(txn)
class ReportType(enum.Enum):
BALANCE = BalanceReport
OUTGOING = OutgoingReport
BAL = BALANCE
OUT = OUTGOING
OUTGOINGS = OUTGOING
@classmethod
def by_name(cls, name: str) -> 'ReportType':
try:
return cls[name.upper()]
except KeyError:
raise ValueError(f"unknown report type {name!r}") from None
@classmethod
def default_for(cls, groups: PostGroups) -> ReportFunc:
def default_for(cls, groups: PostGroups) -> 'ReportType':
if len(groups) == 1 and all(
AccrualAccount.classify(group) is AccrualAccount.PAYABLE
and not AccrualAccount.PAYABLE.value.balance_paid(group.balance())
for group in groups.values()
):
report_name = 'outgoing'
return cls.OUTGOING
else:
report_name = 'balance'
return cls.BY_NAME[report_name]
return cls.BALANCE
class ReturnFlag(enum.IntFlag):
@ -217,123 +337,6 @@ def consistency_check(groups: PostGroups) -> Iterable[Error]:
post.meta.txn,
)
def _since_last_nonzero(posts: core.RelatedPostings) -> core.RelatedPostings:
retval = core.RelatedPostings()
for post in posts:
if retval.balance().is_zero():
retval.clear()
retval.add(post)
return retval
@ReportType.register('balance', 'bal')
def balance_report(groups: PostGroups,
out_file: TextIO,
err_file: TextIO=sys.stderr,
rt_client: Optional[rt.Rt]=None,
rt_wrapper: Optional[rtutil.RT]=None,
) -> None:
prefix = ''
for invoice, related in groups.items():
related = _since_last_nonzero(related)
balance = related.balance()
date_s = related[0].meta.date.strftime('%Y-%m-%d')
print(
f"{prefix}{invoice}:",
f" {balance} outstanding since {date_s}",
sep='\n', file=out_file,
)
prefix = '\n'
def _primary_rt_id(related: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
rt_ids = related.all_meta_links('rt-id')
rt_ids_count = len(rt_ids)
if rt_ids_count != 1:
raise ValueError(f"{rt_ids_count} rt-id links found")
parsed = rtutil.RT.parse(rt_ids.pop())
if parsed is None:
raise ValueError("rt-id is not a valid RT reference")
else:
return parsed
@ReportType.register('outgoing', 'outgoings', 'out')
def outgoing_report(groups: PostGroups,
out_file: TextIO,
err_file: TextIO=sys.stderr,
rt_client: Optional[rt.Rt]=None,
rt_wrapper: Optional[rtutil.RT]=None,
) -> None:
if rt_client is None or rt_wrapper is None:
raise ValueError("RT client is required but not configured")
for invoice, related in groups.items():
related = _since_last_nonzero(related)
try:
ticket_id, _ = _primary_rt_id(related)
ticket = rt_client.get_ticket(ticket_id)
# Note we only use this when ticket is None.
errmsg = f"ticket {ticket_id} not found"
except (ValueError, rt.RtError) as error:
ticket = None
errmsg = error.args[0]
if ticket is None:
print("error: can't generate outgoings report for {}"
" because no RT ticket available: {}".format(
invoice, errmsg,
), file=err_file)
continue
try:
rt_requestor = rt_client.get_user(ticket['Requestors'][0])
except (IndexError, rt.RtError):
rt_requestor = None
if rt_requestor is None:
requestor = ''
requestor_name = ''
else:
requestor_name = (
rt_requestor.get('RealName')
or ticket.get('CF.{payment-to}')
or ''
)
requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
raw_balance = -related.balance()
cost_balance = -related.balance_at_cost()
cost_balance_s = cost_balance.format(None)
if raw_balance == cost_balance:
balance_s = cost_balance_s
else:
balance_s = f'{raw_balance} ({cost_balance_s})'
contract_links = related.all_meta_links('contract')
if contract_links:
contract_s = ' , '.join(rt_wrapper.iter_urls(
contract_links, missing_fmt='<BROKEN RT LINK: {}>',
))
else:
contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
projects = [v for v in related.meta_values('project')
if isinstance(v, str)]
print(
"PAYMENT FOR APPROVAL:",
f"REQUESTOR: {requestor}",
f"TOTAL TO PAY: {balance_s}",
f"AGREEMENT: {contract_s}",
f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}",
f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}",
f"PROJECT: {', '.join(projects)}",
"\nBEANCOUNT ENTRIES:\n",
sep='\n', file=out_file,
)
last_txn: Optional[Transaction] = None
for post in related:
txn = post.meta.txn
if txn is not last_txn:
last_txn = txn
txn = rt_wrapper.txn_with_urls(txn, '{}')
bc_printer.print_entry(txn, file=out_file)
def filter_search(postings: Iterable[data.Posting],
search_terms: Iterable[SearchTerm],
) -> Iterable[data.Posting]:
@ -411,21 +414,22 @@ def main(arglist: Optional[Sequence[str]]=None,
if not groups:
print("warning: no matching entries found to report", file=stderr)
returncode |= ReturnFlag.NOTHING_TO_REPORT
else:
try:
args.report_type(
groups,
stdout,
stderr,
config.rt_client(),
config.rt_wrapper(),
report: Optional[BaseReport] = None
if args.report_type is ReportType.OUTGOING:
rt_client = config.rt_client()
if rt_client is None:
print(
"error: unable to generate outgoing report: RT client is required",
file=stderr,
)
except ValueError as exc:
print("error: unable to generate {}: {}".format(
args.report_type.__name__.replace('_', ' '),
exc.args[0],
), file=stderr)
returncode |= ReturnFlag.REPORT_ERRORS
else:
report = OutgoingReport(rt_client, stdout, stderr)
else:
report = args.report_type.value(stdout, stderr)
if report is None:
returncode |= ReturnFlag.REPORT_ERRORS
else:
report.run(groups)
return 0 if returncode == 0 else 16 + returncode
if __name__ == '__main__':

View file

@ -178,16 +178,16 @@ def test_filter_search(accrual_postings, search_terms, expect_count, check_func)
assert check_func(post)
@pytest.mark.parametrize('arg,expected', [
('balance', accrual.balance_report),
('outgoing', accrual.outgoing_report),
('bal', accrual.balance_report),
('out', accrual.outgoing_report),
('outgoings', accrual.outgoing_report),
('balance', accrual.BalanceReport),
('outgoing', accrual.OutgoingReport),
('bal', accrual.BalanceReport),
('out', accrual.OutgoingReport),
('outgoings', accrual.OutgoingReport),
])
def test_report_type_by_name(arg, expected):
assert accrual.ReportType.by_name(arg.lower()) is expected
assert accrual.ReportType.by_name(arg.title()) is expected
assert accrual.ReportType.by_name(arg.upper()) is expected
assert accrual.ReportType.by_name(arg.lower()).value is expected
assert accrual.ReportType.by_name(arg.title()).value is expected
assert accrual.ReportType.by_name(arg.upper()).value is expected
@pytest.mark.parametrize('arg', [
'unknown',
@ -260,8 +260,8 @@ def run_outgoing(invoice, postings, rt_client=None):
postings = relate_accruals_by_meta(postings, invoice)
output = io.StringIO()
errors = io.StringIO()
rt_cache = rtutil.RT(rt_client)
accrual.outgoing_report({invoice: postings}, output, errors, rt_client, rt_cache)
report = accrual.OutgoingReport(rt_client, output, errors)
report.run({invoice: postings})
return output, errors
@pytest.mark.parametrize('invoice,expected', [
@ -273,7 +273,10 @@ def run_outgoing(invoice, postings, rt_client=None):
def test_balance_report(accrual_postings, invoice, expected):
related = relate_accruals_by_meta(accrual_postings, invoice)
output = io.StringIO()
accrual.balance_report({invoice: related}, output)
errors = io.StringIO()
report = accrual.BalanceReport(output, errors)
report.run({invoice: related})
assert not errors.getvalue()
check_output(output, [invoice, expected])
def test_outgoing_report(accrual_postings):