conservancy_beancount/tests/test_reports_fund.py

278 lines
9.6 KiB
Python

"""test_reports_fund.py - Unit tests for fund report"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import collections
import copy
import datetime
import io
import itertools
import pytest
from . import testutil
import babel.numbers
import odf.opendocument
import odf.table
from beancount import loader as bc_loader
from conservancy_beancount import data
from conservancy_beancount.reports import core
from conservancy_beancount.reports import fund
from decimal import Decimal
# Load the fund test ledger once, at import time. Tests that need the parsed
# data take a deep copy of it through the fund_entries fixture.
_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
# Period boundaries that the tests pass to the report as begin/end dates.
START_DATE = datetime.date(2018, 3, 1)
MID_DATE = datetime.date(2019, 3, 1)
STOP_DATE = datetime.date(2020, 3, 1)
# Account-name prefixes, used with str.startswith() by the check helpers:
# postings to these accounts from a year before the report's start year are
# counted in the opening balance rather than as activity in the period.
EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')
# Expected balance of each fund before any of the amounts listed in
# BALANCES_BY_YEAR are applied. The check helpers convert these to Decimal.
# NOTE(review): presumably USD, matching format_amount's default currency.
OPENING_BALANCES = {
    'Alpha': 3000,
    'Bravo': 2000,
    'Charlie': 1000,
    'Conservancy': 4000,
    'Delta': 0,
}
# Expected activity, keyed by (fund name, year). Each value is a list of
# (account name, amount) pairs. The check helpers walk the years from 2018 up
# to, but not including, the report's stop year, and skip any (fund, year)
# combination that has no entry here.
BALANCES_BY_YEAR = {
    ('Conservancy', 2018): [
        ('Income:Other', 40),
        ('Expenses:Other', -4),
        ('Assets:Receivable:Accounts', 40),
        ('Liabilities:Payable:Accounts', 4),
    ],
    ('Conservancy', 2019): [
        ('Income:Other', 42),
        ('Expenses:Other', Decimal('-4.20')),
        ('Equity:Realized:CurrencyConversion', Decimal('6.20')),
        ('Assets:Receivable:Accounts', -40),
        ('Liabilities:Payable:Accounts', -4),
    ],
    ('Alpha', 2018): [
        ('Income:Other', 60),
        ('Liabilities:UnearnedIncome', 30),
        ('Assets:Prepaid:Expenses', 20),
    ],
    ('Alpha', 2019): [
        ('Income:Other', 30),
        ('Expenses:Other', -26),
        ('Assets:Prepaid:Expenses', -20),
        ('Liabilities:UnearnedIncome', -30),
    ],
    ('Bravo', 2018): [
        ('Expenses:Other', -20),
    ],
    ('Bravo', 2019): [
        ('Income:Other', 200),
    ],
    ('Delta', 2018): [
        ('Income:Other', Decimal('.40')),
    ],
    ('Delta', 2019): [
        ('Income:Other', Decimal('4.60')),
    ],
}
@pytest.fixture
def fund_entries():
    """Return a deep copy of the first element of the module-level ledger load.

    Handing out a copy keeps whatever a test does to its data from reaching
    the shared load result.
    """
    loaded_entries = _ledger_load[0]
    return copy.deepcopy(loaded_entries)
def split_text_lines(output):
    """Yield an ``(account, amount)`` pair of strings for each line in output.

    Every line is split once, at its last run of whitespace. The trailing
    token is yielded as the amount; the text before it, stripped of
    surrounding whitespace, is yielded as the account label. A line with
    fewer than two whitespace-separated tokens raises ValueError.
    """
    for text_line in output:
        label, figure = text_line.rsplit(maxsplit=1)
        yield label.strip(), figure
def format_amount(amount, currency='USD'):
    """Return amount rendered by Babel as currency, in the accounting format."""
    style = {'format_type': 'accounting'}
    return babel.numbers.format_currency(amount, currency, **style)
def check_text_balances(actual, expected, *expect_accounts):
    """Check consecutive report lines against expected account balances.

    actual is an iterator of ``(account, amount text)`` pairs and expected
    maps account names to amounts. The accounts are checked in the order
    given. An account whose expected amount is falsy consumes nothing from
    actual; every other account must match the next pair exactly, with the
    amount compared in its format_amount() rendering.

    Returns the Decimal sum of the expected amounts that were checked.
    """
    total = Decimal()
    for account_name in expect_accounts:
        amount = expected[account_name]
        if not amount:
            # Nothing is compared for this account, so no line is consumed.
            continue
        seen_account, seen_amount = next(actual)
        assert seen_account == account_name
        assert seen_amount == format_amount(amount)
        total += amount
    return total
def check_text_report(output, project, start_date, stop_date):
    """Assert that a text fund report matches the expected fixture balances.

    output is an iterable of report lines. project is the fund name, with
    or without a leading ``key=`` part (only the text after the last ``=``
    is used). start_date and stop_date are the dates the report was run for.

    After two header lines the report must show, in order: the opening
    balance line, the equity-side accounts that have a nonzero expected
    amount, the closing balance line, and then the asset and liability
    accounts that have a nonzero expected amount. Nothing may follow.
    """
    project = project.rpartition('=')[2]
    running = Decimal(OPENING_BALANCES[project])
    expected = collections.defaultdict(Decimal)
    first_year = start_date.year
    for year in range(2018, stop_date.year):
        # A (project, year) pair without fixture data contributes nothing.
        for account, amount in BALANCES_BY_YEAR.get((project, year), ()):
            before_period = year < first_year
            if before_period and account.startswith(EQUITY_ROOT_ACCOUNTS):
                # Earlier equity-side activity is already part of the
                # opening balance by the time the report starts.
                running += amount
            else:
                expected[account] += amount

    rows = split_text_lines(output)
    # Discard headers
    next(rows)
    next(rows)

    title = "{} balance as of {}"
    open_acct, open_amt = next(rows)
    assert open_acct == title.format(project, start_date.isoformat())
    assert open_amt == format_amount(running)

    equity_accounts = (
        'Equity:Realized:CurrencyConversion',
        'Income:Other',
        'Expenses:Other',
    )
    running += check_text_balances(rows, expected, *equity_accounts)

    end_acct, end_amt = next(rows)
    assert end_acct == title.format(project, stop_date.isoformat())
    assert end_amt == format_amount(running)

    other_accounts = (
        'Assets:Receivable:Accounts',
        'Assets:Prepaid:Expenses',
        'Liabilities:Payable:Accounts',
        'Liabilities:UnearnedIncome',
    )
    # Only the per-line assertions matter here; the returned sum is not used.
    check_text_balances(rows, expected, *other_accounts)
    assert next(rows, None) is None
def check_cell_balance(cell, balance):
    """Assert that a spreadsheet cell holds the expected balance.

    A falsy balance only requires the cell's value to be falsy as well;
    any other balance must equal the cell's value.
    """
    if not balance:
        assert not cell.value
        return
    assert cell.value == balance
def check_ods_sheet(sheet, account_balances, *, full):
    """Assert that one sheet of the ODS fund report shows the expected funds.

    account_balances maps each fund name to a dict of its expected column
    balances; it is not modified. With full true, every fund is expected and
    'Conservancy' must appear under the name 'Unrestricted'. Otherwise
    'Conservancy' is left out, and so is any fund whose balances are all
    below .5.

    Rows are read from the fifth table row onward, and rows whose first cell
    has no text are skipped. Each remaining row must name a fund that is
    still expected and carry exactly the expected cells. Every expected fund
    must have been seen by the end of the sheet.
    """
    if full:
        pending = account_balances.copy()
        pending['Unrestricted'] = pending.pop('Conservancy')
    else:
        pending = {}
        for key, balances in account_balances.items():
            if key != 'Conservancy' and any(v >= .5 for v in balances.values()):
                pending[key] = balances

    ending_keys = ['opening', 'Income', 'Expenses', 'Equity:Realized']
    full_only_keys = (
        'Assets:Receivable',
        'Assets:Prepaid',
        'Liabilities:Payable',
        'Liabilities',
    )
    table_rows = sheet.getElementsByType(odf.table.TableRow)
    for row in itertools.islice(table_rows, 4, None):
        cells = iter(testutil.ODSCell.from_row(row))
        try:
            fund = next(cells).firstChild.text
        except (AttributeError, StopIteration):
            # No cells at all, or a first cell without text: not a fund row.
            continue
        try:
            balances = pending.pop(fund)
        except KeyError:
            pytest.fail(f"report included unexpected fund {fund}")
        for key in ('opening', 'Income'):
            check_cell_balance(next(cells), balances[key])
        # The expenses cell is compared against the negated expected amount.
        check_cell_balance(next(cells), -balances['Expenses'])
        if full:
            check_cell_balance(next(cells), balances['Equity:Realized'])
        ending = sum(balances[key] for key in ending_keys)
        check_cell_balance(next(cells), ending)
        if full:
            for key in full_only_keys:
                check_cell_balance(next(cells), balances[key])
        assert next(cells, None) is None
    assert not pending, "did not see all funds in report"
def check_ods_report(ods, start_date, stop_date):
    """Assert that an ODS fund report matches the expected fixture balances.

    Expected per-fund column balances are built from OPENING_BALANCES and
    BALANCES_BY_YEAR for the years from 2018 up to, but not including,
    stop_date.year. The document must then contain exactly two sheets: the
    first is checked with ``full=False`` and the second with ``full=True``.
    """
    zeroed_columns = (
        'Income',
        'Expenses',
        'Equity:Realized',
        'Assets:Receivable',
        'Assets:Prepaid',
        'Liabilities:Payable',
        'Liabilities',  # UnearnedIncome
    )
    account_bals = collections.OrderedDict()
    for fund_name, opening in sorted(OPENING_BALANCES.items()):
        columns = {'opening': Decimal(opening)}
        columns.update((name, Decimal(0)) for name in zeroed_columns)
        account_bals[fund_name] = columns

    first_year = start_date.year
    for fund_name, columns in account_bals.items():
        for year in range(2018, stop_date.year):
            # A (fund, year) pair without fixture data contributes nothing.
            for account, amount in BALANCES_BY_YEAR.get((fund_name, year), ()):
                if year < first_year and account.startswith(EQUITY_ROOT_ACCOUNTS):
                    # Equity-side activity from before the report's start
                    # year counts toward the opening balance.
                    column = 'opening'
                else:
                    # Otherwise the column key is the account name minus its
                    # last colon-separated component.
                    column = account.rpartition(':')[0]
                columns[column] += amount

    sheets = iter(ods.getElementsByType(odf.table.Table))
    check_ods_sheet(next(sheets), account_bals, full=False)
    check_ods_sheet(next(sheets), account_bals, full=True)
    assert next(sheets, None) is None, "found unexpected sheet"
def run_main(out_type, arglist, config=None):
    """Run the fund report's main() and return what it produced.

    out_type is called with no arguments to create the output stream
    (io.StringIO and io.BytesIO are the types used in this file). arglist
    holds the command-line arguments; ``--output-file=-`` is always passed
    ahead of them. When config is None, a test configuration pointing at the
    fund test ledger is used.

    Returns a ``(retcode, output, errors)`` tuple, where output has been
    rewound to its start and errors is the StringIO given to main().
    """
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/fund.beancount'),
        )
    # Build a fresh list rather than inserting into the caller's, so a list
    # that is passed in (and possibly reused) is left untouched.
    full_arglist = ['--output-file=-', *arglist]
    output = out_type()
    errors = io.StringIO()
    retcode = fund.main(full_arglist, output, errors, config)
    output.seek(0)
    return retcode, output, errors
@pytest.mark.parametrize('project,start_date,stop_date', [
    ('Conservancy', START_DATE, STOP_DATE),
    ('project=Conservancy', MID_DATE, STOP_DATE),
    ('Conservancy', START_DATE, MID_DATE),
    ('Alpha', START_DATE, STOP_DATE),
    ('project=Alpha', MID_DATE, STOP_DATE),
    ('Alpha', START_DATE, MID_DATE),
    ('Bravo', START_DATE, STOP_DATE),
    ('project=Bravo', MID_DATE, STOP_DATE),
    ('Bravo', START_DATE, MID_DATE),
    ('project=Charlie', START_DATE, STOP_DATE),
])
def test_text_report(project, start_date, stop_date):
    """The text report for one project succeeds and shows expected balances."""
    arglist = ['-b', start_date.isoformat(), '-e', stop_date.isoformat(), project]
    retcode, output, errors = run_main(io.StringIO, arglist)
    assert not errors.getvalue()
    assert retcode == 0
    check_text_report(output, project, start_date, stop_date)
@pytest.mark.parametrize('start_date,stop_date', [
    (START_DATE, STOP_DATE),
    (MID_DATE, STOP_DATE),
    (START_DATE, MID_DATE),
])
def test_ods_report(start_date, stop_date):
    """The ODS report, run without a project argument, shows expected balances."""
    arglist = ['--begin', start_date.isoformat(), '--end', stop_date.isoformat()]
    retcode, output, errors = run_main(io.BytesIO, arglist)
    assert not errors.getvalue()
    assert retcode == 0
    document = odf.opendocument.load(output)
    check_ods_report(document, start_date, stop_date)
def test_main_no_postings(caplog):
    """A project name that matches nothing exits with 24 and logs a warning."""
    retcode, _output, _errors = run_main(io.StringIO, ['NonexistentProject'])
    assert retcode == 24
    logged_levels = [record.levelname for record in caplog.records]
    assert 'WARNING' in logged_levels