ledger: Add options to control account totals display.

This commit is contained in:
Brett Smith 2020-07-20 22:45:14 -04:00
parent 708d48699a
commit 6c7603fa6c
3 changed files with 140 additions and 52 deletions

View file

@ -118,12 +118,20 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
accounts: Optional[Sequence[str]]=None,
rt_wrapper: Optional[rtutil.RT]=None,
sheet_size: Optional[int]=None,
totals_with_entries: Optional[Sequence[str]]=None,
totals_without_entries: Optional[Sequence[str]]=None,
) -> None:
if sheet_size is None:
sheet_size = self.SHEET_SIZE
if totals_with_entries is None:
totals_with_entries = [s for s in self.ACCOUNT_COLUMNS if ':' not in s]
if totals_without_entries is None:
totals_without_entries = totals_with_entries
super().__init__(rt_wrapper)
self.date_range = ranges.DateRange(start_date, stop_date)
self.sheet_size = sheet_size
self.totals_with_entries = totals_with_entries
self.totals_without_entries = totals_without_entries
if accounts is None:
self.accounts = set(data.Account.iter_accounts())
@ -312,7 +320,7 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
self.balance_cell(self.norm_func(balance), stylename=self.style_bold),
)
def start_section(self, key: data.Account) -> None:
def start_section(self, key: data.Account, *, force_total: bool=False) -> None:
self.add_row()
self.add_row(
odf.table.TableCell(),
@ -325,7 +333,8 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
),
)
self.norm_func = core.normalize_amount_func(key)
self._report_section_balance(key, 'start')
if force_total or key.is_under(*self.totals_with_entries):
self._report_section_balance(key, 'start')
def end_section(self, key: data.Account) -> None:
self._report_section_balance(key, 'stop')
@ -409,8 +418,10 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
postings = self.account_groups[account]
if postings:
super().write(postings)
elif account.is_open_on_date(self.date_range.start):
self.start_section(account)
elif not account.is_open_on_date(self.date_range.start):
pass
elif account.is_under(*self.totals_without_entries):
self.start_section(account, force_total=True)
self.end_section(account)
for index in range(using_sheet_index + 1, len(sheet_names)):
self.start_sheet(sheet_names[index])
@ -450,6 +461,23 @@ date was also not specified.
multiple times. You can specify a part of the account hierarchy, or an account
classification from metadata. If not specified, the default set adapts to your
search criteria.
""")
parser.add_argument(
'--show-totals', '-S',
metavar='ACCOUNT',
action='append',
help="""When entries for this account appear in the report, include
account balance(s) as well. You can specify this option multiple times. Pass in
a part of the account hierarchy. The default is all accounts.
""")
parser.add_argument(
'--add-totals', '-T',
metavar='ACCOUNT',
action='append',
help="""When an account could be included in the report but does not
have any entries in the date range, include a header and account balance(s) for
it. You can specify this option multiple times. Pass in a part of the account
hierarchy. The default set adapts to your search criteria.
""")
parser.add_argument(
'--sheet-size', '--size',
@ -479,6 +507,8 @@ metadata to match. A single ticket number is a shortcut for
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
""")
args = parser.parse_args(arglist)
if args.add_totals is None and args.search_terms:
args.add_totals = []
if args.accounts is None:
if any(term.meta_key == 'project' for term in args.search_terms):
args.accounts = [
@ -548,6 +578,8 @@ def main(arglist: Optional[Sequence[str]]=None,
args.accounts,
rt_wrapper,
args.sheet_size,
args.show_totals,
args.add_totals,
)
except ValueError as error:
logger.error("%s: %r", *error.args)

View file

@ -5,7 +5,7 @@ from setuptools import setup
setup(
name='conservancy_beancount',
description="Plugin, library, and reports for reading Conservancy's books",
version='1.5.11',
version='1.5.12',
author='Software Freedom Conservancy',
author_email='info@sfconservancy.org',
license='GNU AGPLv3+',

View file

@ -67,11 +67,48 @@ STOP_DATE = datetime.date(2020, 3, 1)
def ledger_entries():
return copy.deepcopy(_ledger_load[0])
def iter_accounts(entries):
for entry in entries:
if isinstance(entry, bc_data.Open):
yield entry.account
class NotFound(Exception): pass
class NoSheet(NotFound): pass
class NoHeader(NotFound): pass
class ExpectedPostings(core.RelatedPostings):
@classmethod
def find_section(cls, ods, account):
for sheet in ods.getElementsByType(odf.table.Table):
sheet_account = sheet.getAttribute('name').replace(' ', ':')
if sheet_account and account.is_under(sheet_account):
break
else:
raise NoSheet(account)
rows = iter(sheet.getElementsByType(odf.table.TableRow))
for row in rows:
cells = row.childNodes
if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
break
else:
raise NoHeader(account)
return rows
@classmethod
def check_not_in_report(cls, ods, *accounts):
for account in accounts:
with pytest.raises(NotFound):
cls.find_section(ods, data.Account(account))
@classmethod
def check_in_report(cls, ods, account, start_date=START_DATE, end_date=STOP_DATE):
date = end_date + datetime.timedelta(days=1)
txn = testutil.Transaction(date=date, postings=[
(account, 0),
])
related = cls(data.Posting.from_txn(txn))
related.check_report(ods, start_date, end_date)
def slice_date_range(self, start_date, end_date):
postings = enumerate(self)
for start_index, post in postings:
@ -90,29 +127,14 @@ class ExpectedPostings(core.RelatedPostings):
return (self[:start_index].balance_at_cost(),
self[start_index:end_index])
def check_report(self, ods, start_date, end_date):
def check_report(self, ods, start_date, end_date, expect_totals=True):
account = self[0].account
norm_func = core.normalize_amount_func(account)
open_bal, expect_posts = self.slice_date_range(start_date, end_date)
open_bal = norm_func(open_bal)
for sheet in ods.getElementsByType(odf.table.Table):
sheet_account = sheet.getAttribute('name').replace(' ', ':')
if sheet_account and account.is_under(sheet_account):
break
else:
raise NoSheet(account)
rows = iter(sheet.getElementsByType(odf.table.TableRow))
for row in rows:
cells = row.childNodes
if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
break
else:
if expect_posts:
raise NoHeader(account)
else:
return
closing_bal = norm_func(expect_posts.balance_at_cost())
if account.is_under('Assets', 'Liabilities'):
rows = self.find_section(ods, account)
if expect_totals and account.is_under('Assets', 'Liabilities'):
opening_row = testutil.ODSCell.from_row(next(rows))
assert opening_row[0].value == start_date
assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
@ -128,10 +150,11 @@ class ExpectedPostings(core.RelatedPostings):
else:
assert next(cells).value == norm_func(expected.units.number)
assert next(cells).value == norm_func(expected.at_cost().number)
closing_row = testutil.ODSCell.from_row(next(rows))
assert closing_row[0].value == end_date
empty = '$0.00' if expect_posts else '0'
assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
if expect_totals:
closing_row = testutil.ODSCell.from_row(next(rows))
assert closing_row[0].value == end_date
empty = '$0.00' if expect_posts else '0'
assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
def get_sheet_names(ods):
@ -236,6 +259,14 @@ def test_plan_sheets_full_split_required(caplog):
assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
assert not caplog.records
def build_report(ledger_entries, start_date, stop_date, *args, **kwargs):
postings = list(data.Posting.from_entries(iter(ledger_entries)))
with clean_account_meta():
data.Account.load_openings_and_closings(iter(ledger_entries))
report = ledger.LedgerODS(start_date, stop_date, *args, **kwargs)
report.write(iter(postings))
return postings, report
@pytest.mark.parametrize('start_date,stop_date', [
(START_DATE, STOP_DATE),
(START_DATE, MID_DATE),
@ -244,32 +275,49 @@ def test_plan_sheets_full_split_required(caplog):
(STOP_DATE, STOP_DATE.replace(month=12)),
])
def test_date_range_report(ledger_entries, start_date, stop_date):
postings = list(data.Posting.from_entries(iter(ledger_entries)))
with clean_account_meta():
data.Account.load_openings_and_closings(iter(ledger_entries))
report = ledger.LedgerODS(start_date, stop_date)
report.write(iter(postings))
for _, expected in ExpectedPostings.group_by_account(postings):
expected.check_report(report.document, start_date, stop_date)
postings, report = build_report(ledger_entries, start_date, stop_date)
expected = dict(ExpectedPostings.group_by_account(postings))
for account in iter_accounts(ledger_entries):
try:
related = expected[account]
except KeyError:
ExpectedPostings.check_in_report(report.document, account, start_date, stop_date)
else:
related.check_report(report.document, start_date, stop_date)
@pytest.mark.parametrize('tot_accts', [
(),
('Assets', 'Liabilities'),
('Income', 'Expenses'),
('Assets', 'Liabilities', 'Income', 'Expenses'),
])
def test_report_filter_totals(ledger_entries, tot_accts):
postings, report = build_report(ledger_entries, START_DATE, STOP_DATE,
totals_with_entries=tot_accts,
totals_without_entries=tot_accts)
expected = dict(ExpectedPostings.group_by_account(postings))
for account in iter_accounts(ledger_entries):
expect_totals = account.startswith(tot_accts)
if account in expected and expected[account][-1].meta.date >= START_DATE:
expected[account].check_report(report.document, START_DATE, STOP_DATE,
expect_totals=expect_totals)
elif expect_totals:
ExpectedPostings.check_in_report(report.document, account)
else:
ExpectedPostings.check_not_in_report(report.document, account)
@pytest.mark.parametrize('accounts', [
('Income', 'Expenses'),
('Assets:Receivable', 'Liabilities:Payable'),
])
def test_account_names_report(ledger_entries, accounts):
postings = list(data.Posting.from_entries(iter(ledger_entries)))
with clean_account_meta():
data.Account.load_openings_and_closings(iter(ledger_entries))
report = ledger.LedgerODS(START_DATE, STOP_DATE, accounts=accounts)
report.write(iter(postings))
for key, expected in ExpectedPostings.group_by_account(postings):
should_find = key.startswith(accounts)
try:
expected.check_report(report.document, START_DATE, STOP_DATE)
except NotFound:
assert not should_find
postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, accounts)
expected = dict(ExpectedPostings.group_by_account(postings))
for account in iter_accounts(ledger_entries):
if account.startswith(accounts):
expected[account].check_report(report.document, START_DATE, STOP_DATE)
else:
assert should_find
ExpectedPostings.check_not_in_report(report.document, account)
def run_main(arglist, config=None):
if config is None:
@ -295,9 +343,13 @@ def test_main(ledger_entries):
assert retcode == 0
ods = odf.opendocument.load(output)
assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
postings = data.Posting.from_entries(ledger_entries)
for _, expected in ExpectedPostings.group_by_account(postings):
expected.check_report(ods, START_DATE, STOP_DATE)
postings = data.Posting.from_entries(iter(ledger_entries))
expected = dict(ExpectedPostings.group_by_account(postings))
for account in iter_accounts(ledger_entries):
try:
expected[account].check_report(ods, START_DATE, STOP_DATE)
except KeyError:
ExpectedPostings.check_in_report(ods, account)
@pytest.mark.parametrize('acct_arg', [
'Liabilities',
@ -351,7 +403,7 @@ def test_main_account_classification_splits_hierarchy(ledger_entries):
('nineteen', MID_DATE, STOP_DATE),
])
def test_main_project_report(ledger_entries, project, start_date, stop_date):
postings = data.Posting.from_entries(ledger_entries)
postings = data.Posting.from_entries(iter(ledger_entries))
for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
if key == project:
break
@ -365,8 +417,12 @@ def test_main_project_report(ledger_entries, project, start_date, stop_date):
assert retcode == 0
ods = odf.opendocument.load(output)
assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
for _, expected in ExpectedPostings.group_by_account(related):
expected.check_report(ods, start_date, stop_date)
expected = dict(ExpectedPostings.group_by_account(related))
for account in iter_accounts(ledger_entries):
try:
expected[account].check_report(ods, start_date, stop_date)
except KeyError:
ExpectedPostings.check_not_in_report(ods, account)
@pytest.mark.parametrize('arg', [
'Assets:NoneSuchBank',