e22e63dcca
It is more common than I realized that we split an invoice by entity on the accrual side, so this supports that better. It still disregards inconsistency between accrual entity and payment entity for reporting purposes, to help keep reporting clean around automatic imports. The changes to BaseReport._report shook out because at this point, the group key is effectively arbitrary and shouldn't be used for any reporting purposes.
826 lines
29 KiB
Python
826 lines
29 KiB
Python
"""test_reports_accrual - Unit tests for accrual report"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import copy
|
|
import datetime
|
|
import io
|
|
import itertools
|
|
import logging
|
|
import operator
|
|
import re
|
|
|
|
import babel.numbers
|
|
import odf.opendocument
|
|
import odf.table
|
|
import odf.text
|
|
|
|
import pytest
|
|
|
|
from . import testutil
|
|
|
|
from decimal import Decimal
|
|
from typing import NamedTuple, Optional, Sequence
|
|
|
|
from beancount.core import data as bc_data
|
|
from beancount import loader as bc_loader
|
|
from conservancy_beancount import data
|
|
from conservancy_beancount import rtutil
|
|
from conservancy_beancount.reports import accrual
|
|
from conservancy_beancount.reports import core
|
|
|
|
# Load the sample books once at import time; the tests below work on copies.
_accruals_load = bc_loader.load_file(
    testutil.test_path('books/accruals.beancount'),
)

# Every loaded entry that has a narration, other than the opening balances.
# Entries without a narration fall back to the excluded value, so they are
# dropped the same way.
ACCRUAL_TXNS = [
    entry
    for entry in _accruals_load[0]
    if getattr(entry, 'narration', 'Opening balances') != 'Opening balances'
]

# Number of postings in those transactions that sit under a receivable or
# payable account.
ACCRUALS_COUNT = len([
    post
    for txn in ACCRUAL_TXNS
    for post in txn.postings
    if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
])
|
|
|
|
# Accrual accounts the tests below parametrize over: two receivable accounts
# followed by two payable accounts.
ACCOUNTS = [
    'Assets:Receivable:Accounts',
    'Assets:Receivable:Loans',
    'Liabilities:Payable:Accounts',
    'Liabilities:Payable:Vacation',
]

# Metadata keys the consistency tests below expect to be reported when they
# differ between postings of the same invoice.
CONSISTENT_METADATA = [
    'contract',
    'purchase-order',
]
|
|
|
|
class AgingRow(NamedTuple):
    """Expected contents of one data row in an aging report sheet."""
    date: datetime.date
    entity: Sequence[str]
    amount: Optional[Sequence[bc_data.Amount]]
    at_cost: bc_data.Amount
    rt_id: Sequence[str]
    invoice: Sequence[str]

    @classmethod
    def make_simple(cls, date, entity, at_cost, invoice, rt_id=None, orig_amount=None):
        """Build a row with a single entity, RT id, and invoice.

        ``date`` may be a ``YYYY-MM-DD`` string. ``at_cost`` is wrapped with
        ``testutil.Amount`` unless it is already a tuple. When ``rt_id`` is
        omitted it is taken from the part of ``invoice`` before the first
        slash.
        """
        if isinstance(date, str):
            date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
        if not isinstance(at_cost, tuple):
            at_cost = testutil.Amount(at_cost)
        if rt_id is None:
            rt_id = invoice.partition('/')[0]
        return cls(
            date=date,
            entity=[entity],
            amount=orig_amount,
            at_cost=at_cost,
            rt_id=[rt_id],
            invoice=[invoice],
        )

    def check_row_match(self, sheet_row):
        """Assert that the cells of ``sheet_row`` match this expected row."""
        cells = testutil.ODSCell.from_row(sheet_row)
        assert len(cells) == len(self)
        remaining = iter(cells)
        date_cell = next(remaining)
        assert date_cell.value == self.date
        entity_cell = next(remaining)
        assert entity_cell.text == '\0'.join(self.entity)
        amount_cell = next(remaining)
        assert amount_cell.text == '\0'.join(
            babel.numbers.format_currency(number, currency, format_type='accounting')
            for number, currency in self.amount or ()
        )
        usd_cell = next(remaining)
        assert usd_cell.value_type == 'currency'
        assert usd_cell.value == self.at_cost.number
        # Every cell after the at-cost amount must contain nothing but links.
        for position, link_cell in enumerate(remaining):
            anchors = link_cell.getElementsByType(odf.text.A)
            assert len(anchors) == len(link_cell.childNodes)
        # At least two such link cells must have been seen.
        assert position >= 1
|
|
|
|
|
|
# Rows check_aging_ods expects, by default, on the second sheet of the aging
# report (the one checked with a 30-day accrual window).
AGING_AP = [
    AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'),
    AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'),
    AgingRow.make_simple('2010-04-30', 'Vendor', 200, 'FIXME'),
    AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'),
    AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200',
                         orig_amount=[testutil.Amount(1000, 'EUR')]),
]

# Rows check_aging_ods expects, by default, on the first sheet of the aging
# report (the one checked with a 60-day accrual window).
AGING_AR = [
    AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'),
    AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500,
                         'rt://ticket/515/attachments/5150'),
]
|
|
|
|
class RTClient(testutil.RTClient):
    """``testutil.RTClient`` subclass carrying the ticket data these tests use."""
    # Maps a ticket id to a list of 4-tuples, one per attachment.
    # NOTE(review): the fields look like (attachment id, filename, MIME type,
    # size) -- confirm against testutil.RTClient.
    TICKET_DATA = {
        '40': [
            ('400', 'invoice feb.csv', 'text/csv', '40.4k'),
        ],
        '44': [
            ('440', 'invoice feb.csv', 'text/csv', '40.4k'),
        ],
        '490': [],
        '505': [],
        '510': [
            ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
            ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
            ('5105', 'payment.png', 'image/png', '51.5k'),
            ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
        ],
        '515': [],
        '520': [],
    }
|
|
|
|
|
|
@pytest.fixture
def accrual_postings():
    """Postings built from a fresh deep copy of the sample transactions."""
    # Copy first so a test that mutates entries cannot affect later tests.
    fresh_txns = copy.deepcopy(ACCRUAL_TXNS)
    return data.Posting.from_entries(fresh_txns)
|
|
|
|
def check_link_regexp(regexp, match_s, first_link_only=False):
    """Assert that ``regexp`` matches ``match_s`` only as a complete link.

    The pattern must be truthy, must match ``match_s`` alone and when another
    word follows it, and must not match when an extra character is glued to
    either end. When ``match_s`` follows another word, it must match unless
    ``first_link_only`` is true, in which case it must not.
    """
    assert regexp
    for haystack in (match_s, match_s + ' postlink'):
        assert re.search(regexp, haystack)
    for haystack in (match_s + '0', '1' + match_s):
        assert re.search(regexp, haystack) is None
    end_match = re.search(regexp, 'prelink ' + match_s)
    if not first_link_only:
        assert end_match
    else:
        assert end_match is None
|
|
|
|
def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
    """Select receivable/payable postings whose metadata ``key`` equals ``value``.

    The matching postings are produced lazily and handed to ``wrap_type``,
    whose result is returned.
    """
    def is_wanted(post):
        return (
            post.meta.get(key) == value
            and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
        )
    return wrap_type(filter(is_wanted, postings))
|
|
|
|
def find_row_by_text(row_source, want_text):
    """Return the first row whose first child has text ``want_text``.

    Rows without any children are skipped. Iteration stops at the match, so
    an iterator passed as ``row_source`` is left positioned just after it.
    Returns None when no row matches.
    """
    for candidate in row_source:
        try:
            if candidate.childNodes[0].text == want_text:
                return candidate
        except IndexError:
            continue
    return None
|
|
|
|
def check_aging_sheet(sheet, aging_rows, date, accrue_date):
    """Assert that one aging report sheet contains the expected rows and totals.

    ``sheet`` is an ODF table. ``aging_rows`` is the sequence of expected
    rows; nothing is checked when it is empty. ``date`` is the report date.
    ``accrue_date`` is either a date or an integer number of days to add to
    ``date``.

    Raises AssertionError when the header, the first expected row, or the
    totals rows cannot be found, or when any checked value differs.
    """
    if not aging_rows:
        return
    if isinstance(accrue_date, int):
        accrue_date = date + datetime.timedelta(days=accrue_date)
    # A single iterator is shared by every loop below, so each stage picks up
    # where the previous one stopped.
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
    for row in rows:
        if "Aging Report" in row.text:
            break
    else:
        raise AssertionError("Header row not found")
    assert f"Accrued by {accrue_date.isoformat()}" in row.text
    assert f"Unpaid by {date.isoformat()}" in row.text
    expect_rows = iter(aging_rows)
    first_date = aging_rows[0].date.isoformat()
    row0 = find_row_by_text(rows, first_date)
    # Fail with a clear message instead of passing None on to check_row_match.
    assert row0 is not None, f"No row found for {first_date}"
    next(expect_rows).check_row_match(row0)
    # NOTE(review): zip pulls from ``rows`` before ``expect_rows``, so the one
    # actual row after the last expected row is consumed and discarded here.
    # Confirm the sheet has a non-total row in that position.
    for actual, expected in zip(rows, expect_rows):
        expected.check_row_match(actual)
    for row in rows:
        if row.text.startswith("Total Aged Over "):
            break
    else:
        raise AssertionError("Totals rows not found")
    actual_sum = Decimal(row.childNodes[-1].value)
    # Add up the consecutive totals rows that follow the first one.
    for row in rows:
        if row.text.startswith("Total Aged Over "):
            actual_sum += Decimal(row.childNodes[-1].value)
        else:
            break
    assert actual_sum == sum(
        row.at_cost.number
        for row in aging_rows
        if row.date <= accrue_date
        and row.at_cost.number > 0
    )
|
|
|
|
def check_aging_ods(
        ods_file,
        date=None,
        recv_rows=AGING_AR,
        pay_rows=AGING_AP,
):
    """Load an aging report spreadsheet and check both of its sheets.

    ``ods_file`` is rewound and parsed as an OpenDocument file. It must hold
    exactly two sheets: the first is checked against ``recv_rows`` with a
    60-day accrual window, the second against ``pay_rows`` with a 30-day
    window. ``date`` defaults to today.
    """
    if date is None:
        date = datetime.date.today()
    ods_file.seek(0)
    document = odf.opendocument.load(ods_file)
    sheets = document.spreadsheet.getElementsByType(odf.table.Table)
    assert len(sheets) == 2
    recv_sheet, pay_sheet = sheets
    check_aging_sheet(recv_sheet, recv_rows, date, -60)
    check_aging_sheet(pay_sheet, pay_rows, date, -30)
|
|
|
|
@pytest.mark.parametrize('link_fmt', [
    '{}',
    'rt:{}',
    'rt://ticket/{}',
])
def test_search_term_parse_rt_shortcuts(link_fmt):
    """Each ticket-only shorthand parses to an ``rt-id`` search term."""
    search = link_fmt.format(220)
    key, regexp = accrual.SearchTerm.parse(search)
    assert key == 'rt-id'
    for link in ('rt:220', 'rt://ticket/220'):
        check_link_regexp(regexp, link, first_link_only=True)
|
|
|
|
@pytest.mark.parametrize('link_fmt', [
    '{}/{}',
    'rt:{}/{}',
    'rt://ticket/{}/attachments/{}',
])
def test_search_term_parse_invoice_shortcuts(link_fmt):
    """Each ticket/attachment shorthand parses to an ``invoice`` search term."""
    search = link_fmt.format(330, 660)
    key, regexp = accrual.SearchTerm.parse(search)
    assert key == 'invoice'
    for link in ('rt:330/660', 'rt://ticket/330/attachments/660'):
        check_link_regexp(regexp, link)
|
|
|
|
@pytest.mark.parametrize('key', [
    'approval',
    'contract',
    'invoice',
])
def test_search_term_parse_metadata_rt_shortcut(key):
    """A ``key=ticket/attachment`` term keeps its key and matches both link forms."""
    parsed_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420')
    assert parsed_key == key
    for link in ('rt:440/420', 'rt://ticket/440/attachments/420'):
        check_link_regexp(regexp, link)
|
|
|
|
@pytest.mark.parametrize('key', [
    None,
    'approval',
    'contract',
    'invoice',
])
def test_search_term_parse_repo_link(key):
    """A document name is searched under its key, or ``invoice`` when none is given."""
    document = '1234.pdf'
    search = document if key is None else f'{key}={document}'
    expect_key = 'invoice' if key is None else key
    actual_key, regexp = accrual.SearchTerm.parse(search)
    assert actual_key == expect_key
    check_link_regexp(regexp, document)
|
|
|
|
@pytest.mark.parametrize('search,unmatched', [
    ('1234.pdf', '1234_pdf'),
])
def test_search_term_parse_regexp_escaping(search, unmatched):
    """The parsed pattern must not match ``unmatched``."""
    _key, pattern = accrual.SearchTerm.parse(search)
    found = re.search(pattern, unmatched)
    assert found is None
|
|
|
|
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
        'Assets:Receivable:', 'Liabilities:Payable:',
    )),
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
    ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'),
    ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
    ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
     lambda post: post.meta['invoice'].startswith('rt:510/')),
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
])
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
    """Filtering by search terms keeps only postings that satisfy ``check_func``.

    With no narrowing expected, every accrual posting must come back.
    Otherwise the result must be a strict subset holding at least
    ``expect_count`` postings.
    """
    matched = list(accrual.filter_search(accrual_postings, search_terms))
    found = len(matched)
    if expect_count >= ACCRUALS_COUNT:
        assert found == ACCRUALS_COUNT
    else:
        assert ACCRUALS_COUNT > found >= expect_count
    for post in matched:
        assert check_func(post)
|
|
|
|
@pytest.mark.parametrize('arg,expected', [
    ('aging', accrual.AgingReport),
    ('balance', accrual.BalanceReport),
    ('outgoing', accrual.OutgoingReport),
    ('age', accrual.AgingReport),
    ('bal', accrual.BalanceReport),
    ('out', accrual.OutgoingReport),
    ('outgoings', accrual.OutgoingReport),
])
def test_report_type_by_name(arg, expected):
    """Report names and abbreviations resolve regardless of letter case."""
    for spelling in (arg.lower(), arg.title(), arg.upper()):
        assert accrual.ReportType.by_name(spelling).value is expected
|
|
|
|
@pytest.mark.parametrize('arg', [
    'unknown',
    'blance',
    'outgong',
])
def test_report_type_by_unknown_name(arg):
    """Unrecognized report names are rejected with ValueError."""
    lookup = accrual.ReportType.by_name
    # Raising ValueError helps argparse generate good messages.
    with pytest.raises(ValueError):
        lookup(arg)
|
|
|
|
@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_accrual_postings_consistent_account(acct_name):
    """Postings that all use one account report it as the group's account."""
    # Interpolate the account name so each parametrized case uses its own
    # invoice name.
    meta = {'invoice': f'{acct_name} invoice.pdf'}
    txn = testutil.Transaction(postings=[
        (acct_name, 50, meta),
        (acct_name, 25, meta),
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.account == acct_name
    assert related.accounts == {acct_name}
|
|
|
|
@pytest.mark.parametrize('meta_key,acct_name', testutil.combine_values(
    CONSISTENT_METADATA,
    ACCOUNTS,
))
def test_accrual_postings_consistent_metadata(meta_key, acct_name):
    """Metadata shared by every posting is exposed as a single value and a set."""
    meta_value = f'{meta_key}.pdf'
    meta = {
        meta_key: meta_value,
        'invoice': f'invoice with {meta_key}.pdf',
    }
    txn = testutil.Transaction(postings=[
        (acct_name, amount, meta) for amount in (70, 35)
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    # Attribute names use underscores where metadata keys use hyphens.
    singular = meta_key.replace('-', '_')
    plural = f'{singular}s'
    assert getattr(related, singular) == meta_value
    assert getattr(related, plural) == {meta_value}
|
|
|
|
def test_accrual_postings_entity():
    """Entities are split between accrued and paid sets."""
    legs = [(25, 'Accruee'), (-15, 'Payee15'), (-10, 'Payee10')]
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], amount, {'entity': entity})
        for amount, entity in legs
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.accrued_entities == {'Accruee'}
    assert related.paid_entities == {'Payee10', 'Payee15'}
|
|
|
|
def test_accrual_postings_entities():
    """``entities()`` yields the accrued entity first, then the paid ones."""
    legs = [(25, 'Accruee'), (-15, 'Payee15'), (-10, 'Payee10')]
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], amount, {'entity': entity})
        for amount, entity in legs
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    entity_iter = related.entities()
    first = next(entity_iter, None)
    assert first == 'Accruee'
    # The order of the remaining entities is not checked.
    assert set(entity_iter) == {'Payee10', 'Payee15'}
|
|
|
|
def test_accrual_postings_entities_no_duplicates():
    """An entity that both accrues and pays is yielded only once."""
    legs = [(25, 'Accruee'), (-15, 'Accruee'), (-10, 'Other')]
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], amount, {'entity': entity})
        for amount, entity in legs
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    entity_iter = related.entities()
    for expected in ('Accruee', 'Other'):
        assert next(entity_iter, None) == expected
    assert next(entity_iter, None) is None
|
|
|
|
def test_accrual_postings_inconsistent_account():
    """Postings spread over several accounts are flagged as inconsistent."""
    meta = {'invoice': 'invoice.pdf'}
    postings = []
    for index, acct_name in enumerate(ACCOUNTS):
        postings.append((acct_name, index, meta))
    txn = testutil.Transaction(postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.account is related.INCONSISTENT
    assert related.accounts == set(ACCOUNTS)
|
|
|
|
@pytest.mark.parametrize('meta_key,acct_name', testutil.combine_values(
    CONSISTENT_METADATA,
    ACCOUNTS,
))
def test_accrual_postings_inconsistent_metadata(meta_key, acct_name):
    """Metadata present on only some postings is flagged as inconsistent."""
    # Interpolate the key so each parametrized case uses its own invoice name.
    invoice = f'invoice with {meta_key}.pdf'
    meta_value = f'{meta_key}.pdf'
    txn = testutil.Transaction(postings=[
        (acct_name, 20, {'invoice': invoice, meta_key: meta_value}),
        (acct_name, 35, {'invoice': invoice}),
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    # Attribute names use underscores where metadata keys use hyphens.
    attr_name = meta_key.replace('-', '_')
    assert getattr(related, attr_name) is related.INCONSISTENT
    assert getattr(related, f'{attr_name}s') == {meta_value, None}
|
|
|
|
@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
    CONSISTENT_METADATA,
    ACCOUNTS,
))
def test_consistency_check_when_consistent(meta_key, account):
    """No inconsistencies are reported when both postings share their metadata."""
    meta = {
        'invoice': f'test-{meta_key}-invoice',
        meta_key: f'test-{meta_key}-value',
    }
    txn = testutil.Transaction(postings=[
        (account, amount, meta) for amount in (100, -100)
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    problems = list(related.report_inconsistencies())
    assert not problems
|
|
|
|
@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
    ['approval', 'entity', 'fx-rate', 'statement'],
    ACCOUNTS,
))
def test_consistency_check_ignored_metadata(meta_key, account):
    """Differences in these metadata keys are not reported as inconsistencies."""
    invoice = f'test-{meta_key}-invoice'
    legs = [(100, 'credit'), (-100, 'debit')]
    txn = testutil.Transaction(postings=[
        (account, amount, {'invoice': invoice, meta_key: label})
        for amount, label in legs
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    problems = list(related.report_inconsistencies())
    assert not problems
|
|
|
|
@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
    CONSISTENT_METADATA,
    ACCOUNTS,
))
def test_consistency_check_when_inconsistent(meta_key, account):
    """One error is reported per posting when a checked key differs."""
    invoice = f'test-{meta_key}-invoice'
    legs = [(100, 'credit'), (-100, 'debit')]
    txn = testutil.Transaction(postings=[
        (account, amount, {'invoice': invoice, meta_key: label, 'lineno': lineno})
        for lineno, (amount, label) in enumerate(legs, 1)
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    errors = list(related.report_inconsistencies())
    expect_msgs = [
        f'inconsistent {meta_key} for invoice {invoice}: credit',
        f'inconsistent {meta_key} for invoice {invoice}: debit',
    ]
    # zip_longest pads the shorter side with None, so a missing or extra
    # error makes an assertion below fail.
    pairs = itertools.zip_longest(errors, expect_msgs)
    for exp_lineno, (actual, exp_msg) in enumerate(pairs, 1):
        assert actual.message == exp_msg
        assert actual.entry is txn
        assert actual.source.get('lineno') == exp_lineno
|
|
|
|
def test_consistency_check_cost():
    """Postings of one invoice with different costs are each reported."""
    account = ACCOUNTS[0]
    invoice = 'test-cost-invoice'
    legs = [(100, '1.1251'), (-100, '1.125')]
    txn = testutil.Transaction(postings=[
        (account, amount, 'EUR', (rate, 'USD'), {'invoice': invoice, 'lineno': lineno})
        for lineno, (amount, rate) in enumerate(legs, 1)
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    errors = list(related.report_inconsistencies())
    # zip_longest pads the shorter side with None, so a missing or extra
    # error makes an assertion below fail.
    for post, err in itertools.zip_longest(txn.postings, errors):
        expect_msg = f'inconsistent cost for invoice {invoice}: {post.cost}'
        assert err.message == expect_msg
        assert err.entry is txn
        assert err.source.get('lineno') == post.meta['lineno']
|
|
|
|
def test_make_consistent_not_needed():
    """A group that differs only in minor metadata is returned unsplit."""
    invoice = 'Invoices/ConsistentDoc.pdf'
    main_meta = {
        'entity': 'ConsistentTest',
        'invoice': invoice,
    }
    other_meta = {key: f'{key}.pdf' for key in CONSISTENT_METADATA}
    # We intentionally make inconsistencies in "minor" metadata that shouldn't
    # split out the group.
    with_minor = {**main_meta, **other_meta}
    without_minor = {**main_meta}
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], 20, with_minor),
        (ACCOUNTS[0], 25, without_minor),
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = related.make_consistent()
    actual_key, actual_postings = next(groups)
    assert actual_key == invoice
    assert actual_postings is related
    assert next(groups, None) is None
|
|
|
|
@pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values(
    ACCOUNTS,
    ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES],
    itertools.count(1),
))
def test_make_consistent_bad_invoice(acct_name, invoice, day):
    """Postings with an unusable invoice value stay together in one group."""
    entity = f'BadInvoice{day}'
    txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=[
        (acct_name, amount, {'invoice': invoice, 'entity': entity})
        for amount in (10, 20, 30)
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    consistent = dict(related.make_consistent())
    assert len(consistent) == 1
    key, actual = next(iter(consistent.items()))
    assert acct_name in key
    if invoice:
        assert str(invoice) in key
    assert actual
    assert len(actual) == 3
    for act_post, exp_post in zip(actual, txn.postings):
        assert act_post.units == exp_post.units
        assert act_post.meta.get('invoice') == invoice
|
|
|
|
def test_make_consistent_across_accounts():
    """One invoice posted to several accounts is split into a group per account."""
    invoice = 'Invoices/CrossAccount.pdf'
    postings = []
    for acct_name in ACCOUNTS:
        postings.append(
            (acct_name, 100, {'invoice': invoice, 'entity': 'CrossAccount'}),
        )
    txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    consistent = dict(related.make_consistent())
    assert len(consistent) == len(ACCOUNTS)
    for key, group in consistent.items():
        assert len(group) == 1
        assert group.account in key
|
|
|
|
def test_make_consistent_both_invoice_and_account():
    """Postings with no metadata over several accounts split into a group per account."""
    postings = []
    for acct_name in ACCOUNTS:
        postings.append((acct_name, 150))
    txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    consistent = dict(related.make_consistent())
    assert len(consistent) == len(ACCOUNTS)
    for key, group in consistent.items():
        assert len(group) == 1
        assert group.account in key
|
|
|
|
@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_make_consistent_across_entity(acct_name):
    """Three postings of one invoice, each with its own entity, split three ways."""
    # Amounts are positive under Assets accounts and negative otherwise.
    sign = 1 if acct_name.startswith('Assets') else -1
    txn = testutil.Transaction(postings=[
        (acct_name, sign * n, {'invoice': 'Inv/1.pdf', 'entity': f'Entity{n}'})
        for n in (1, 2, 3)
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    consistent = dict(related.make_consistent())
    assert len(consistent) == 3
    for key, group in consistent.items():
        assert len(group) == 1
        assert len(group.accrued_entities) == 1
        first_entity = next(group.entities())
        assert first_entity in key
|
|
|
|
@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_make_consistent_entity_differs_accrual_payment(acct_name):
    """A group whose two opposite postings name different entities is not split."""
    invoice = 'Invoices/DifferPay.pdf'
    # Depending on the account, the order of the accrual and payment might
    # be swapped here, but that shouldn't matter.
    legs = [(125, 'Positive'), (-125, 'Negative')]
    txn = testutil.Transaction(postings=[
        (acct_name, amount, {'invoice': invoice, 'entity': entity})
        for amount, entity in legs
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = related.make_consistent()
    _key, only_group = next(groups)
    assert only_group is related
    assert next(groups, None) is None
|
|
|
|
def check_output(output, expect_patterns):
    """Rewind ``output`` and check its lines against ``expect_patterns``."""
    output.seek(0)
    lines = iter(output)
    testutil.check_lines_match(lines, expect_patterns)
|
|
|
|
def run_outgoing(invoice, postings, rt_client=None):
    """Run the outgoing report for one invoice and return its text buffer.

    ``postings`` may already be a ``core.RelatedPostings``; otherwise it is
    narrowed to the accrual postings of ``invoice``. A default ``RTClient``
    is created when ``rt_client`` is omitted.
    """
    client = RTClient() if rt_client is None else rt_client
    if isinstance(postings, core.RelatedPostings):
        related = postings
    else:
        related = accruals_by_meta(postings, invoice, wrap_type=accrual.AccrualPostings)
    output = io.StringIO()
    accrual.OutgoingReport(client, output).run({invoice: related})
    return output
|
|
|
|
@pytest.mark.parametrize('invoice,expected', [
    ('rt:505/5050', "Zero balance outstanding since 2010-05-05"),
    ('rt:510/5100', "Zero balance outstanding since 2010-05-10"),
    ('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"),
    ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",),
])
def test_balance_report(accrual_postings, invoice, expected, caplog):
    """The balance report prints the invoice and its outstanding balance without logging."""
    related = accruals_by_meta(
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
    )
    output = io.StringIO()
    accrual.BalanceReport(output).run({invoice: related})
    assert not caplog.records
    check_output(output, [invoice, expected])
|
|
|
|
def test_outgoing_report(accrual_postings, caplog):
    """The outgoing report for rt:510/6100 prints the approval header and entries."""
    invoice = 'rt:510/6100'
    output = run_outgoing(invoice, accrual_postings)
    # Drop the last nine characters of the default URL before building links.
    rt_url = RTClient.DEFAULT_URL[:-9]
    rt_id_url = r'\b' + re.escape(f'{rt_url}Ticket/Display.html?id=510') + r'\b'
    contract_url = (
        r'\b' + re.escape(f'{rt_url}Ticket/Attachment/4000/4000/contract.pdf') + r'\b'
    )
    assert not caplog.records
    expect_patterns = [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
        r'^TOTAL TO PAY: \$280\.00$',
        fr'^AGREEMENT: {contract_url}',
        r'^PAYMENT TO: Hon\. Mx\. 510$',
        r'^PAYMENT METHOD: payment method 510$',
        r'^BEANCOUNT ENTRIES:$',
        # For each transaction, check for the date line, a metadata, and the
        # Expenses posting.
        r'^\s*2010-06-10\s',
        fr'^\s+rt-id: "{rt_id_url}"$',
        r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
        r'^\s*2010-06-12\s',
        fr'^\s+contract: "{contract_url}"$',
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
    ]
    check_output(output, expect_patterns)
|
|
|
|
def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog):
    """With ``want_cfs=False`` the report leaves the custom-field lines blank."""
    invoice = 'rt:510/6100'
    client = RTClient(want_cfs=False)
    output = run_outgoing(invoice, accrual_postings, client)
    assert not caplog.records
    expect_patterns = [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: <mx510@example\.org>$',
        r'^PAYMENT TO:\s*$',
        r'^PAYMENT METHOD:\s*$',
    ]
    check_output(output, expect_patterns)
|
|
|
|
def test_outgoing_report_fx_amounts(accrual_postings, caplog):
    """A foreign-currency invoice shows the original amount and its USD value."""
    invoice = 'rt:520/5200'
    output = run_outgoing(invoice, accrual_postings)
    assert not caplog.records
    check_output(output, [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 520 <mx520@example\.org>$',
        # Both decimal points are escaped so only a literal "." matches.
        r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100\.00\)$',
    ])
|
|
|
|
def test_outgoing_report_without_rt_id(accrual_postings, caplog):
    """For this invoice the outgoing report logs a message and writes nothing."""
    invoice = 'rt://ticket/515/attachments/5150'
    output = run_outgoing(invoice, accrual_postings)
    assert caplog.records
    first_record = caplog.records[0]
    expect_prefix = (
        f"can't generate outgoings report for {invoice} because no RT ticket available:"
    )
    assert first_record.message.startswith(expect_prefix)
    assert not output.getvalue()
|
|
|
|
def run_aging_report(postings, today=None):
    """Run the aging report over ``postings`` and return its bytes buffer.

    Only postings under the receivable and payable accounts are used. They
    are grouped by invoice, and each group is passed through
    ``make_consistent`` before the report runs. ``today`` defaults to the
    current date.
    """
    if today is None:
        today = datetime.date.today()
    accrual_posts = (
        post for post in postings
        if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
    )
    groups = {}
    for _, related in accrual.AccrualPostings.group_by_meta(accrual_posts, 'invoice'):
        # A later pair with a repeated key replaces the earlier one.
        groups.update(related.make_consistent())
    output = io.BytesIO()
    report = accrual.AgingReport(RTClient(), output, today)
    report.run(groups)
    return output
|
|
|
|
def test_aging_report(accrual_postings):
    """The aging report for all sample postings has the default expected rows."""
    ods_buffer = run_aging_report(accrual_postings)
    check_aging_ods(ods_buffer)
|
|
|
|
@pytest.mark.parametrize('date,recv_end,pay_end', [
    # Both these dates are chosen for their off-by-one potential:
    # the first is exactly 30 days after the 2010-06-10 payable;
    # the second is exactly 60 days after the 2010-05-15 receivable.
    (datetime.date(2010, 7, 10), 1, 4),
    (datetime.date(2010, 7, 14), 2, 4),
])
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
    """Only the leading expected rows are checked for a given report date."""
    ods_buffer = run_aging_report(accrual_postings, date)
    check_aging_ods(
        ods_buffer,
        date,
        AGING_AR[:recv_end],
        AGING_AP[:pay_end],
    )
|
|
|
|
def test_aging_report_entity_consistency(accrual_postings):
    """The negative postings of rt:480 are reported as one row per entity."""
    negative_480 = (
        post for post in accrual_postings
        if post.meta.get('rt-id') == 'rt:480'
        and post.units.number < 0
    )
    ods_buffer = run_aging_report(negative_480)
    expect_pay = [
        AgingRow.make_simple('2010-04-15', entity, 125, 'rt:480/4800')
        for entity in ('MultiPartyA', 'MultiPartyB')
    ]
    check_aging_ods(ods_buffer, None, [], expect_pay)
|
|
|
|
def run_main(arglist, config=None):
    """Call ``accrual.main`` and return ``(retcode, output, errors)``.

    ``output`` and ``errors`` are new text buffers. When ``config`` is
    omitted, a test configuration pointing at the sample books with a
    default ``RTClient`` is used.
    """
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/accruals.beancount'),
            rt_client=RTClient(),
        )
    output, errors = io.StringIO(), io.StringIO()
    retcode = accrual.main(arglist, output, errors, config)
    return retcode, output, errors
|
|
|
|
def check_main_fails(arglist, config, error_flags, error_patterns):
    """Assert that ``accrual.main`` fails with the expected flags and messages.

    The return code must exceed 16, the part above 16 must share a bit with
    ``error_flags``, the error stream must match ``error_patterns``, and
    nothing may be written to the output stream.
    """
    retcode, output, errors = run_main(arglist, config)
    assert retcode > 16
    raised_flags = retcode - 16
    assert raised_flags & error_flags
    check_output(errors, error_patterns)
    assert not output.getvalue()
|
|
|
|
@pytest.mark.parametrize('arglist', [
    ['--report-type=balance', 'entity=EarlyBird'],
    ['--report-type=outgoing', 'entity=EarlyBird'],
])
def test_output_excludes_payments(arglist):
    """No output line may begin with an ``rt:4N`` link (N being one digit)."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    output.seek(0)
    payment_link = re.compile(r'\brt:4\d\b')
    # NOTE(review): match() only tests the start of each line. If the intent
    # is to reject these links anywhere in a line, search() would be needed --
    # confirm before changing.
    for line in output:
        assert not payment_link.match(line)
|
|
|
|
@pytest.mark.parametrize('arglist,expect_invoice', [
    (['40'], 'rt:40/400'),
    (['44/440'], 'rt:44/440'),
])
def test_output_payments_when_only_match(arglist, expect_invoice):
    """Searching for these tickets reports the invoice and an outstanding line."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    invoice_line = rf'^{re.escape(expect_invoice)}:$'
    check_output(output, [invoice_line, r' outstanding since '])
|
|
|
|
@pytest.mark.parametrize('arglist', [
    ['510'],
    ['510/6100'],
    ['entity=Lawyer'],
])
def test_main_outgoing_report(arglist):
    """Each of these searches yields the outgoing report for the 280.00 payment."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, [
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
        r'^TOTAL TO PAY: \$280\.00$',
        r'^\s*2010-06-12\s',
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
    ])
|
|
|
|
@pytest.mark.parametrize('arglist', [
    ['-t', 'balance'],
    ['515/5150'],
    ['entity=MatchingProgram'],
])
def test_main_balance_report(arglist):
    """Each of these invocations prints the balance of ticket 515's invoice."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    expect_patterns = [
        r'\brt://ticket/515/attachments/5150:$',
        r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
    ]
    check_output(output, expect_patterns)
|
|
|
|
@pytest.mark.parametrize('arglist', [
    [],
    ['-t', 'aging', 'entity=Lawyer'],
])
def test_main_aging_report(tmp_path, arglist):
    """``main`` writes the aging report to the file named by ``--output-file``.

    With search arguments, only the expected rows for the Lawyer entity are
    checked; with none, all default rows are.
    """
    if arglist:
        recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity]
        pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity]
    else:
        recv_rows = AGING_AR
        pay_rows = AGING_AP
    output_path = tmp_path / 'AgingReport.ods'
    # Build a new list instead of inserting into ``arglist``: the parametrize
    # lists are module-level objects handed to the test as-is, so mutating
    # one would leak the temporary path into any later use of it.
    run_args = [f'--output-file={output_path}', *arglist]
    retcode, output, errors = run_main(run_args)
    assert not errors.getvalue()
    assert retcode == 0
    assert not output.getvalue()
    with output_path.open('rb') as ods_file:
        check_aging_ods(ods_file, None, recv_rows, pay_rows)
|
|
|
|
def test_main_no_books():
    """A configuration without books fails with a "no books to load" error."""
    empty_config = testutil.TestConfig()
    expect_flags = 1 | 8
    expect_patterns = [
        r':1: +no books to load in configuration\b',
    ]
    check_main_fails([], empty_config, expect_flags, expect_patterns)
|
|
|
|
@pytest.mark.parametrize('arglist', [
    ['499'],
    ['505/99999'],
    ['entity=NonExistent'],
])
def test_main_no_matches(arglist):
    """A search that matches nothing fails with a warning."""
    expect_patterns = [
        r': WARNING: no matching entries found to report$',
    ]
    check_main_fails(arglist, None, 8, expect_patterns)
|
|
|
|
def test_main_no_rt():
    """Requesting the outgoing report without an RT client fails with an error."""
    books_path = testutil.test_path('books/accruals.beancount')
    config = testutil.TestConfig(books_path=books_path)
    expect_patterns = [
        r': ERROR: unable to generate outgoing report: RT client is required\b',
    ]
    check_main_fails(['-t', 'out'], config, 4, expect_patterns)
|