"""test_reports_fund.py - Unit tests for fund report""" # Copyright © 2020 Brett Smith # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import collections import copy import datetime import io import itertools import pytest from . import testutil import babel.numbers import odf.opendocument import odf.table from beancount import loader as bc_loader from conservancy_beancount import data from conservancy_beancount.reports import core from conservancy_beancount.reports import fund from decimal import Decimal _ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount')) START_DATE = datetime.date(2018, 3, 1) MID_DATE = datetime.date(2019, 3, 1) STOP_DATE = datetime.date(2020, 3, 1) OPENING_BALANCES = { 'Alpha': 3000, 'Bravo': 2000, 'Charlie': 1000, 'Conservancy': 4000, } BALANCES_BY_YEAR = { ('Conservancy', 2018): [ ('Income:Other', 40), ('Expenses:Other', -4), ], ('Conservancy', 2019): [ ('Income:Other', 42), ('Expenses:Other', Decimal('-4.20')), ('Equity:Realized:CurrencyConversion', Decimal('6.20')), ], ('Alpha', 2018): [ ('Income:Other', 60), ], ('Alpha', 2019): [ ('Income:Other', 30), ('Expenses:Other', -26), ], ('Bravo', 2018): [ ('Expenses:Other', -20), ], ('Bravo', 2019): [ ('Income:Other', 200), ], } @pytest.fixture def fund_entries(): return copy.deepcopy(_ledger_load[0]) def fund_postings(entries, project, stop_date): return ( post for post in data.Posting.from_entries(entries) if post.meta.date < stop_date and post.account.is_under('Equity', 'Income', 'Expenses') and post.meta.get('project') == project ) def split_text_lines(output): for line in output: account, amount = line.rsplit(None, 1) yield account.strip(), amount def format_amount(amount, currency='USD'): return babel.numbers.format_currency( amount, currency, format_type='accounting', ) def check_text_report(output, project, start_date, stop_date): _, _, project = project.rpartition('=') balance_amount = Decimal(OPENING_BALANCES[project]) expected = collections.defaultdict(Decimal) for year in range(2018, stop_date.year): try: amounts = BALANCES_BY_YEAR[(project, year)] except KeyError: pass else: for account, amount in amounts: if year < start_date.year: balance_amount += amount else: expected[account] += amount expected.default_factory = None actual = split_text_lines(output) next(actual); next(actual) # Discard headers open_acct, open_amt = next(actual) assert open_acct == "{} balance as of {}".format( project, start_date.isoformat(), ) assert open_amt == format_amount(balance_amount) for expect_account in [ 'Equity:Realized:CurrencyConversion', 'Income:Other', 'Expenses:Other', ]: try: expect_amount = expected[expect_account] except KeyError: continue else: actual_account, actual_amount = next(actual) assert actual_account == expect_account assert actual_amount == format_amount(expect_amount) balance_amount += expect_amount end_acct, end_amt = next(actual) assert end_acct == "{} balance as of {}".format( project, stop_date.isoformat(), ) assert end_amt == format_amount(balance_amount) assert next(actual, None) is None def check_ods_report(ods, start_date, stop_date): account_bals = collections.OrderedDict((key, { 'opening': Decimal(amount), 'Income': Decimal(0), 'Expenses': Decimal(0), 'Equity': Decimal(0), }) for key, amount in sorted(OPENING_BALANCES.items())) for fund, year in itertools.product(account_bals, range(2018, stop_date.year)): try: amounts = BALANCES_BY_YEAR[(fund, year)] except KeyError: pass else: for account, amount in amounts: if year < start_date.year: acct_key = 'opening' else: acct_key, _, _ = account.partition(':') account_bals[fund][acct_key] += amount account_bals['Unrestricted'] = account_bals.pop('Conservancy') for row in ods.getElementsByType(odf.table.TableRow): cells = iter(testutil.ODSCell.from_row(row)) try: fund = next(cells).firstChild.text except (AttributeError, StopIteration): fund = None if fund in account_bals: balances = account_bals.pop(fund) assert next(cells).value == balances['opening'] assert next(cells).value == balances['Income'] assert next(cells).value == -balances['Expenses'] if balances['Equity']: assert next(cells).value == balances['Equity'] else: assert not next(cells).value assert next(cells).value == sum(balances.values()) assert not account_bals, "did not see all funds in report" def run_main(out_type, arglist, config=None): if config is None: config = testutil.TestConfig( books_path=testutil.test_path('books/fund.beancount'), ) arglist.insert(0, '--output-file=-') output = out_type() errors = io.StringIO() retcode = fund.main(arglist, output, errors, config) output.seek(0) return retcode, output, errors @pytest.mark.parametrize('project,start_date,stop_date', [ ('Conservancy', START_DATE, STOP_DATE), ('project=Conservancy', MID_DATE, STOP_DATE), ('Conservancy', START_DATE, MID_DATE), ('Alpha', START_DATE, STOP_DATE), ('project=Alpha', MID_DATE, STOP_DATE), ('Alpha', START_DATE, MID_DATE), ('Bravo', START_DATE, STOP_DATE), ('project=Bravo', MID_DATE, STOP_DATE), ('Bravo', START_DATE, MID_DATE), ('project=Charlie', START_DATE, STOP_DATE), ]) def test_text_report(project, start_date, stop_date): retcode, output, errors = run_main(io.StringIO, [ '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project, ]) assert not errors.getvalue() assert retcode == 0 check_text_report(output, project, start_date, stop_date) @pytest.mark.parametrize('start_date,stop_date', [ (START_DATE, STOP_DATE), (MID_DATE, STOP_DATE), (START_DATE, MID_DATE), ]) def test_ods_report(start_date, stop_date): retcode, output, errors = run_main(io.BytesIO, [ '--begin', start_date.isoformat(), '--end', stop_date.isoformat(), ]) assert not errors.getvalue() assert retcode == 0 ods = odf.opendocument.load(output) check_ods_report(ods, start_date, stop_date) def test_main_no_postings(caplog): retcode, output, errors = run_main(io.StringIO, ['NonexistentProject']) assert retcode == 24 assert any(log.levelname == 'WARNING' for log in caplog.records)