b25bea0fc6
Unearned Income is more interesting.
284 lines
9.8 KiB
Python
284 lines
9.8 KiB
Python
"""test_reports_fund.py - Unit tests for fund report"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import copy
|
|
import datetime
|
|
import io
|
|
import itertools
|
|
|
|
import pytest
|
|
|
|
from . import testutil
|
|
|
|
import babel.numbers
|
|
import odf.opendocument
|
|
import odf.table
|
|
|
|
from beancount import loader as bc_loader
|
|
from conservancy_beancount import data
|
|
from conservancy_beancount.reports import core
|
|
from conservancy_beancount.reports import fund
|
|
|
|
from decimal import Decimal
|
|
|
|
_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
|
|
START_DATE = datetime.date(2018, 3, 1)
|
|
MID_DATE = datetime.date(2019, 3, 1)
|
|
STOP_DATE = datetime.date(2020, 3, 1)
|
|
|
|
EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')
|
|
|
|
OPENING_BALANCES = {
|
|
'Alpha': 3000,
|
|
'Bravo': 2000,
|
|
'Charlie': 1000,
|
|
'Conservancy': 4000,
|
|
'Delta': 0,
|
|
}
|
|
|
|
BALANCES_BY_YEAR = {
|
|
('Conservancy', 2018): [
|
|
('Income:Other', 40),
|
|
('Expenses:Other', -4),
|
|
('Assets:Receivable:Accounts', 40),
|
|
('Liabilities:Payable:Accounts', 4),
|
|
],
|
|
('Conservancy', 2019): [
|
|
('Income:Other', 42),
|
|
('Expenses:Other', Decimal('-4.20')),
|
|
('Equity:Realized:CurrencyConversion', Decimal('6.20')),
|
|
('Assets:Receivable:Accounts', -40),
|
|
('Liabilities:Payable:Accounts', -4),
|
|
],
|
|
('Alpha', 2018): [
|
|
('Income:Other', 60),
|
|
('Liabilities:UnearnedIncome', 30),
|
|
('Assets:Prepaid:Expenses', 20),
|
|
],
|
|
('Alpha', 2019): [
|
|
('Income:Other', 30),
|
|
('Expenses:Other', -26),
|
|
('Assets:Prepaid:Expenses', -20),
|
|
('Liabilities:UnearnedIncome', -30),
|
|
],
|
|
('Bravo', 2018): [
|
|
('Expenses:Other', -20),
|
|
],
|
|
('Bravo', 2019): [
|
|
('Income:Other', 200),
|
|
],
|
|
('Delta', 2018): [
|
|
('Income:Other', Decimal('.40')),
|
|
],
|
|
('Delta', 2019): [
|
|
('Income:Other', Decimal('4.60')),
|
|
],
|
|
}
|
|
|
|
@pytest.fixture
|
|
def fund_entries():
|
|
return copy.deepcopy(_ledger_load[0])
|
|
|
|
def split_text_lines(output):
|
|
for line in output:
|
|
account, amount = line.rsplit(None, 1)
|
|
yield account.strip(), amount
|
|
|
|
def format_amount(amount, currency='USD'):
|
|
return babel.numbers.format_currency(
|
|
amount, currency, format_type='accounting',
|
|
)
|
|
|
|
def check_text_balances(actual, expected, *expect_accounts):
|
|
balance = Decimal()
|
|
for expect_account in expect_accounts:
|
|
expect_amount = expected[expect_account]
|
|
if expect_amount:
|
|
actual_account, actual_amount = next(actual)
|
|
assert actual_account == expect_account
|
|
assert actual_amount == format_amount(expect_amount)
|
|
balance += expect_amount
|
|
return balance
|
|
|
|
def check_text_report(output, project, start_date, stop_date):
|
|
_, _, project = project.rpartition('=')
|
|
balance_amount = Decimal(OPENING_BALANCES[project])
|
|
expected = collections.defaultdict(Decimal)
|
|
for year in range(2018, stop_date.year):
|
|
try:
|
|
amounts = BALANCES_BY_YEAR[(project, year)]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
for account, amount in amounts:
|
|
if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
|
|
balance_amount += amount
|
|
else:
|
|
expected[account] += amount
|
|
actual = split_text_lines(output)
|
|
next(actual); next(actual) # Discard headers
|
|
open_acct, open_amt = next(actual)
|
|
assert open_acct == "{} balance as of {}".format(
|
|
project, start_date.isoformat(),
|
|
)
|
|
assert open_amt == format_amount(balance_amount)
|
|
balance_amount += check_text_balances(
|
|
actual, expected,
|
|
'Equity:Realized:CurrencyConversion',
|
|
'Income:Other',
|
|
'Expenses:Other',
|
|
)
|
|
end_acct, end_amt = next(actual)
|
|
assert end_acct == "{} balance as of {}".format(
|
|
project, stop_date.isoformat(),
|
|
)
|
|
assert end_amt == format_amount(balance_amount)
|
|
balance_amount += check_text_balances(
|
|
actual, expected,
|
|
'Assets:Receivable:Accounts',
|
|
'Assets:Prepaid:Expenses',
|
|
'Liabilities:Payable:Accounts',
|
|
'Liabilities:UnearnedIncome',
|
|
)
|
|
assert next(actual, None) is None
|
|
|
|
def check_cell_balance(cell, balance):
|
|
if balance:
|
|
assert cell.value == balance
|
|
else:
|
|
assert not cell.value
|
|
|
|
def check_ods_sheet(sheet, account_balances, *, full):
|
|
if full:
|
|
account_bals = account_balances.copy()
|
|
account_bals['Unrestricted'] = account_bals.pop('Conservancy')
|
|
else:
|
|
account_bals = {
|
|
key: balances
|
|
for key, balances in account_balances.items()
|
|
if key != 'Conservancy' and any(v >= .5 for v in balances.values())
|
|
}
|
|
totals = {key: Decimal() for key in
|
|
['opening', 'Income', 'Expenses', 'Equity:Realized']}
|
|
for fund, balances in account_bals.items():
|
|
for key in totals:
|
|
totals[key] += balances[key]
|
|
account_bals[''] = totals
|
|
for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
|
|
cells = iter(testutil.ODSCell.from_row(row))
|
|
try:
|
|
fund = next(cells).firstChild.text
|
|
except (AttributeError, StopIteration):
|
|
continue
|
|
try:
|
|
balances = account_bals.pop(fund)
|
|
except KeyError:
|
|
pytest.fail(f"report included unexpected fund {fund}")
|
|
check_cell_balance(next(cells), balances['opening'])
|
|
check_cell_balance(next(cells), balances['Income'])
|
|
check_cell_balance(next(cells), -balances['Expenses'])
|
|
if full:
|
|
check_cell_balance(next(cells), balances['Equity:Realized'])
|
|
check_cell_balance(next(cells), sum(balances[key] for key in [
|
|
'opening', 'Income', 'Expenses', 'Equity:Realized',
|
|
]))
|
|
if full:
|
|
check_cell_balance(next(cells), balances['Assets:Receivable'])
|
|
check_cell_balance(next(cells), balances['Assets:Prepaid'])
|
|
check_cell_balance(next(cells), balances['Liabilities'])
|
|
check_cell_balance(next(cells), balances['Liabilities:Payable'])
|
|
assert next(cells, None) is None
|
|
assert not account_bals, "did not see all funds in report"
|
|
|
|
def check_ods_report(ods, start_date, stop_date):
|
|
account_bals = collections.OrderedDict((key, {
|
|
'opening': Decimal(amount),
|
|
'Income': Decimal(0),
|
|
'Expenses': Decimal(0),
|
|
'Equity:Realized': Decimal(0),
|
|
'Assets:Receivable': Decimal(0),
|
|
'Assets:Prepaid': Decimal(0),
|
|
'Liabilities:Payable': Decimal(0),
|
|
'Liabilities': Decimal(0), # UnearnedIncome
|
|
}) for key, amount in sorted(OPENING_BALANCES.items()))
|
|
for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
|
|
try:
|
|
amounts = BALANCES_BY_YEAR[(fund, year)]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
for account, amount in amounts:
|
|
if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
|
|
acct_key = 'opening'
|
|
else:
|
|
acct_key, _, _ = account.rpartition(':')
|
|
account_bals[fund][acct_key] += amount
|
|
sheets = iter(ods.getElementsByType(odf.table.Table))
|
|
check_ods_sheet(next(sheets), account_bals, full=False)
|
|
check_ods_sheet(next(sheets), account_bals, full=True)
|
|
assert next(sheets, None) is None, "found unexpected sheet"
|
|
|
|
def run_main(out_type, arglist, config=None):
|
|
if config is None:
|
|
config = testutil.TestConfig(
|
|
books_path=testutil.test_path('books/fund.beancount'),
|
|
)
|
|
arglist.insert(0, '--output-file=-')
|
|
output = out_type()
|
|
errors = io.StringIO()
|
|
retcode = fund.main(arglist, output, errors, config)
|
|
output.seek(0)
|
|
return retcode, output, errors
|
|
|
|
@pytest.mark.parametrize('project,start_date,stop_date', [
|
|
('Conservancy', START_DATE, STOP_DATE),
|
|
('project=Conservancy', MID_DATE, STOP_DATE),
|
|
('Conservancy', START_DATE, MID_DATE),
|
|
('Alpha', START_DATE, STOP_DATE),
|
|
('project=Alpha', MID_DATE, STOP_DATE),
|
|
('Alpha', START_DATE, MID_DATE),
|
|
('Bravo', START_DATE, STOP_DATE),
|
|
('project=Bravo', MID_DATE, STOP_DATE),
|
|
('Bravo', START_DATE, MID_DATE),
|
|
('project=Charlie', START_DATE, STOP_DATE),
|
|
])
|
|
def test_text_report(project, start_date, stop_date):
|
|
retcode, output, errors = run_main(io.StringIO, [
|
|
'-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
|
|
])
|
|
assert not errors.getvalue()
|
|
assert retcode == 0
|
|
check_text_report(output, project, start_date, stop_date)
|
|
|
|
@pytest.mark.parametrize('start_date,stop_date', [
|
|
(START_DATE, STOP_DATE),
|
|
(MID_DATE, STOP_DATE),
|
|
(START_DATE, MID_DATE),
|
|
])
|
|
def test_ods_report(start_date, stop_date):
|
|
retcode, output, errors = run_main(io.BytesIO, [
|
|
'--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
|
|
])
|
|
assert not errors.getvalue()
|
|
assert retcode == 0
|
|
ods = odf.opendocument.load(output)
|
|
check_ods_report(ods, start_date, stop_date)
|
|
|
|
def test_main_no_postings(caplog):
|
|
retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
|
|
assert retcode == 24
|
|
assert any(log.levelname == 'WARNING' for log in caplog.records)
|