317 lines
11 KiB
Python
317 lines
11 KiB
Python
"""test_reports_ledger.py - Unit tests for general ledger report"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import collections
|
|
import copy
|
|
import datetime
|
|
import io
|
|
import re
|
|
|
|
import pytest
|
|
|
|
from . import testutil
|
|
|
|
import odf.table
|
|
import odf.text
|
|
|
|
from beancount.core import data as bc_data
|
|
from beancount import loader as bc_loader
|
|
from conservancy_beancount import data
|
|
from conservancy_beancount.reports import core
|
|
from conservancy_beancount.reports import ledger
|
|
|
|
Acct = data.Account
|
|
|
|
_ledger_load = bc_loader.load_file(testutil.test_path('books/ledger.beancount'))
|
|
DEFAULT_REPORT_SHEETS = [
|
|
'Balance',
|
|
'Income',
|
|
'Expenses',
|
|
'Equity',
|
|
'Assets:Receivable',
|
|
'Liabilities:Payable',
|
|
'Assets:PayPal',
|
|
'Assets',
|
|
'Liabilities',
|
|
]
|
|
PROJECT_REPORT_SHEETS = [
|
|
'Balance',
|
|
'Income',
|
|
'Expenses',
|
|
'Assets:Receivable',
|
|
'Liabilities:Payable',
|
|
]
|
|
OVERSIZE_RE = re.compile(
|
|
r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
|
|
)
|
|
START_DATE = datetime.date(2018, 3, 1)
|
|
MID_DATE = datetime.date(2019, 3, 1)
|
|
STOP_DATE = datetime.date(2020, 3, 1)
|
|
|
|
@pytest.fixture
|
|
def ledger_entries():
|
|
return copy.deepcopy(_ledger_load[0])
|
|
|
|
class NotFound(Exception): pass
|
|
class NoSheet(NotFound): pass
|
|
class NoHeader(NotFound): pass
|
|
|
|
class ExpectedPostings(core.RelatedPostings):
|
|
def slice_date_range(self, start_date, end_date):
|
|
postings = enumerate(self)
|
|
for start_index, post in postings:
|
|
if start_date <= post.meta.date:
|
|
break
|
|
else:
|
|
start_index += 1
|
|
if end_date <= post.meta.date:
|
|
end_index = start_index
|
|
else:
|
|
for end_index, post in postings:
|
|
if end_date <= post.meta.date:
|
|
break
|
|
else:
|
|
end_index = None
|
|
return (self[:start_index].balance_at_cost(),
|
|
self[start_index:end_index])
|
|
|
|
def check_report(self, ods, start_date, end_date):
|
|
account = self[0].account
|
|
norm_func = core.normalize_amount_func(account)
|
|
open_bal, expect_posts = self.slice_date_range(start_date, end_date)
|
|
open_bal = norm_func(open_bal)
|
|
for sheet in ods.getElementsByType(odf.table.Table):
|
|
sheet_account = sheet.getAttribute('name').replace(' ', ':')
|
|
if sheet_account and account.is_under(sheet_account):
|
|
break
|
|
else:
|
|
raise NoSheet(account)
|
|
rows = iter(sheet.getElementsByType(odf.table.TableRow))
|
|
for row in rows:
|
|
cells = row.childNodes
|
|
if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
|
|
break
|
|
else:
|
|
if expect_posts:
|
|
raise NoHeader(account)
|
|
else:
|
|
return
|
|
if account.is_under('Assets', 'Equity', 'Liabilities'):
|
|
opening_row = testutil.ODSCell.from_row(next(rows))
|
|
assert opening_row[0].value == start_date
|
|
assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
|
|
for expected in expect_posts:
|
|
cells = iter(testutil.ODSCell.from_row(next(rows)))
|
|
assert next(cells).value == expected.meta.date
|
|
assert next(cells).text == (expected.meta.get('entity') or '')
|
|
assert next(cells).text == (expected.meta.txn.narration or '')
|
|
if expected.cost is None:
|
|
assert not next(cells).text
|
|
assert next(cells).value == norm_func(expected.units.number)
|
|
else:
|
|
assert next(cells).value == norm_func(expected.units.number)
|
|
assert next(cells).value == norm_func(expected.at_cost().number)
|
|
closing_row = testutil.ODSCell.from_row(next(rows))
|
|
closing_bal = open_bal + norm_func(expect_posts.balance_at_cost())
|
|
assert closing_row[0].value == end_date
|
|
assert closing_row[4].text == closing_bal.format(None, empty='0', sep='\0')
|
|
|
|
|
|
def get_sheet_names(ods):
|
|
return [sheet.getAttribute('name').replace(' ', ':')
|
|
for sheet in ods.getElementsByType(odf.table.Table)]
|
|
|
|
def check_oversize_logs(caplog, accounts, sheet_size):
|
|
actual = {}
|
|
for log in caplog.records:
|
|
match = OVERSIZE_RE.match(log.message)
|
|
if match:
|
|
assert int(match.group(3).replace(',', '')) == sheet_size
|
|
actual[match.group(1)] = int(match.group(2).replace(',', ''))
|
|
expected = {name: size for name, size in accounts.items() if size > sheet_size}
|
|
assert actual == expected
|
|
|
|
def test_plan_sheets_no_change():
|
|
have = {
|
|
Acct('Assets:Cash'): 10,
|
|
Acct('Income:Donations'): 20,
|
|
}
|
|
want = ['Assets', 'Income']
|
|
actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
|
|
assert actual == want
|
|
|
|
@pytest.mark.parametrize('have', [
|
|
{},
|
|
{Acct('Income:Other'): 10},
|
|
{Acct('Assets:Checking'): 20, Acct('Expenses:Other'): 15},
|
|
])
|
|
def test_plan_sheets_includes_accounts_without_transactions(have):
|
|
want = ['Assets', 'Income', 'Expenses']
|
|
actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
|
|
assert actual == want
|
|
|
|
def test_plan_sheets_single_split():
|
|
have = {
|
|
Acct('Assets:Cash'): 60,
|
|
Acct('Assets:Checking'): 80,
|
|
Acct('Income:Donations'): 50,
|
|
Acct('Expenses:Travel'): 90,
|
|
Acct('Expenses:FilingFees'): 25,
|
|
}
|
|
want = ['Assets', 'Income', 'Expenses']
|
|
actual = ledger.LedgerODS.plan_sheets(have, want, 100)
|
|
assert actual == [
|
|
'Assets:Checking',
|
|
'Assets',
|
|
'Income',
|
|
'Expenses:Travel',
|
|
'Expenses',
|
|
]
|
|
|
|
def test_plan_sheets_split_subtree():
|
|
have = {
|
|
Acct('Assets:Bank1:Checking'): 80,
|
|
Acct('Assets:Bank1:Savings'): 10,
|
|
Acct('Assets:Cash:USD'): 20,
|
|
Acct('Assets:Cash:EUR'): 15,
|
|
}
|
|
actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
|
|
assert actual == ['Assets:Bank1', 'Assets']
|
|
|
|
def test_plan_sheets_ambiguous_split():
|
|
have = {
|
|
Acct('Assets:Bank1:Checking'): 80,
|
|
Acct('Assets:Bank1:Savings'): 40,
|
|
Acct('Assets:Receivable:Accounts'): 40,
|
|
Acct('Assets:Cash'): 10,
|
|
}
|
|
actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
|
|
# :Savings cannot fit with :Checking, so it's important that the return
|
|
# value disambiguate that.
|
|
assert actual == ['Assets:Bank1:Checking', 'Assets']
|
|
|
|
def test_plan_sheets_oversize(caplog):
|
|
have = {
|
|
Acct('Assets:Checking'): 150,
|
|
Acct('Assets:Cash'): 50,
|
|
}
|
|
actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
|
|
assert actual == ['Assets:Checking', 'Assets']
|
|
check_oversize_logs(caplog, have, 100)
|
|
|
|
def test_plan_sheets_all_oversize(caplog):
|
|
have = {
|
|
Acct('Assets:Checking'): 150,
|
|
Acct('Assets:Cash'): 150,
|
|
}
|
|
actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
|
|
# In this case, each account should appear in alphabetical order.
|
|
assert actual == ['Assets:Cash', 'Assets:Checking']
|
|
check_oversize_logs(caplog, have, 100)
|
|
|
|
def test_plan_sheets_full_split_required(caplog):
|
|
have = {
|
|
Acct('Assets:Bank:Savings'): 98,
|
|
Acct('Assets:Bank:Checking'): 96,
|
|
Acct('Assets:Bank:Investment'): 94,
|
|
}
|
|
actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
|
|
assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
|
|
assert not caplog.records
|
|
|
|
@pytest.mark.parametrize('start_date,stop_date', [
|
|
(START_DATE, STOP_DATE),
|
|
(START_DATE, MID_DATE),
|
|
(MID_DATE, STOP_DATE),
|
|
(START_DATE.replace(month=6), START_DATE.replace(month=12)),
|
|
(STOP_DATE, STOP_DATE.replace(month=12)),
|
|
])
|
|
def test_date_range_report(ledger_entries, start_date, stop_date):
|
|
postings = list(data.Posting.from_entries(ledger_entries))
|
|
report = ledger.LedgerODS(start_date, stop_date)
|
|
report.write(iter(postings))
|
|
for _, expected in ExpectedPostings.group_by_account(postings):
|
|
expected.check_report(report.document, start_date, stop_date)
|
|
|
|
@pytest.mark.parametrize('sheet_names', [
|
|
('Income', 'Expenses'),
|
|
('Assets:Receivable', 'Liabilities:Payable'),
|
|
])
|
|
def test_account_names_report(ledger_entries, sheet_names):
|
|
postings = list(data.Posting.from_entries(ledger_entries))
|
|
report = ledger.LedgerODS(START_DATE, STOP_DATE, sheet_names=sheet_names)
|
|
report.write(iter(postings))
|
|
for key, expected in ExpectedPostings.group_by_account(postings):
|
|
should_find = key.startswith(sheet_names)
|
|
try:
|
|
expected.check_report(report.document, START_DATE, STOP_DATE)
|
|
except NotFound:
|
|
assert not should_find
|
|
else:
|
|
assert should_find
|
|
|
|
def run_main(arglist, config=None):
|
|
if config is None:
|
|
config = testutil.TestConfig(
|
|
books_path=testutil.test_path('books/ledger.beancount'),
|
|
rt_client=testutil.RTClient(),
|
|
)
|
|
arglist.insert(0, '--output-file=-')
|
|
output = io.BytesIO()
|
|
errors = io.StringIO()
|
|
retcode = ledger.main(arglist, output, errors, config)
|
|
output.seek(0)
|
|
return retcode, output, errors
|
|
|
|
def test_main(ledger_entries):
|
|
retcode, output, errors = run_main([
|
|
'-b', START_DATE.isoformat(),
|
|
'-e', STOP_DATE.isoformat(),
|
|
])
|
|
assert not errors.getvalue()
|
|
assert retcode == 0
|
|
ods = odf.opendocument.load(output)
|
|
assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
|
|
postings = data.Posting.from_entries(ledger_entries)
|
|
for _, expected in ExpectedPostings.group_by_account(postings):
|
|
expected.check_report(ods, START_DATE, STOP_DATE)
|
|
|
|
@pytest.mark.parametrize('project,start_date,stop_date', [
|
|
('eighteen', START_DATE, MID_DATE.replace(day=30)),
|
|
('nineteen', MID_DATE, STOP_DATE),
|
|
])
|
|
def test_main_project_report(ledger_entries, project, start_date, stop_date):
|
|
postings = data.Posting.from_entries(ledger_entries)
|
|
for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
|
|
if key == project:
|
|
break
|
|
assert key == project
|
|
retcode, output, errors = run_main([
|
|
f'--begin={start_date.isoformat()}',
|
|
f'--end={stop_date.isoformat()}',
|
|
project,
|
|
])
|
|
assert not errors.getvalue()
|
|
assert retcode == 0
|
|
ods = odf.opendocument.load(output)
|
|
assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
|
|
for _, expected in ExpectedPostings.group_by_account(related):
|
|
expected.check_report(ods, start_date, stop_date)
|
|
|
|
def test_main_no_postings(caplog):
|
|
retcode, output, errors = run_main(['NonexistentProject'])
|
|
assert retcode == 24
|
|
assert any(log.levelname == 'WARNING' for log in caplog.records)
|