"""test_reports_accrual - Unit tests for accrual report""" # Copyright © 2020 Brett Smith # License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0 # # Full copyright and licensing details can be found at toplevel file # LICENSE.txt in the repository. import collections import copy import datetime import io import itertools import operator import re import babel.numbers import odf.opendocument import odf.table import odf.text import pytest from . import testutil from decimal import Decimal from typing import NamedTuple, Optional, Sequence from beancount.core import data as bc_data from beancount import loader as bc_loader from conservancy_beancount import cliutil from conservancy_beancount import data from conservancy_beancount import rtutil from conservancy_beancount.reports import accrual _accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount')) ACCRUAL_TXNS = [ entry for entry in _accruals_load[0] if hasattr(entry, 'narration') and entry.narration != 'Opening balances' ] ACCRUALS_COUNT = sum( 1 for txn in ACCRUAL_TXNS for post in txn.postings if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:')) ) # Accounts where invoice and entity metadata are both used to identify accruals. INVOICE_ACCOUNTS = [ 'Assets:Receivable:Accounts', 'Assets:Receivable:Loans', 'Liabilities:Payable:Accounts', ] # Accounts where only entity metadata is used to identify accruals. ENTITY_ACCOUNTS = [ 'Assets:Prepaid:Expenses', 'Liabilities:Payable:Vacation', ] ACCOUNTS = INVOICE_ACCOUNTS + ENTITY_ACCOUNTS AGE_SUM_RE = re.compile(r'(?:\b(\d+) Years?)?(?: ?\b(\d+) Days?)?[–:]') class AgingRow(NamedTuple): date: datetime.date entity: Sequence[str] amount: Optional[Sequence[bc_data.Amount]] at_cost: bc_data.Amount rt_id: Sequence[str] invoice: Sequence[str] project: Sequence[str] @classmethod def make_simple(cls, date, entity, at_cost, invoice, rt_id=None, orig_amount=None, project='Conservancy'): if isinstance(date, str): date = datetime.datetime.strptime(date, '%Y-%m-%d').date() if not isinstance(at_cost, tuple): at_cost = testutil.Amount(at_cost) if rt_id is None and invoice.startswith('rt:'): rt_id, _, _ = invoice.partition('/') return cls(date, [entity], orig_amount, at_cost, [rt_id], [invoice], [project]) def check_row_match(self, sheet_row): cells = testutil.ODSCell.from_row(sheet_row) assert len(cells) >= len(self) cells = iter(cells) assert next(cells).value == self.date assert next(cells).text == '\0'.join(self.entity) assert next(cells).text == '\0'.join( babel.numbers.format_currency(number, currency, format_type='accounting') for number, currency in self.amount or () ) usd_cell = next(cells) assert usd_cell.value_type == 'currency' assert usd_cell.value == self.at_cost.number assert next(cells).text == '\0'.join(self.project) for index, cell in enumerate(cells): links = cell.getElementsByType(odf.text.A) assert len(links) == len(cell.childNodes) assert index >= 1 AGING_AP = [ AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'), AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'), AgingRow.make_simple('2010-04-25', 'Vendor', 200, 'FIXME'), AgingRow.make_simple('2010-04-30', 'Vendor', 220, 'rt:310/3120'), AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'), AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200', orig_amount=[testutil.Amount(1000, 'EUR')]), AgingRow.make_simple('2010-06-20', 'StateGov', 50, 'Invoices/2010StateRegistration.pdf'), ] AGING_AR = [ AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'), AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500, 'rt://ticket/515/attachments/5150'), AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700', project='Development Grant'), AgingRow.make_simple('2010-09-15', 'GrantCo', 6000, 'rt:470/4700', project='Development Grant'), ] class RTClient(testutil.RTClient): TICKET_DATA = { '40': [ ('400', 'invoice feb.csv', 'text/csv', '40.4k'), ], '44': [ ('440', 'invoice feb.csv', 'text/csv', '40.4k'), ], '310': [ ('3100', 'VendorContract.pdf', 'application/pdf', '1.7m'), ('3120', 'VendorInvoiceB.pdf', 'application/pdf', '1.8m'), ], '490': [], '505': [], '510': [ ('4000', 'contract.pdf', 'application/pdf', '1.4m'), ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'), ('5105', 'payment.png', 'image/png', '51.5k'), ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'), ], '515': [], '520': [], } @pytest.fixture def accrual_postings(): return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS)) def accruals_by_meta(postings, value, key='invoice', wrap_type=iter): return wrap_type( post for post in postings if post.meta.get(key) == value and post.account.is_under('Assets:Receivable', 'Liabilities:Payable') ) def find_row_by_text(row_source, want_text): for row in row_source: try: found_row = row.childNodes[0].text == want_text except IndexError: found_row = False if found_row: return row pytest.fail(f"did not find row with text {want_text!r}") def check_age_sum(aging_rows, row, date): text = row.firstChild.text ages = [int(match.group(1) or 0) * 365 + int(match.group(2) or 0) for match in AGE_SUM_RE.finditer(text)] if len(ages) == 1: # datetime only supports a 10K year range so this should cover all of it if text.startswith('Total Aged Over '): age_range = range(ages[0], 3650000) else: age_range = range(-3650000, ages[0]) elif len(ages) == 2: age_range = range(*ages) else: pytest.fail(f"row has incorrect age matches: {ages!r}") assert row.lastChild.value == sum( row.at_cost.number for row in aging_rows if row.at_cost.number > 0 and (date - row.date).days in age_range ) return row.lastChild.value def check_aging_sheet(sheet, aging_rows, date): if not aging_rows: return rows = iter(sheet.getElementsByType(odf.table.TableRow)) expect_rows = iter(aging_rows) row0 = find_row_by_text(rows, aging_rows[0].date.isoformat()) next(expect_rows).check_row_match(row0) for actual, expected in zip(rows, expect_rows): expected.check_row_match(actual) row0 = find_row_by_text(rows, "Total Aged Over 1 Year: ") aging_sum = check_age_sum(aging_rows, row0, date) sums = 0 for row in rows: if not row.firstChild: pass elif row.firstChild.text.startswith("Total Unpaid"): assert row.lastChild.value == aging_sum sums += 1 else: aging_sum += check_age_sum(aging_rows, row, date) assert sums > 1 def check_aging_ods(ods_file, date, recv_rows=AGING_AR, pay_rows=AGING_AP): ods_file.seek(0) ods = odf.opendocument.load(ods_file) sheets = ods.spreadsheet.getElementsByType(odf.table.Table) assert len(sheets) >= 2 check_aging_sheet(sheets[0], recv_rows, date) check_aging_sheet(sheets[1], pay_rows, date) @pytest.mark.parametrize('search_terms,expect_count,check_func', [ ([], ACCRUALS_COUNT, lambda post: post.account.is_under( 'Assets:Receivable:', 'Liabilities:Payable:', )), ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'), ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'), ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'), ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2, lambda post: post.meta['invoice'].startswith('rt:510/')), ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False), ]) def test_filter_search(accrual_postings, search_terms, expect_count, check_func): search_terms = [cliutil.SearchTerm._make(query) for query in search_terms] actual = list(accrual.filter_search(accrual_postings, search_terms)) if expect_count < ACCRUALS_COUNT: assert ACCRUALS_COUNT > len(actual) >= expect_count else: assert len(actual) == ACCRUALS_COUNT for post in actual: assert check_func(post) @pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values( INVOICE_ACCOUNTS, ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES], itertools.count(1), )) def test_make_consistent_bad_invoice(acct_name, invoice, day): if acct_name.startswith('Assets:'): mult = 10 else: mult = -10 txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=[ (acct_name, index * mult, {'invoice': invoice, 'entity': f'BadInvoice{day}'}) for index in range(1, 4) ]) postings = data.Posting.from_txn(txn) consistent = dict(accrual.AccrualPostings.make_consistent(iter(postings))) assert len(consistent) == 1 _, actual = consistent.popitem() assert len(actual) == 3 for act_post, exp_post in zip(actual, postings): assert act_post == exp_post def test_make_consistent_across_accounts(): invoice = 'Invoices/CrossAccount.pdf' txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=[ (acct_name, 100, {'invoice': invoice, 'entity': 'CrossAccount'}) for acct_name in ACCOUNTS ]) consistent = dict(accrual.AccrualPostings.make_consistent(data.Posting.from_txn(txn))) assert len(consistent) == len(ACCOUNTS) for key, posts in consistent.items(): assert len(posts) == 1 def test_make_consistent_both_invoice_and_account(): txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=[ (acct_name, 150) for acct_name in ACCOUNTS ]) consistent = dict(accrual.AccrualPostings.make_consistent(data.Posting.from_txn(txn))) assert len(consistent) == len(ACCOUNTS) for key, posts in consistent.items(): assert len(posts) == 1 @pytest.mark.parametrize('acct_name', ACCOUNTS) def test_make_consistent_across_entity(acct_name): amt_sign = operator.pos if acct_name.startswith('Assets') else operator.neg txn = testutil.Transaction(postings=[ (acct_name, amt_sign(n), {'invoice': 'Inv/1.pdf', 'entity': f'Entity{n}'}) for n in range(1, 4) ]) consistent = dict(accrual.AccrualPostings.make_consistent(data.Posting.from_txn(txn))) assert len(consistent) == 3 for key, posts in consistent.items(): assert len(posts) == 1 @pytest.mark.parametrize('acct_name', INVOICE_ACCOUNTS) def test_make_consistent_entity_differs_accrual_payment(acct_name): invoice = 'Invoices/DifferPay.pdf' txn = testutil.Transaction(postings=[ # Depending on the account, the order of the accrual and payment might # be swapped here, but that shouldn't matter. (acct_name, 125, {'invoice': invoice, 'entity': 'Positive'}), (acct_name, -125, {'invoice': invoice, 'entity': 'Negative'}), ]) related = list(data.Posting.from_txn(txn)) consistent = accrual.AccrualPostings.make_consistent(related) _, actual = next(consistent) assert len(actual) == len(related) assert all(post in actual for post in related) assert next(consistent, None) is None def test_make_consistent_by_date_accruals_differ(): meta = {'rt-id': '1', 'invoice': 'rt:1/2', 'entity': 'MultiDate'} entries = [testutil.Transaction(date=date, postings=[ ('Assets:Receivable:Accounts', date.day * 100, meta), ]) for date in itertools.islice(testutil.date_seq(), 3)] actual = [group for _, group in accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))] assert len(actual) == 3 assert {post.units.number for group in actual for post in group} == {100, 200, 300} def test_make_consistent_by_date_with_exact_payment(): meta = {'rt-id': '1', 'invoice': 'rt:1/3', 'entity': 'OnePayment'} entries = [testutil.Transaction(date=date, postings=[( 'Assets:Receivable:Accounts', 35 * (1 if date.day % 2 else -1), meta, )]) for date in itertools.islice(testutil.date_seq(), 3)] actual = [group for _, group in accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))] assert len(actual) == 2 assert sum(post.units.number for post in actual[0]) == 0 assert len(actual[1]) == 1 assert actual[1][0].meta.date.day == 3 def test_make_consistent_by_date_with_underpayment(): meta = {'rt-id': '1', 'invoice': 'rt:1/4', 'entity': 'UnderPayment'} entries = [testutil.Transaction(date=date, postings=[( 'Assets:Receivable:Accounts', 40 * (1 if date.day % 2 else -.5), meta, )]) for date in itertools.islice(testutil.date_seq(), 3)] actual = [group for _, group in accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))] assert len(actual) == 2 assert len(actual[0]) == 2 assert actual[0][0].units.number == 40 assert actual[0][1].units.number == -20 assert len(actual[1]) == 1 assert actual[1][0].meta.date.day == 3 def test_make_consistent_by_date_with_overpayment(): meta = {'rt-id': '1', 'invoice': 'rt:1/5', 'entity': 'OverPayment'} entries = [testutil.Transaction(date=date, postings=[( 'Assets:Receivable:Accounts', 50 * (1 if date.day % 2 else -1.5), meta, )]) for date in itertools.islice(testutil.date_seq(), 3)] actual = [group for _, group in accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))] assert len(actual) == 2 assert sum(post.units.number for post in actual[0]) == 0 assert len(actual[1]) == 2 assert actual[1][0].meta.date.day == 2 assert actual[1][0].units.number == -25 assert actual[1][1].meta.date.day == 3 def test_make_consistent_by_date_with_late_payment(): meta = {'rt-id': '1', 'invoice': 'rt:1/6', 'entity': 'LatePayment'} entries = [testutil.Transaction(date=date, postings=[( 'Assets:Receivable:Accounts', 60 * (-1 if date.day > 2 else 1), meta, )]) for date in itertools.islice(testutil.date_seq(), 3)] actual = [group for _, group in accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))] assert len(actual) == 2 assert len(actual[0]) == 2 assert actual[0][0].meta.date.day == 1 assert actual[0][1].meta.date.day == 3 assert len(actual[1]) == 1 assert actual[1][0].meta.date.day == 2 def test_make_consistent_by_date_with_split_payments(): meta = {'rt-id': '1', 'invoice': 'rt:1/7', 'entity': 'SplitPayments'} entries = [testutil.Transaction(date=date, postings=[( 'Assets:Receivable:Accounts', amount, meta, )]) for date, amount in zip(testutil.date_seq(), [70, 80, -50, -100])] actual = [group for _, group in accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))] assert len(actual) == 2 assert [post.units.number for post in actual[0]] == [70, -50, -20] assert [post.units.number for post in actual[1]] == [80, -80] @pytest.mark.parametrize('account,day', itertools.product( ACCOUNTS, [1, 10, 20, 30], )) def test_make_consistent_with_three_one_split(account, day): meta = {'rt-id': '1', 'invoice': 'rt:1/8', 'entity': '3Split'} entries = [testutil.Transaction(date=datetime.date(2019, 5, dd), postings=[( account, dd, meta, )]) for dd in [5, 15, 25]] entries.insert(day // 10, testutil.Transaction( date=datetime.date(2019, 5, day), postings=[(account, -45, meta)], )) postings = data.Posting.from_entries(entries) actual = dict(accrual.AccrualPostings.make_consistent(iter(postings))) if account.startswith('Assets:'): group_count = 3 post_count = 2 else: group_count = 1 post_count = 4 assert len(actual) == group_count for related in actual.values(): assert len(related) == post_count assert sum(post.units.number for post in related) == 0 @pytest.mark.parametrize('account', ACCOUNTS) def test_make_consistent_with_three_two_split(account): meta = {'rt-id': '1', 'invoice': 'rt:1/9', 'entity': '5Split'} entries = [testutil.Transaction(date=datetime.date(2019, 5, day), postings=[( account, day * (1 if day % 10 else -1.5), meta, )]) for day in range(5, 30, 5)] postings = data.Posting.from_entries(entries) actual = dict(accrual.AccrualPostings.make_consistent(iter(postings))) if account.startswith('Assets:'): group_count = 3 else: group_count = 2 assert len(actual) == group_count for related in actual.values(): assert len(related) >= 2 assert sum(post.units.number for post in related) == 0 def check_output(output, expect_patterns): output.seek(0) testutil.check_lines_match(iter(output), expect_patterns) def run_outgoing(rt_id, postings, rt_client=None): if rt_client is None: rt_client = RTClient() rt_wrapper = rtutil.RT(rt_client) if not isinstance(postings, accrual.AccrualPostings): postings = accruals_by_meta(postings, rt_id, 'rt-id', wrap_type=accrual.AccrualPostings) output = io.StringIO() report = accrual.OutgoingReport(rt_wrapper, output) report.run({rt_id: postings}) return output @pytest.mark.parametrize('invoice,expected', [ ('rt:505/5050', "Zero balance outstanding since 2010-05-05"), ('rt:510/5100', "Zero balance outstanding since 2010-05-10"), ('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"), ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",), ]) def test_balance_report(accrual_postings, invoice, expected, caplog): related = accruals_by_meta(accrual_postings, invoice, wrap_type=accrual.AccrualPostings) output = io.StringIO() report = accrual.BalanceReport(output) report.run({invoice: related}) assert not caplog.records check_output(output, [invoice, expected]) def test_outgoing_report(accrual_postings, caplog): rt_client = RTClient() output = run_outgoing('rt:510', accrual_postings, rt_client) rt_url = RTClient.DEFAULT_URL[:-9] rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b' contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b' assert not caplog.records check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: Mx\. 510 $', r'^PAYMENT TO: Hon\. Mx\. 510$', r'^TOTAL TO PAY: \$280\.00$', fr'^AGREEMENT: {contract_url}', r'^BEANCOUNT ENTRIES:$', ]) # Find the date line of the first transaction. # For each transaction, check for the date line, a metadata, and the # Expenses posting. for line in output: if not line.isspace(): break assert re.match(r'\s*2010-06-10\s', line), \ "first entry line did not have expected date" check_output(output, [ fr'^\s+rt-id: "{rt_id_url}"$', r'^\s+Expenses:Services:Legal\s+220\.00 USD$', r'^\s*2010-06-10\s', fr'^\s+contract: "{contract_url}"$', r'^\s+Expenses:FilingFees\s+60\.00 USD$', ]) assert rt_client.edits == {'510': { 'CF_payment-amount': 'USD 280.00', 'CF_payment-method': 'USD ACH', }} assert 'payment-method:' not in output.getvalue() def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog): rt_client = RTClient(want_cfs=False) output = run_outgoing('rt:510', accrual_postings, rt_client) assert not caplog.records check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: $', r'^PAYMENT TO:\s*$', ]) def test_outgoing_report_fx_amounts(accrual_postings, caplog): rt_client = RTClient() output = run_outgoing('rt:520 rt:525', accrual_postings, rt_client) assert not caplog.records check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: Mx\. 520 $', r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$', ]) assert rt_client.edits == {'520': { 'CF_payment-amount': 'EUR 1,000.00 ($1,100.00)', 'CF_payment-method': 'EUR Wire', }} assert 'payment-method:' not in output.getvalue() def test_outgoing_report_multi_invoice(accrual_postings, caplog): rt_client = RTClient() output = run_outgoing('rt:310', accrual_postings, rt_client) log, = caplog.records assert log.levelname == 'WARNING' assert log.message.startswith('cannot set payment-method for rt:310: ') check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: Mx\. 310 $', r'^TOTAL TO PAY: \$420.00$', ]) assert rt_client.edits == {'310': { 'CF_payment-amount': 'USD 420.00', }} assert 'payment-method:' not in output.getvalue() @pytest.mark.parametrize('arg', [ 'usd ach', ' eur wire', 'cad vendorportal ', ' gbp check ', ]) def test_outgoing_report_good_payment_method(caplog, accrual_postings, arg): rt_id = 'rt:40' meta = {'rt-id': rt_id, 'invoice': 'rt:40/100', 'payment-method': arg} txn = testutil.Transaction(postings=[ ('Liabilities:Payable:Accounts', -100, meta), ]) rt_client = RTClient() run_outgoing(rt_id, data.Posting.from_txn(txn), rt_client) assert not caplog.records cf_values = rt_client.edits[rt_id[3:]]['CF_payment-method'].split() assert cf_values[0] == arg.split()[0].upper() assert len(cf_values) > 1 @pytest.mark.parametrize('arg', [ '', 'usd', 'usd nonexistent', 'check', 'us check', *testutil.NON_STRING_METADATA_VALUES, ]) def test_outgoing_report_bad_payment_method(caplog, accrual_postings, arg): rt_id = 'rt:40' meta = {'rt-id': rt_id, 'invoice': 'rt:40/100', 'payment-method': arg} txn = testutil.Transaction(postings=[ ('Liabilities:Payable:Accounts', -100, meta), ]) rt_client = RTClient() run_outgoing(rt_id, data.Posting.from_txn(txn), rt_client) assert caplog.records for log in caplog.records: assert log.levelname == 'WARNING' assert log.message.startswith(f'cannot set payment-method for {rt_id}: ') assert 'CF_payment-method' not in rt_client.edits[rt_id[3:]] def test_outgoing_report_without_rt_id(accrual_postings, caplog): invoice = 'rt://ticket/515/attachments/5150' related = accruals_by_meta( accrual_postings, invoice, wrap_type=accrual.AccrualPostings, ) output = run_outgoing(None, related) assert caplog.records log = caplog.records[0] assert log.message.startswith( f"can't generate outgoings report for 2010-05-15 MatchingProgram {invoice}" " because no RT ticket available:", ) assert not output.getvalue() def run_aging_report(postings, today): postings = ( post for post in postings if post.account.is_under('Assets:Receivable', 'Liabilities:Payable') ) groups = dict(accrual.AccrualPostings.make_consistent(postings)) output = io.BytesIO() rt_wrapper = rtutil.RT(RTClient()) report = accrual.AgingReport(rt_wrapper, output, today) report.run(groups) return output @pytest.mark.parametrize('date', [ datetime.date(2010, 3, 1), # Both these dates are chosen for their off-by-one potential: # the first is exactly 30 days after the 2010-06-10 payable; # the second is exactly 60 days after the 2010-05-15 receivable. datetime.date(2010, 7, 10), datetime.date(2010, 7, 14), # The remainder just shuffle the age buckets some. datetime.date(2010, 12, 1), datetime.date(2011, 6, 1), datetime.date(2011, 12, 1), datetime.date(2012, 3, 1), ]) def test_aging_report_date_cutoffs(accrual_postings, date): output = run_aging_report(accrual_postings, date) check_aging_ods(output, date) def test_aging_report_entity_consistency(accrual_postings): date = datetime.date.today() output = run_aging_report(( post for post in accrual_postings if post.meta.get('rt-id') == 'rt:480' and post.units.number < 0 ), date) check_aging_ods(output, date, [], [ AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'), AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'), ]) def run_main(arglist, config=None, out_type=io.StringIO): if config is None: config = testutil.TestConfig( books_path=testutil.test_path('books/accruals.beancount'), rt_client=RTClient(), ) if out_type is io.BytesIO: arglist.insert(0, '--output-file=-') output = out_type() errors = io.StringIO() retcode = accrual.main(arglist, output, errors, config) output.seek(0) errors.seek(0) return retcode, output, errors def check_main_fails(arglist, config, expect_retcode): if not isinstance(expect_retcode, int): expect_retcode = cliutil.ExitCode[expect_retcode] retcode, output, errors = run_main(arglist, config) assert retcode == expect_retcode assert not output.getvalue() return errors @pytest.mark.parametrize('arglist', [ ['--report-type=balance', 'entity=EarlyBird'], ['--report-type=outgoing', 'entity=EarlyBird'], ]) def test_output_excludes_payments(arglist): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 output.seek(0) for line in output: assert not re.match(r'\brt:4\d\b', line) @pytest.mark.parametrize('arglist,expect_invoice', [ (['40'], 'rt:40/400'), (['44/440'], 'rt:44/440'), ]) def test_output_payments_when_only_match(arglist, expect_invoice): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 check_output(output, [ rf'^EarlyBird {re.escape(expect_invoice)}:$', r' outstanding since ', ]) @pytest.mark.parametrize('arglist,expect_amount', [ (['310'], 420), (['310/3120'], 220), (['-t', 'out', 'entity=Vendor'], 420), ]) def test_main_outgoing_report(arglist, expect_amount): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 rt_url = RTClient.DEFAULT_URL[:-9] rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=310>') contract_url = re.escape(f'<{rt_url}Ticket/Attachment/3120/3120/VendorContract.pdf>') check_output(output, [ r'^REQUESTOR: Mx\. 310 $', rf'^TOTAL TO PAY: \${expect_amount}\.00$', r'^\s*2010-04-30\s', r'^\s+Expenses:Travel\s+220 USD$', ]) @pytest.mark.parametrize('arglist', [ ['-t', 'balance'], ['515/5150'], ]) def test_main_balance_report(arglist): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 check_output(output, [ r'\brt://ticket/515/attachments/5150:$', r'^\s+1,500\.00 USD outstanding since 2010-05-15$', ]) def test_main_balance_report_because_no_rt_id(): invoice = 'Invoices/2010StateRegistration.pdf' retcode, output, errors = run_main([invoice]) assert not errors.getvalue() assert retcode == 0 check_output(output, [ rf'\b{re.escape(invoice)}:$', r'^\s+-50\.00 USD outstanding since 2010-06-20$', ]) @pytest.mark.parametrize('arglist', [ [], ['entity=Lawyer'], ]) def test_main_aging_report(arglist): if arglist: recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity] pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity] else: recv_rows = AGING_AR pay_rows = AGING_AP retcode, output, errors = run_main(arglist, out_type=io.BytesIO) assert not errors.getvalue() assert retcode == 0 check_aging_ods(output, datetime.date.today(), recv_rows, pay_rows) def test_main_no_books(): errors = check_main_fails([], testutil.TestConfig(), 'NoConfiguration') testutil.check_lines_match(iter(errors), [ r':[01]: +no books to load in configuration\b', ]) @pytest.mark.parametrize('arglist', [ ['499'], ['505/99999'], ['-t', 'balance', 'entity=NonExistent'], ]) def test_main_no_matches(arglist, caplog): check_main_fails(arglist, None, 'NoDataFiltered') testutil.check_logs_match(caplog, [ ('WARNING', 'no matching entries found to report'), ]) def test_main_no_rt(caplog): config = testutil.TestConfig( books_path=testutil.test_path('books/accruals.beancount'), ) check_main_fails(['-t', 'out'], config, 'NoConfiguration') testutil.check_logs_match(caplog, [ ('ERROR', 'unable to generate outgoing report: RT client is required'), ])