diff --git a/conservancy_beancount/reports/ledger.py b/conservancy_beancount/reports/ledger.py index a433473..d763619 100644 --- a/conservancy_beancount/reports/ledger.py +++ b/conservancy_beancount/reports/ledger.py @@ -52,6 +52,7 @@ import logging import sys from typing import ( + Any, Callable, Dict, Iterable, @@ -65,6 +66,9 @@ from typing import ( Tuple, Union, ) +from ..beancount_types import ( + Transaction, +) from pathlib import Path @@ -132,6 +136,7 @@ class LedgerODS(core.BaseODS[data.Posting, None]): self.sheet_size = sheet_size self.totals_with_entries = totals_with_entries self.totals_without_entries = totals_without_entries + self.report_name = "Ledger" if accounts is None: self.accounts = set(data.Account.iter_accounts()) @@ -176,6 +181,7 @@ class LedgerODS(core.BaseODS[data.Posting, None]): self.amount_column = self.column_style(1.2) self.default_column = self.column_style(1.5) self.column_styles: Mapping[str, Union[str, odf.style.Style]] = { + 'Account': self.column_style(2), # for TransactionODS 'Date': '', 'Description': self.column_style(2), 'Original Amount': self.amount_column, @@ -299,6 +305,26 @@ class LedgerODS(core.BaseODS[data.Posting, None]): )) self.lock_first_row() + def _write_total_row(self, + date: datetime.date, + description: str, + balance: core.Balance, + ) -> None: + cells: List[odf.table.TableCell] = [] + for column in self.CORE_COLUMNS: + if column == 'Date': + cell = self.date_cell(date, stylename=self.merge_styles( + self.style_bold, self.style_date, + )) + elif column == 'Description': + cell = self.string_cell(description, stylename=self.style_bold) + elif column == 'Booked Amount': + cell = self.balance_cell(self.norm_func(balance), stylename=self.style_bold) + else: + cell = odf.table.TableCell() + cells.append(cell) + self.add_row(*cells) + def _report_section_balance(self, key: data.Account, date_key: str) -> None: related = self.account_groups[key] if date_key == 'start': @@ -315,22 +341,14 @@ class LedgerODS(core.BaseODS[data.Posting, None]): else: balance = related.period_bal description = "Period Total" - self.add_row( - self.date_cell(date, stylename=self.merge_styles( - self.style_bold, self.style_date, - )), - odf.table.TableCell(), - self.string_cell(description, stylename=self.style_bold), - odf.table.TableCell(), - self.balance_cell(self.norm_func(balance), stylename=self.style_bold), - ) + self._write_total_row(date, description, balance) def write_header(self, key: data.Account) -> None: self.add_row() self.add_row( odf.table.TableCell(), self.string_cell( - f"{key} Ledger" + f"{key} {self.report_name}" f" From {self.date_range.start.isoformat()}" f" To {self.date_range.stop.isoformat()}", stylename=self.style_bold, @@ -438,14 +456,205 @@ class LedgerODS(core.BaseODS[data.Posting, None]): self.start_sheet(sheet_names[index]) +class TransactionFilter(enum.IntFlag): + ZERO = 1 + CREDIT = 2 + DEBIT = 4 + ALL = ZERO | CREDIT | DEBIT + + @classmethod + def from_arg(cls, s: str) -> 'TransactionFilter': + try: + return cls[s.upper()] + except KeyError: + raise ValueError(f"unknown transaction filter {s!r}") + + @classmethod + def post_flag(cls, post: data.Posting) -> int: + norm_func = core.normalize_amount_func(post.account) + number = norm_func(post.units.number) + if not number: + return cls.ZERO + elif number > 0: + return cls.CREDIT + else: + return cls.DEBIT + + +class TransactionODS(LedgerODS): + CORE_COLUMNS: Sequence[str] = [ + 'Date', + 'Description', + 'Account', + data.Metadata.human_name('entity'), + 'Original Amount', + 'Booked Amount', + ] + METADATA_COLUMNS: Sequence[str] = [ + 'project', + 'rt-id', + 'receipt', + 'check', + 'invoice', + 'contract', + 'approval', + 'paypal-id', + 'check-number', + 'bank-statement', + ] + + def __init__(self, + start_date: datetime.date, + stop_date: datetime.date, + accounts: Optional[Sequence[str]]=None, + rt_wrapper: Optional[rtutil.RT]=None, + sheet_size: Optional[int]=None, + totals_with_entries: Optional[Sequence[str]]=None, + totals_without_entries: Optional[Sequence[str]]=None, + txn_filter: int=TransactionFilter.ALL, + ) -> None: + super().__init__( + start_date, + stop_date, + accounts, + rt_wrapper, + sheet_size, + totals_with_entries, + totals_without_entries, + ) + self.txn_filter = txn_filter + if self.txn_filter == TransactionFilter.CREDIT: + self.report_name = "Receipts" + elif self.txn_filter == TransactionFilter.DEBIT: + self.report_name = "Disbursements" + else: + self.report_name = "Transactions" + + def _wanted_txns(self, postings: Iterable[data.Posting]) -> Iterator[Transaction]: + last_txn: Optional[Transaction] = None + for post in postings: + txn = post.meta.txn + if (txn is not last_txn + and TransactionFilter.post_flag(post) & self.txn_filter): + yield txn + last_txn = txn + + def metadata_columns_for(self, sheet_name: str) -> Sequence[str]: + return self.METADATA_COLUMNS + + def write_balance_sheet(self) -> None: + return + + def _report_section_balance(self, key: data.Account, date_key: str) -> None: + if self.txn_filter == TransactionFilter.ALL: + super()._report_section_balance(key, date_key) + elif date_key == 'stop': + balance = core.Balance( + post.at_cost() + for txn in self._wanted_txns(self.account_groups[key]) + for post in data.Posting.from_txn(txn) + if post.account == key + ) + self._write_total_row(self.date_range.stop, "Period Activity", balance) + + def _account_tally(self, account: data.Account) -> int: + return sum(len(txn.postings) + for txn in self._wanted_txns(self.account_groups[account])) + + def write_entries(self, account: data.Account, rows: Iterable[data.Posting]) -> None: + for txn in self._wanted_txns(rows): + post_list = list(data.Posting.from_txn(txn)) + post_list.sort(key=lambda post: ( + 0 if post.account == account else 1, + -abs(post.at_cost().number), + )) + postings = iter(post_list) + post1 = next(postings) + if post1.cost is None: + amount_cell = odf.table.TableCell() + else: + amount_cell = self.currency_cell(self.norm_func(post1.units)) + self.add_row( + self.date_cell(txn.date), + self.string_cell(txn.narration), + self.string_cell(post1.account), + self.string_cell(post1.meta.get('entity') or ''), + amount_cell, + self.currency_cell(self.norm_func(post1.at_cost())), + *(self.meta_links_cell(post1.meta.report_links(key)) + if key in data.LINK_METADATA + else self.string_cell(post1.meta.get(key, '')) + for key in self.metadata_columns), + ) + for post in postings: + meta_cells: List[odf.table.TableCell] = [] + for meta_key in self.metadata_columns: + try: + dup = post.meta[meta_key] is txn.meta[meta_key] + except KeyError: + dup = False + if dup: + meta_cell = odf.table.TableCell() + elif meta_key in data.LINK_METADATA: + meta_cell = self.meta_links_cell(post.meta.report_links(meta_key)) + else: + meta_cell = self.string_cell(post.meta.get(meta_key, '')) + meta_cells.append(meta_cell) + if post.cost is None: + amount_cell = odf.table.TableCell() + else: + amount_cell = self.currency_cell(self.norm_func(post.units)) + self.add_row( + odf.table.TableCell(), + odf.table.TableCell(), + self.string_cell(post.account), + self.string_cell(post.meta.get('entity') or ''), + amount_cell, + self.currency_cell(self.norm_func(post.at_cost())), + *meta_cells, + ) + + class ReturnFlag(enum.IntFlag): LOAD_ERRORS = 1 NOTHING_TO_REPORT = 8 +class CashReportAction(argparse.Action): + def __call__(self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[Sequence[Any], str, None]=None, + option_string: Optional[str]=None, + ) -> None: + namespace.txn_filter = self.const + if namespace.accounts is None: + namespace.accounts = [] + namespace.accounts.append('Assets:PayPal') + namespace.accounts.append('Cash') + if namespace.stop_date is None: + namespace.stop_date = datetime.date.today() + + def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace: parser = argparse.ArgumentParser(prog=PROGNAME) cliutil.add_version_argument(parser) + parser.add_argument( + '--disbursements', + action=CashReportAction, + const=TransactionFilter.DEBIT, + nargs=0, + help="""Shortcut to set all the necessary options to generate a cash +disbursements report. +""") + parser.add_argument( + '--receipts', + action=CashReportAction, + const=TransactionFilter.CREDIT, + nargs=0, + help="""Shortcut to set all the necessary options to generate a cash +receipts report. +""") parser.add_argument( '--begin', '--start', '-b', dest='start_date', @@ -462,6 +671,15 @@ The default is one year ago. help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format. The default is a year after the start date, or 30 days from today if the start date was also not specified. +""") + parser.add_argument( + '--transactions', '-t', + dest='txn_filter', + metavar='TYPE', + type=TransactionFilter.from_arg, + help="""Report whole transactions rather than individual postings. +The type argument selects which type of transactions to report. Choices are +credit, debit, or all. """) parser.add_argument( '--account', '-a', @@ -583,15 +801,27 @@ def main(arglist: Optional[Sequence[str]]=None, if rt_wrapper is None: logger.warning("could not initialize RT client; spreadsheet links will be broken") try: - report = LedgerODS( - args.start_date, - args.stop_date, - args.accounts, - rt_wrapper, - args.sheet_size, - args.show_totals, - args.add_totals, - ) + if args.txn_filter is None: + report = LedgerODS( + args.start_date, + args.stop_date, + args.accounts, + rt_wrapper, + args.sheet_size, + args.show_totals, + args.add_totals, + ) + else: + report = TransactionODS( + args.start_date, + args.stop_date, + args.accounts, + rt_wrapper, + args.sheet_size, + args.show_totals, + args.add_totals, + args.txn_filter, + ) except ValueError as error: logger.error("%s: %r", *error.args) return 2 @@ -602,8 +832,10 @@ def main(arglist: Optional[Sequence[str]]=None, if args.output_file is None: out_dir_path = config.repository_path() or Path() - args.output_file = out_dir_path / 'LedgerReport_{}_{}.ods'.format( - args.start_date.isoformat(), args.stop_date.isoformat(), + args.output_file = out_dir_path / '{}Report_{}_{}.ods'.format( + report.report_name, + args.start_date.isoformat(), + args.stop_date.isoformat(), ) logger.info("Writing report to %s", args.output_file) ods_file = cliutil.bytes_output(args.output_file, stdout) diff --git a/setup.py b/setup.py index 1c67f55..f073853 100755 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import setup setup( name='conservancy_beancount', description="Plugin, library, and reports for reading Conservancy's books", - version='1.5.13', + version='1.6.0', author='Software Freedom Conservancy', author_email='info@sfconservancy.org', license='GNU AGPLv3+', diff --git a/tests/test_reports_ledger.py b/tests/test_reports_ledger.py index db95bfb..6d17991 100644 --- a/tests/test_reports_ledger.py +++ b/tests/test_reports_ledger.py @@ -19,6 +19,7 @@ import contextlib import copy import datetime import io +import itertools import re import pytest @@ -63,6 +64,12 @@ START_DATE = datetime.date(2018, 3, 1) MID_DATE = datetime.date(2019, 3, 1) STOP_DATE = datetime.date(2020, 3, 1) +REPORT_KWARGS = [ + {'report_class': ledger.LedgerODS}, + *({'report_class': ledger.TransactionODS, 'txn_filter': flags} + for flags in ledger.TransactionFilter), +] + @pytest.fixture def ledger_entries(): return copy.deepcopy(_ledger_load[0]) @@ -101,13 +108,17 @@ class ExpectedPostings(core.RelatedPostings): cls.find_section(ods, data.Account(account)) @classmethod - def check_in_report(cls, ods, account, start_date=START_DATE, end_date=STOP_DATE): + def check_in_report(cls, ods, account, + start_date=START_DATE, end_date=STOP_DATE, txn_filter=None): date = end_date + datetime.timedelta(days=1) txn = testutil.Transaction(date=date, postings=[ (account, 0), ]) related = cls(data.Posting.from_txn(txn)) - related.check_report(ods, start_date, end_date) + if txn_filter is None: + related.check_report(ods, start_date, end_date) + else: + related.check_txn_report(ods, txn_filter, start_date, end_date) def slice_date_range(self, start_date, end_date): postings = enumerate(self) @@ -156,6 +167,64 @@ class ExpectedPostings(core.RelatedPostings): empty = '$0.00' if expect_posts else '0' assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0') + def _post_data_from_row(self, row): + if row[4].text: + number = row[4].value + match = re.search(r'([A-Z]{3})\d*Cell', row[4].getAttribute('stylename') or '') + assert match + currency = match.group(1) + else: + number = row[5].value + currency = 'USD' + return (row[2].text, row[3].text, number, currency) + + def _post_data_from_post(self, post, norm_func): + return ( + post.account, + post.meta.get('entity') or '', + norm_func(post.units.number), + post.units.currency, + ) + + def check_txn_report(self, ods, txn_filter, start_date, end_date, expect_totals=True): + account = self[0].account + norm_func = core.normalize_amount_func(account) + open_bal, expect_posts = self.slice_date_range(start_date, end_date) + open_bal = norm_func(open_bal) + period_bal = core.MutableBalance() + rows = self.find_section(ods, account) + if (expect_totals + and txn_filter == ledger.TransactionFilter.ALL + and account.is_under('Assets', 'Liabilities')): + opening_row = testutil.ODSCell.from_row(next(rows)) + assert opening_row[0].value == start_date + assert opening_row[5].text == open_bal.format(None, empty='0', sep='\0') + period_bal += open_bal + last_txn = None + for post in expect_posts: + txn = post.meta.txn + post_flag = ledger.TransactionFilter.post_flag(post) + if txn is last_txn or (not txn_filter & post_flag): + continue + last_txn = txn + row1 = testutil.ODSCell.from_row(next(rows)) + assert row1[0].value == txn.date + assert row1[1].text == (txn.narration or '') + expected = {self._post_data_from_post(post, norm_func) + for post in txn.postings} + actual = {self._post_data_from_row(testutil.ODSCell.from_row(row)) + for row in itertools.islice(rows, len(txn.postings) - 1)} + actual.add(self._post_data_from_row(row1)) + assert actual == expected + for post_acct, _, number, currency in expected: + if post_acct == account: + period_bal += testutil.Amount(number, currency) + if expect_totals: + closing_row = testutil.ODSCell.from_row(next(rows)) + assert closing_row[0].value == end_date + empty = '$0.00' if period_bal else '0' + assert closing_row[5].text == period_bal.format(None, empty=empty, sep='\0') + def get_sheet_names(ods): return [sheet.getAttribute('name').replace(' ', ':') @@ -259,14 +328,16 @@ def test_plan_sheets_full_split_required(caplog): assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets'] assert not caplog.records -def build_report(ledger_entries, start_date, stop_date, *args, **kwargs): +def build_report(ledger_entries, start_date, stop_date, *args, + report_class=ledger.LedgerODS, **kwargs): postings = list(data.Posting.from_entries(iter(ledger_entries))) with clean_account_meta(): data.Account.load_openings_and_closings(iter(ledger_entries)) - report = ledger.LedgerODS(start_date, stop_date, *args, **kwargs) + report = report_class(start_date, stop_date, *args, **kwargs) report.write(iter(postings)) return postings, report +@pytest.mark.parametrize('report_kwargs', iter(REPORT_KWARGS)) @pytest.mark.parametrize('start_date,stop_date', [ (START_DATE, STOP_DATE), (START_DATE, MID_DATE), @@ -274,50 +345,77 @@ def build_report(ledger_entries, start_date, stop_date, *args, **kwargs): (START_DATE.replace(month=6), START_DATE.replace(month=12)), (STOP_DATE, STOP_DATE.replace(month=12)), ]) -def test_date_range_report(ledger_entries, start_date, stop_date): - postings, report = build_report(ledger_entries, start_date, stop_date) +def test_date_range_report(ledger_entries, start_date, stop_date, report_kwargs): + txn_filter = report_kwargs.get('txn_filter') + postings, report = build_report(ledger_entries, start_date, stop_date, **report_kwargs) expected = dict(ExpectedPostings.group_by_account(postings)) for account in iter_accounts(ledger_entries): try: related = expected[account] except KeyError: - ExpectedPostings.check_in_report(report.document, account, start_date, stop_date) + ExpectedPostings.check_in_report( + report.document, account, start_date, stop_date, txn_filter, + ) else: - related.check_report(report.document, start_date, stop_date) + if txn_filter is None: + related.check_report(report.document, start_date, stop_date) + else: + related.check_txn_report( + report.document, txn_filter, start_date, stop_date, + ) +@pytest.mark.parametrize('report_kwargs', iter(REPORT_KWARGS)) @pytest.mark.parametrize('tot_accts', [ (), ('Assets', 'Liabilities'), ('Income', 'Expenses'), ('Assets', 'Liabilities', 'Income', 'Expenses'), ]) -def test_report_filter_totals(ledger_entries, tot_accts): +def test_report_filter_totals(ledger_entries, tot_accts, report_kwargs): + txn_filter = report_kwargs.get('txn_filter') postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, totals_with_entries=tot_accts, - totals_without_entries=tot_accts) + totals_without_entries=tot_accts, + **report_kwargs) expected = dict(ExpectedPostings.group_by_account(postings)) for account in iter_accounts(ledger_entries): expect_totals = account.startswith(tot_accts) if account in expected and expected[account][-1].meta.date >= START_DATE: - expected[account].check_report(report.document, START_DATE, STOP_DATE, - expect_totals=expect_totals) + if txn_filter is None: + expected[account].check_report( + report.document, START_DATE, STOP_DATE, expect_totals=expect_totals, + ) + else: + expected[account].check_txn_report( + report.document, txn_filter, + START_DATE, STOP_DATE, expect_totals=expect_totals, + ) elif expect_totals: - ExpectedPostings.check_in_report(report.document, account) + ExpectedPostings.check_in_report( + report.document, account, START_DATE, STOP_DATE, txn_filter, + ) else: ExpectedPostings.check_not_in_report(report.document, account) +@pytest.mark.parametrize('report_kwargs', iter(REPORT_KWARGS)) @pytest.mark.parametrize('accounts', [ ('Income', 'Expenses'), ('Assets:Receivable', 'Liabilities:Payable'), ]) -def test_account_names_report(ledger_entries, accounts): - postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, accounts) +def test_account_names_report(ledger_entries, accounts, report_kwargs): + txn_filter = report_kwargs.get('txn_filter') + postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, + accounts, **report_kwargs) expected = dict(ExpectedPostings.group_by_account(postings)) for account in iter_accounts(ledger_entries): - if account.startswith(accounts): + if not account.startswith(accounts): + ExpectedPostings.check_not_in_report(report.document, account) + elif txn_filter is None: expected[account].check_report(report.document, START_DATE, STOP_DATE) else: - ExpectedPostings.check_not_in_report(report.document, account) + expected[account].check_txn_report( + report.document, txn_filter, START_DATE, STOP_DATE, + ) def run_main(arglist, config=None): if config is None: @@ -424,6 +522,30 @@ def test_main_project_report(ledger_entries, project, start_date, stop_date): except KeyError: ExpectedPostings.check_not_in_report(ods, account) +@pytest.mark.parametrize('flag', [ + '--disbursements', + '--receipts', +]) +def test_main_cash_report(ledger_entries, flag): + if flag == '--receipts': + txn_filter = ledger.TransactionFilter.CREDIT + else: + txn_filter = ledger.TransactionFilter.DEBIT + retcode, output, errors = run_main([ + flag, + '-b', START_DATE.isoformat(), + '-e', STOP_DATE.isoformat(), + ]) + assert not errors.getvalue() + assert retcode == 0 + ods = odf.opendocument.load(output) + postings = data.Posting.from_entries(ledger_entries) + for account, expected in ExpectedPostings.group_by_account(postings): + if account == 'Assets:Checking' or account == 'Assets:PayPal': + expected.check_txn_report(ods, txn_filter, START_DATE, STOP_DATE) + else: + expected.check_not_in_report(ods) + @pytest.mark.parametrize('arg', [ 'Assets:NoneSuchBank', 'Funny money',