diff --git a/conservancy_beancount/reports/accrual.py b/conservancy_beancount/reports/accrual.py
new file mode 100644
index 0000000..7d74bb4
--- /dev/null
+++ b/conservancy_beancount/reports/accrual.py
@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+"""accrual.py - Various reports about accruals"""
+# Copyright © 2020 Brett Smith
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import argparse
+import datetime
+import enum
+import collections
+import re
+import sys
+
+from typing import (
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ Mapping,
+ NamedTuple,
+ Optional,
+ Sequence,
+ Set,
+ TextIO,
+ Tuple,
+)
+from ..beancount_types import (
+ Error,
+ MetaKey,
+ MetaValue,
+ Transaction,
+)
+
+import rt
+
+from beancount.parser import printer as bc_printer
+
+from . import core
+from .. import config as configmod
+from .. import data
+from .. import filters
+from .. import rtutil
+
+PostGroups = Mapping[Optional[MetaValue], core.RelatedPostings]
+ReportFunc = Callable[
+ [PostGroups, TextIO, TextIO, Optional[rt.Rt], Optional[rtutil.RT]],
+ None
+]
+RTObject = Mapping[str, str]
+
+class ReportType:
+ NAMES: Set[str] = set()
+ BY_NAME: Dict[str, ReportFunc] = {}
+
+ @classmethod
+ def register(cls, *names: str) -> Callable[[ReportFunc], ReportFunc]:
+ def register_wrapper(func: ReportFunc) -> ReportFunc:
+ for name in names:
+ cls.BY_NAME[name] = func
+ cls.NAMES.add(names[0])
+ return func
+ return register_wrapper
+
+ @classmethod
+ def by_name(cls, name: str) -> ReportFunc:
+ try:
+ return cls.BY_NAME[name.lower()]
+ except KeyError:
+ raise ValueError(f"unknown report type {name!r}") from None
+
+ @classmethod
+ def default_for(cls, groups: PostGroups) -> Tuple[ReportFunc, PostGroups]:
+ nonzero_groups = {
+ key: group for key, group in groups.items()
+ if not group.balance().is_zero()
+ }
+ if len(nonzero_groups) == 1 and all(
+ post.account.is_under('Liabilities')
+ for group in nonzero_groups.values()
+ for post in group
+ ):
+ report_name = 'outgoing'
+ else:
+ report_name = 'balance'
+ return cls.BY_NAME[report_name], nonzero_groups or groups
+
+
+class ReturnFlag(enum.IntFlag):
+ LOAD_ERRORS = 1
+ CONSISTENCY_ERRORS = 2
+ REPORT_ERRORS = 4
+ NOTHING_TO_REPORT = 8
+
+
+class SearchTerm(NamedTuple):
+ meta_key: MetaKey
+ pattern: str
+
+ @classmethod
+ def parse(cls, s: str) -> 'SearchTerm':
+ key_match = re.match(r'^[a-z][-\w]*=', s)
+ key: Optional[str]
+ if key_match:
+ key, _, raw_link = s.partition('=')
+ else:
+ key = None
+ raw_link = s
+ rt_ids = rtutil.RT.parse(raw_link)
+ if rt_ids is None:
+ rt_ids = rtutil.RT.parse('rt:' + raw_link)
+ if rt_ids is None:
+ if key is None:
+ key = 'invoice'
+ pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
+ else:
+ ticket_id, attachment_id = rt_ids
+ if key is None:
+ key = 'rt-id' if attachment_id is None else 'invoice'
+ pattern = rtutil.RT.metadata_regexp(
+ ticket_id,
+ attachment_id,
+ first_link_only=key == 'rt-id' and attachment_id is None,
+ )
+ return cls(key, pattern)
+
+
+def consistency_check(groups: PostGroups) -> Iterable[Error]:
+ for key, related in groups.items():
+ for checked_meta in ['contract', 'entity', 'purchase-order']:
+ meta_values = related.meta_values(checked_meta)
+ if len(meta_values) != 1:
+ errmsg = f'inconsistent {checked_meta} for invoice {key}'
+ for post in related:
+ yield Error(
+ post.meta,
+ f'{errmsg}: {post.meta.get(checked_meta)}',
+ post.meta.txn,
+ )
+
+def _since_last_nonzero(posts: core.RelatedPostings) -> core.RelatedPostings:
+ retval = core.RelatedPostings()
+ for post in posts:
+ if retval.balance().is_zero():
+ retval.clear()
+ retval.add(post)
+ return retval
+
+@ReportType.register('balance', 'bal')
+def balance_report(groups: PostGroups,
+ out_file: TextIO,
+ err_file: TextIO=sys.stderr,
+ rt_client: Optional[rt.Rt]=None,
+ rt_wrapper: Optional[rtutil.RT]=None,
+) -> None:
+ prefix = ''
+ for invoice, related in groups.items():
+ related = _since_last_nonzero(related)
+ balance = related.balance()
+ date_s = related[0].meta.date.strftime('%Y-%m-%d')
+ print(
+ f"{prefix}{invoice}:",
+ f" {balance} outstanding since {date_s}",
+ sep='\n', file=out_file,
+ )
+ prefix = '\n'
+
+def _primary_rt_id(related: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
+ rt_ids = related.all_meta_links('rt-id')
+ rt_ids_count = len(rt_ids)
+ if rt_ids_count != 1:
+ raise ValueError(f"{rt_ids_count} rt-id links found")
+ parsed = rtutil.RT.parse(rt_ids.pop())
+ if parsed is None:
+ raise ValueError("rt-id is not a valid RT reference")
+ else:
+ return parsed
+
+@ReportType.register('outgoing', 'outgoings', 'out')
+def outgoing_report(groups: PostGroups,
+ out_file: TextIO,
+ err_file: TextIO=sys.stderr,
+ rt_client: Optional[rt.Rt]=None,
+ rt_wrapper: Optional[rtutil.RT]=None,
+) -> None:
+ if rt_client is None or rt_wrapper is None:
+ raise ValueError("RT client is required but not configured")
+ for invoice, related in groups.items():
+ related = _since_last_nonzero(related)
+ try:
+ ticket_id, _ = _primary_rt_id(related)
+ ticket = rt_client.get_ticket(ticket_id)
+ # Note we only use this when ticket is None.
+ errmsg = f"ticket {ticket_id} not found"
+ except (ValueError, rt.RtError) as error:
+ ticket = None
+ errmsg = error.args[0]
+ if ticket is None:
+ print("error: can't generate outgoings report for {}"
+ " because no RT ticket available: {}".format(
+ invoice, errmsg,
+ ), file=err_file)
+ continue
+
+ try:
+ rt_requestor = rt_client.get_user(ticket['Requestors'][0])
+ except (IndexError, rt.RtError):
+ rt_requestor = None
+ if rt_requestor is None:
+ requestor = ''
+ payee = ''
+ else:
+ requestor = '{RealName} <{EmailAddress}>'.format_map(rt_requestor)
+ payee = rt_requestor['RealName']
+
+ contract_links = related.all_meta_links('contract')
+ if contract_links:
+ contract_s = ' , '.join(rt_wrapper.iter_urls(
+ contract_links, '<{}>', '{}', '',
+ ))
+ else:
+ contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
+ projects = [v for v in related.meta_values('project')
+ if isinstance(v, str)]
+ payment_method = ticket.get('CF.{payment-method}', '')
+
+ print(
+ "PAYMENT FOR APPROVAL:",
+ f"REQUESTOR: {requestor}",
+ f"TOTAL TO PAY: {-related.balance()}",
+ f"AGREEMENT: {contract_s}",
+ f"PAYMENT TO: {payee}",
+ f"PAYMENT METHOD: {payment_method}",
+ f"PROJECT: {', '.join(projects)}",
+ "\nBEANCOUNT ENTRIES:",
+ sep='\n', file=out_file,
+ )
+
+ last_txn: Optional[Transaction] = None
+ for post in related:
+ txn = post.meta.txn
+ if txn is not last_txn:
+ last_txn = txn
+ txn = rt_wrapper.txn_with_urls(txn)
+ bc_printer.print_entry(txn, file=out_file)
+
+def filter_search(postings: Iterable[data.Posting],
+ search_terms: Iterable[SearchTerm],
+) -> Iterable[data.Posting]:
+ postings = (post for post in postings if post.account.is_under(
+ 'Assets:Receivable', 'Liabilities:Payable',
+ ))
+ for meta_key, pattern in search_terms:
+ postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern))
+ return postings
+
+def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--report-type', '-t',
+ metavar='NAME',
+ type=ReportType.by_name,
+ help="""The type of report to generate, either `balance` or `outgoing`.
+If not specified, the default is `outgoing` for search criteria that return a
+single outstanding payable, and `balance` any other time.
+""")
+ parser.add_argument(
+ '--since',
+ metavar='YEAR',
+ type=int,
+ default=-1,
+ help="""How far back to search the books for related transactions.
+You can either specify a fiscal year, or a negative offset from the current
+fiscal year, to start loading entries from. The default is -1 (start from the
+previous fiscal year).
+""")
+ parser.add_argument(
+ 'search',
+ nargs=argparse.ZERO_OR_MORE,
+ help="""Report on accruals that match this criteria. The format is
+NAME=TERM. TERM is a link or word that must exist in a posting's NAME
+metadata to match. A single ticket number is a shortcut for
+`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
+`TIK/ATT` format, is a shortcut for `invoice=LINK`.
+""")
+ args = parser.parse_args(arglist)
+ args.search_terms = [SearchTerm.parse(s) for s in args.search]
+ return args
+
+def main(arglist: Optional[Sequence[str]]=None,
+ stdout: TextIO=sys.stdout,
+ stderr: TextIO=sys.stderr,
+ config: Optional[configmod.Config]=None,
+) -> int:
+ returncode = 0
+ args = parse_arguments(arglist)
+ if config is None:
+ config = configmod.Config()
+ config.load_file()
+ books_loader = config.books_loader()
+ if books_loader is not None:
+ entries, load_errors, _ = books_loader.load_fy_range(args.since)
+ else:
+ entries = []
+ source = {
+ 'filename': str(config.config_file_path()),
+ 'lineno': 1,
+ }
+ load_errors = [Error(source, "no books to load in configuration", None)]
+ postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
+ groups = core.RelatedPostings.group_by_meta(postings, 'invoice')
+ meta_errors = consistency_check(groups)
+ for error in load_errors:
+ bc_printer.print_error(error, file=stderr)
+ returncode |= ReturnFlag.LOAD_ERRORS
+ for error in meta_errors:
+ bc_printer.print_error(error, file=stderr)
+ returncode |= ReturnFlag.CONSISTENCY_ERRORS
+ if args.report_type is None:
+ args.report_type, groups = ReportType.default_for(groups)
+ if not groups:
+ print("warning: no matching entries found to report", file=stderr)
+ returncode |= ReturnFlag.NOTHING_TO_REPORT
+ else:
+ try:
+ args.report_type(
+ groups,
+ stdout,
+ stderr,
+ config.rt_client(),
+ config.rt_wrapper(),
+ )
+ except ValueError as exc:
+ print("error: unable to generate {}: {}".format(
+ args.report_type.__name__.replace('_', ' '),
+ exc.args[0],
+ ), file=stderr)
+ returncode |= ReturnFlag.REPORT_ERRORS
+ return 0 if returncode == 0 else 16 + returncode
+
+if __name__ == '__main__':
+ exit(main())
diff --git a/setup.py b/setup.py
index 1a8ce8c..3ef7431 100755
--- a/setup.py
+++ b/setup.py
@@ -26,5 +26,9 @@ setup(
],
packages=['conservancy_beancount'],
- entry_points={},
+ entry_points={
+ 'console_scripts': [
+ 'accrual-report = conservancy_beancount.reports.accrual:main',
+ ],
+ },
)
diff --git a/tests/test_reports_accrual.py b/tests/test_reports_accrual.py
new file mode 100644
index 0000000..eb49c1e
--- /dev/null
+++ b/tests/test_reports_accrual.py
@@ -0,0 +1,410 @@
+"""test_reports_accrual - Unit tests for accrual report"""
+# Copyright © 2020 Brett Smith
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import collections
+import copy
+import io
+import itertools
+import re
+
+import pytest
+
+from . import testutil
+
+from beancount import loader as bc_loader
+from conservancy_beancount import data
+from conservancy_beancount import rtutil
+from conservancy_beancount.reports import accrual
+from conservancy_beancount.reports import core
+
+_accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount'))
+ACCRUALS_COUNT = sum(
+ 1
+ for entry in _accruals_load[0]
+ for post in getattr(entry, 'postings', ())
+ if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
+)
+
+ACCOUNTS = [
+ 'Assets:Receivable:Accounts',
+ 'Assets:Receivable:Loans',
+ 'Liabilities:Payable:Accounts',
+ 'Liabilities:Payable:Vacation',
+]
+
+CONSISTENT_METADATA = [
+ 'contract',
+ 'entity',
+ 'purchase-order',
+]
+
+class RTClient(testutil.RTClient):
+ TICKET_DATA = {
+ '505': [],
+ '510': [
+ ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
+ ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
+ ('5105', 'payment.png', 'image/png', '51.5k'),
+ ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
+ ],
+ '515': [],
+ }
+
+
+@pytest.fixture
+def accrual_entries():
+ return copy.deepcopy(_accruals_load[0])
+
+@pytest.fixture
+def accrual_postings():
+ entries = copy.deepcopy(_accruals_load[0])
+ return data.Posting.from_entries(entries)
+
+def check_link_regexp(regexp, match_s, first_link_only=False):
+ assert regexp
+ assert re.search(regexp, match_s)
+ assert re.search(regexp, match_s + ' postlink')
+ assert re.search(regexp, match_s + '0') is None
+ assert re.search(regexp, '1' + match_s) is None
+ end_match = re.search(regexp, 'prelink ' + match_s)
+ if first_link_only:
+ assert end_match is None
+ else:
+ assert end_match
+
+@pytest.mark.parametrize('link_fmt', [
+ '{}',
+ 'rt:{}',
+ 'rt://ticket/{}',
+])
+def test_search_term_parse_rt_shortcuts(link_fmt):
+ key, regexp = accrual.SearchTerm.parse(link_fmt.format(220))
+ assert key == 'rt-id'
+ check_link_regexp(regexp, 'rt:220', first_link_only=True)
+ check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True)
+
+@pytest.mark.parametrize('link_fmt', [
+ '{}/{}',
+ 'rt:{}/{}',
+ 'rt://ticket/{}/attachments/{}',
+])
+def test_search_term_parse_invoice_shortcuts(link_fmt):
+ key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660))
+ assert key == 'invoice'
+ check_link_regexp(regexp, 'rt:330/660')
+ check_link_regexp(regexp, 'rt://ticket/330/attachments/660')
+
+@pytest.mark.parametrize('key', [
+ 'approval',
+ 'contract',
+ 'invoice',
+])
+def test_search_term_parse_metadata_rt_shortcut(key):
+ actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420')
+ assert actual_key == key
+ check_link_regexp(regexp, 'rt:440/420')
+ check_link_regexp(regexp, 'rt://ticket/440/attachments/420')
+
+@pytest.mark.parametrize('key', [
+ None,
+ 'approval',
+ 'contract',
+ 'invoice',
+])
+def test_search_term_parse_repo_link(key):
+ document = '1234.pdf'
+ if key is None:
+ key = 'invoice'
+ search = document
+ else:
+ search = f'{key}={document}'
+ actual_key, regexp = accrual.SearchTerm.parse(search)
+ assert actual_key == key
+ check_link_regexp(regexp, document)
+
+@pytest.mark.parametrize('search,unmatched', [
+ ('1234.pdf', '1234_pdf'),
+])
+def test_search_term_parse_regexp_escaping(search, unmatched):
+ _, regexp = accrual.SearchTerm.parse(search)
+ assert re.search(regexp, unmatched) is None
+
+@pytest.mark.parametrize('search_terms,expect_count,check_func', [
+ ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
+ 'Assets:Receivable:', 'Liabilities:Payable:',
+ )),
+ ([('invoice', '^rt:505/5050$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
+ ([('rt-id', r'^rt:\D+515$')], 1, lambda post: post.meta['entity'] == 'DonorB'),
+ ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
+ ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
+ lambda post: post.meta['invoice'].startswith('rt:510/')),
+ ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
+])
+def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
+ actual = list(accrual.filter_search(accrual_postings, search_terms))
+ if expect_count < ACCRUALS_COUNT:
+ assert ACCRUALS_COUNT > len(actual) >= expect_count
+ else:
+ assert len(actual) == ACCRUALS_COUNT
+ for post in actual:
+ assert check_func(post)
+
+@pytest.mark.parametrize('arg,expected', [
+ ('balance', accrual.balance_report),
+ ('outgoing', accrual.outgoing_report),
+ ('bal', accrual.balance_report),
+ ('out', accrual.outgoing_report),
+ ('outgoings', accrual.outgoing_report),
+])
+def test_report_type_by_name(arg, expected):
+ assert accrual.ReportType.by_name(arg.lower()) is expected
+ assert accrual.ReportType.by_name(arg.title()) is expected
+ assert accrual.ReportType.by_name(arg.upper()) is expected
+
+@pytest.mark.parametrize('arg', [
+ 'unknown',
+ 'blance',
+ 'outgong',
+])
+def test_report_type_by_unknown_name(arg):
+ # Raising ValueError helps argparse generate good messages.
+ with pytest.raises(ValueError):
+ accrual.ReportType.by_name(arg)
+
+@pytest.mark.parametrize('invoice,expected', [
+ # No outstanding balance
+ ('rt:505/5050', accrual.balance_report),
+ ('rt:510/5100', accrual.balance_report),
+ # Outstanding receivable
+ ('rt://ticket/515/attachments/5150', accrual.balance_report),
+ # Outstanding payable
+ ('rt:510/6100', accrual.outgoing_report),
+])
+def test_default_report_type(accrual_postings, invoice, expected):
+ related = core.RelatedPostings()
+ for post in accrual_postings:
+ if (post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
+ and post.meta.get('invoice') == invoice):
+ related.add(post)
+ groups = {invoice: related}
+ report_type, report_groups = accrual.ReportType.default_for(groups)
+ assert report_type is expected
+ assert report_groups == groups
+
+@pytest.mark.parametrize('entity,exp_type,exp_invoices', [
+ ('^Lawyer$', accrual.outgoing_report, {'rt:510/6100'}),
+ ('^Donor[AB]$', accrual.balance_report, {'rt://ticket/515/attachments/5150'}),
+ ('^(Lawyer|DonorB)$', accrual.balance_report,
+ {'rt:510/6100', 'rt://ticket/515/attachments/5150'}),
+])
+def test_default_report_type_multi_invoices(accrual_postings, entity, exp_type, exp_invoices):
+ groups = core.RelatedPostings.group_by_meta((
+ post for post in accrual_postings
+ if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
+ and re.match(entity, post.meta.get('entity', ''))
+ ), 'invoice')
+ report_type, report_groups = accrual.ReportType.default_for(groups)
+ assert report_type is exp_type
+ assert set(report_groups.keys()) == exp_invoices
+ assert all(len(related) > 0 for related in report_groups.values())
+
+@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
+ CONSISTENT_METADATA,
+ ACCOUNTS,
+))
+def test_consistency_check_when_consistent(meta_key, account):
+ invoice = f'test-{meta_key}-invoice'
+ meta = {
+ 'invoice': invoice,
+ meta_key: f'test-{meta_key}-value',
+ }
+ txn = testutil.Transaction(postings=[
+ (account, 100, meta),
+ (account, -100, meta),
+ ])
+ related = core.RelatedPostings(data.Posting.from_txn(txn))
+ assert not list(accrual.consistency_check({invoice: related}))
+
+@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
+ ['approval', 'fx-rate', 'statement'],
+ ACCOUNTS,
+))
+def test_consistency_check_ignored_metadata(meta_key, account):
+ invoice = f'test-{meta_key}-invoice'
+ txn = testutil.Transaction(postings=[
+ (account, 100, {'invoice': invoice, meta_key: 'credit'}),
+ (account, -100, {'invoice': invoice, meta_key: 'debit'}),
+ ])
+ related = core.RelatedPostings(data.Posting.from_txn(txn))
+ assert not list(accrual.consistency_check({invoice: related}))
+
+@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
+ CONSISTENT_METADATA,
+ ACCOUNTS,
+))
+def test_consistency_check_when_inconsistent(meta_key, account):
+ invoice = f'test-{meta_key}-invoice'
+ txn = testutil.Transaction(postings=[
+ (account, 100, {'invoice': invoice, meta_key: 'credit', 'lineno': 1}),
+ (account, -100, {'invoice': invoice, meta_key: 'debit', 'lineno': 2}),
+ ])
+ related = core.RelatedPostings(data.Posting.from_txn(txn))
+ errors = list(accrual.consistency_check({invoice: related}))
+ for exp_lineno, (actual, exp_msg) in enumerate(itertools.zip_longest(errors, [
+ f'inconsistent {meta_key} for invoice {invoice}: credit',
+ f'inconsistent {meta_key} for invoice {invoice}: debit',
+ ]), 1):
+ assert actual.message == exp_msg
+ assert actual.entry is txn
+ assert actual.source.get('lineno') == exp_lineno
+
+def check_output(output, expect_patterns):
+ output.seek(0)
+ for pattern in expect_patterns:
+ assert any(re.search(pattern, line) for line in output), \
+ f"{pattern!r} not found in output"
+
+@pytest.mark.parametrize('invoice,expected', [
+ ('rt:505/5050', "Zero balance outstanding since 2020-05-05"),
+ ('rt:510/5100', "Zero balance outstanding since 2020-05-10"),
+ ('rt:510/6100', "-280.00 USD outstanding since 2020-06-10"),
+ ('rt://ticket/515/attachments/5150', "1500.00 USD outstanding since 2020-05-15",),
+])
+def test_balance_report(accrual_postings, invoice, expected):
+ related = core.RelatedPostings(
+ post for post in accrual_postings
+ if post.meta.get('invoice') == invoice
+ and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
+ )
+ output = io.StringIO()
+ accrual.balance_report({invoice: related}, output)
+ check_output(output, [invoice, expected])
+
+def test_outgoing_report(accrual_postings):
+ invoice = 'rt:510/6100'
+ related = core.RelatedPostings(
+ post for post in accrual_postings
+ if post.meta.get('invoice') == invoice
+ and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
+ )
+ output = io.StringIO()
+ errors = io.StringIO()
+ rt_client = RTClient()
+ rt_cache = rtutil.RT(rt_client)
+ accrual.outgoing_report({invoice: related}, output, errors, rt_client, rt_cache)
+ assert not errors.getvalue()
+ rt_url = rt_client.DEFAULT_URL[:-9]
+ rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=510>')
+ contract_url = re.escape(f'<{rt_url}Ticket/Attachment/4000/4000/contract.pdf>')
+ check_output(output, [
+ r'^PAYMENT FOR APPROVAL:$',
+ r'^REQUESTOR: Mx\. 510 $',
+ r'^TOTAL TO PAY: 280\.00 USD$',
+ fr'^AGREEMENT: {contract_url}',
+ r'^PAYMENT TO: Mx\. 510$',
+ r'^PAYMENT METHOD: payment method 510$',
+ r'^BEANCOUNT ENTRIES:$',
+ # For each transaction, check for the date line, a metadata, and the
+ # Expenses posting.
+ r'^\s*2020-06-10\s',
+ fr'^\s+rt-id: "{rt_id_url}"$',
+ r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
+ r'^\s*2020-06-12\s',
+ fr'^\s+contract: "{contract_url}"$',
+ r'^\s+Expenses:FilingFees\s+60\.00 USD$',
+ ])
+
+def run_main(arglist, config):
+ output = io.StringIO()
+ errors = io.StringIO()
+ retcode = accrual.main(arglist, output, errors, config)
+ return retcode, output, errors
+
+def check_main_fails(arglist, config, error_flags, error_patterns):
+ retcode, output, errors = run_main(arglist, config)
+ assert retcode > 16
+ assert (retcode - 16) & error_flags
+ check_output(errors, error_patterns)
+ assert not output.getvalue()
+
+@pytest.mark.parametrize('arglist', [
+ ['--report-type=outgoing'],
+ ['510'],
+ ['510/6100'],
+ ['entity=Lawyer'],
+])
+def test_main_outgoing_report(arglist):
+ rt_client = RTClient()
+ config = testutil.TestConfig(
+ books_path=testutil.test_path('books/accruals.beancount'),
+ rt_client=rt_client,
+ )
+ retcode, output, errors = run_main(arglist, config)
+ assert not errors.getvalue()
+ assert retcode == 0
+ rt_url = rt_client.DEFAULT_URL[:-9]
+ rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=510>')
+ contract_url = re.escape(f'<{rt_url}Ticket/Attachment/4000/4000/contract.pdf>')
+ check_output(output, [
+ r'^REQUESTOR: Mx\. 510 $',
+ r'^TOTAL TO PAY: 280\.00 USD$',
+ r'^\s*2020-06-12\s',
+ r'^\s+Expenses:FilingFees\s+60\.00 USD$',
+ ])
+
+@pytest.mark.parametrize('arglist', [
+ ['-t', 'balance'],
+ ['515'],
+ ['515/5150'],
+ ['entity=DonorB'],
+])
+def test_main_balance_report(arglist):
+ config = testutil.TestConfig(
+ books_path=testutil.test_path('books/accruals.beancount'),
+ )
+ retcode, output, errors = run_main(arglist, config)
+ assert not errors.getvalue()
+ assert retcode == 0
+ check_output(output, [
+ r'\brt://ticket/515/attachments/5150:$',
+ r'^\s+1500\.00 USD outstanding since 2020-05-15$',
+ ])
+
+def test_main_no_books():
+ check_main_fails([], testutil.TestConfig(), 1 | 8, [
+ r':1: +no books to load in configuration\b',
+ ])
+
+@pytest.mark.parametrize('arglist', [
+ ['499'],
+ ['505/99999'],
+ ['entity=NonExistent'],
+])
+def test_main_no_matches(arglist):
+ config = testutil.TestConfig(
+ books_path=testutil.test_path('books/accruals.beancount'),
+ )
+ check_main_fails(arglist, config, 8, [
+ r'^warning: no matching entries found to report$',
+ ])
+
+def test_main_no_rt():
+ config = testutil.TestConfig(
+ books_path=testutil.test_path('books/accruals.beancount'),
+ )
+ check_main_fails(['-t', 'out'], config, 4, [
+ r'^error: unable to generate outgoing report: RT client is required\b',
+ ])
diff --git a/tests/testutil.py b/tests/testutil.py
index 3612091..4d25277 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -16,6 +16,7 @@
import datetime
import itertools
+import re
import beancount.core.amount as bc_amount
import beancount.core.data as bc_data
@@ -23,7 +24,7 @@ import beancount.core.data as bc_data
from decimal import Decimal
from pathlib import Path
-from conservancy_beancount import rtutil
+from conservancy_beancount import books, rtutil
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
@@ -172,12 +173,25 @@ def OpeningBalance(acct=None, **txn_meta):
(acct, -260),
])
+class TestBooksLoader(books.Loader):
+ def __init__(self, source):
+ self.source = source
+
+ def fy_range_string(self, from_fy=None, to_fy=None, plugins=None):
+ return f'include "{self.source}"'
+
+
class TestConfig:
def __init__(self, *,
+ books_path=None,
payment_threshold=0,
repo_path=None,
rt_client=None,
):
+ if books_path is None:
+ self._books_loader = None
+ else:
+ self._books_loader = TestBooksLoader(books_path)
self._payment_threshold = Decimal(payment_threshold)
self.repo_path = test_path(repo_path)
self._rt_client = rt_client
@@ -186,6 +200,12 @@ class TestConfig:
else:
self._rt_wrapper = rtutil.RT(rt_client)
+ def books_loader(self):
+ return self._books_loader
+
+ def config_file_path(self):
+ return test_path('userconfig/conservancy_beancount/config.ini')
+
def payment_threshold(self):
return self._payment_threshold
@@ -316,4 +336,25 @@ class RTClient:
return {
'id': 'ticket/{}'.format(ticket_id_s),
'numerical_id': ticket_id_s,
+ 'CF.{payment-method}': f'payment method {ticket_id_s}',
+ 'Requestors': [
+ f'mx{ticket_id_s}@example.org',
+ 'requestor2@example.org',
+ ],
+ }
+
+ def get_user(self, user_id):
+ user_id_s = str(user_id)
+ match = re.search(r'(\d+)@', user_id_s)
+ if match is None:
+ email = f'mx{user_id_s}@example.org'
+ user_id_num = int(user_id_s)
+ else:
+ email = user_id_s
+ user_id_num = int(match.group(1))
+ return {
+ 'id': f'user/{user_id_num}',
+ 'EmailAddress': email,
+ 'Name': email,
+ 'RealName': f'Mx. {user_id_num}',
}