diff --git a/conservancy_beancount/reports/ledger.py b/conservancy_beancount/reports/ledger.py
new file mode 100644
index 0000000..d9e3325
--- /dev/null
+++ b/conservancy_beancount/reports/ledger.py
@@ -0,0 +1,534 @@
+"""ledger.py - General ledger report from Beancount
+This tool produces a spreadsheet that shows postings in Beancount, organized
+by account.
+Specify the date range you want to report with the ``--begin`` and ``--end``
+Select the accounts you want to report with the ``--account`` option. You can
+specify this option multiple times. The report will include at least one sheet
+for each account you specify. Subaccounts will be reported on that sheet as
+Select the postings you want to report by passing metadata search terms in
+``name=value`` format.
+Run ``ledger-report --help`` for abbreviations and other options.
+Report all activity related to a given project::
+ ledger-report project=NAME
+Get all Assets postings for a given month to help with reconciliation::
+ ledger-report -a Assets -b 2018-05-01 -e 2018-06-01
+# Copyright © 2020 Brett Smith
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+import argparse
+import collections
+import datetime
+import enum
+import itertools
+import operator
+import logging
+import sys
+from typing import (
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ TextIO,
+ Tuple,
+from pathlib import Path
+import odf.table # type:ignore[import]
+from beancount.parser import printer as bc_printer
+from . import core
+from .. import books
+from .. import cliutil
+from .. import config as configmod
+from .. import data
+from .. import ranges
+from .. import rtutil
+PostTally = List[Tuple[int, data.Account]]
+PROGNAME = 'ledger-report'
+logger = logging.getLogger('conservancy_beancount.reports.ledger')
+class LedgerODS(core.BaseODS[data.Posting, data.Account]):
+ CORE_COLUMNS: Sequence[str] = [
+ 'Date',
+ data.Metadata.human_name('entity'),
+ 'Description',
+ 'Original Amount',
+ 'Booked Amount',
+ ]
+ ACCOUNT_COLUMNS: Dict[str, Sequence[str]] = collections.OrderedDict([
+ ('Income', ['project', 'rt-id', 'receipt', 'income-type']),
+ ('Expenses', ['project', 'rt-id', 'receipt', 'approval', 'expense-allocation']),
+ ('Equity', ['rt-id']),
+ ('Assets:Receivable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
+ ('Liabilities:Payable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
+ ('Assets:PayPal', ['rt-id', 'paypal-id', 'receipt', 'approval']),
+ ('Assets', ['rt-id', 'receipt', 'approval', 'bank-statement']),
+ ('Liabilities', ['rt-id', 'receipt', 'approval', 'bank-statement']),
+ ])
+ COLUMN_STYLES: Mapping[str, str] = {
+ 'Date': '',
+ 'Description': 'col_1_75',
+ data.Metadata.human_name('paypal-id'): 'col_1_5',
+ }
+ # Excel 2003 was limited to 65,536 rows per worksheet.
+ # While we can probably count on all our users supporting more modern
+ # formats (Excel 2007 supports over 1 million rows per worksheet),
+ # keeping the default limit conservative seems good to avoid running into
+ # other limits (like the number of hyperlinks per worksheet), plus just
+ # better for human organization and readability.
+ SHEET_SIZE = 65000
+ def __init__(self,
+ start_date: datetime.date,
+ stop_date: datetime.date,
+ sheet_names: Optional[Sequence[str]]=None,
+ rt_wrapper: Optional[rtutil.RT]=None,
+ sheet_size: Optional[int]=None,
+ ) -> None:
+ if sheet_names is None:
+ sheet_names = list(self.ACCOUNT_COLUMNS)
+ if sheet_size is None:
+ sheet_size = self.SHEET_SIZE
+ super().__init__(rt_wrapper)
+ self.date_range = ranges.DateRange(start_date, stop_date)
+ self.required_sheet_names = sheet_names
+ self.sheet_size = sheet_size
+ @classmethod
+ def _group_tally(
+ cls,
+ tally_by_account: PostTally,
+ key: Callable[[data.Account], Optional[str]],
+ ) -> Dict[str, PostTally]:
+ retval: Dict[str, PostTally] = collections.defaultdict(list)
+ for count, account in tally_by_account:
+ item_key = key(account)
+ if item_key is not None:
+ retval[item_key].append((count, account))
+ return retval
+ @classmethod
+ def _split_sheet(
+ cls,
+ tally_by_account: PostTally,
+ sheet_size: int,
+ sheet_name: str,
+ ) -> Iterator[str]:
+ total = 0
+ for index, (count, account) in enumerate(tally_by_account):
+ total += count
+ if total > sheet_size:
+ break
+ else:
+ # All the accounts fit in this sheet.
+ yield sheet_name
+ return
+ if index == 0 and len(tally_by_account) == 1:
+ # With one account, we can't split any further, so warn and stop.
+ logger.warning(
+ "%s has %s rows, over size %s",
+ account, f'{count:,g}', f'{sheet_size:,g}',
+ )
+ yield sheet_name
+ return
+ group_func = operator.methodcaller('root_part', sheet_name.count(':') + 2)
+ maybe_split = cls._group_tally(tally_by_account[:index], group_func)
+ must_split = cls._group_tally(tally_by_account[index:], group_func)
+ for subkey, must_split_tally in sorted(must_split.items()):
+ split_names = cls._split_sheet(
+ maybe_split.get(subkey, []) + must_split_tally, sheet_size, subkey,
+ )
+ # We must be willing to split out at least as many sheets as there
+ # are accounts that didn't fit. Do that first.
+ yield from itertools.islice(split_names, len(must_split_tally))
+ # After that, we can be in one of two cases:
+ # 1. There is no next sheet. All the accounts, including the
+ # maybe_splits and must_splits, fit on planned subsheets.
+ # Update state to note we don't need a sheet for them anymore.
+ # 2. The next sheet is named `subkey`, and is planned to include
+ # all of our maybe_split accounts. However, we don't need to
+ # yield that sheet name, because those accounts already fit in
+ # the sheet we're planning, and it would be a needless split.
+ next_sheet_name = next(split_names, None)
+ if next_sheet_name is None:
+ maybe_split.pop(subkey, None)
+ else:
+ assert next_sheet_name == subkey
+ assert not any(split_names)
+ if maybe_split:
+ yield sheet_name
+ @classmethod
+ def plan_sheets(
+ cls,
+ tally_by_account: Mapping[data.Account, int],
+ base_sheets: Sequence[str],
+ sheet_size: int,
+ ) -> Sequence[str]:
+ sorted_tally: PostTally = [
+ (count, account)
+ for account, count in tally_by_account.items()
+ ]
+ sorted_tally.sort()
+ split_tally = cls._group_tally(
+ sorted_tally,
+ operator.methodcaller('is_under', *base_sheets),
+ )
+ return [
+ sheet_name
+ for key in base_sheets
+ for sheet_name in cls._split_sheet(split_tally[key], sheet_size, key)
+ ]
+ @staticmethod
+ def _sort_and_filter_accounts(
+ accounts: Iterable[data.Account],
+ order: Sequence[str],
+ ) -> Iterator[Tuple[int, data.Account]]:
+ index_map = {s: ii for ii, s in enumerate(order)}
+ retval: Mapping[int, List[data.Account]] = collections.defaultdict(list)
+ for account in accounts:
+ acct_key = account.is_under(*order)
+ if acct_key is not None:
+ retval[index_map[acct_key]].append(account)
+ for key in sorted(retval):
+ acct_list = retval[key]
+ acct_list.sort()
+ for account in acct_list:
+ yield key, account
+ def section_key(self, row: data.Posting) -> data.Account:
+ return row.account
+ def start_sheet(self, sheet_name: str) -> None:
+ self.use_sheet(sheet_name.replace(':', ' '))
+ columns_key = data.Account(sheet_name).is_under(*self.ACCOUNT_COLUMNS)
+ # columns_key must not be None because ACCOUNT_COLUMNS has an entry
+ # for all five root accounts.
+ assert columns_key is not None
+ self.metadata_columns = self.ACCOUNT_COLUMNS[columns_key]
+ self.sheet_columns: Sequence[str] = [
+ *(data.Metadata.human_name(meta_key) for meta_key in self.metadata_columns),
+ ]
+ for col_name in self.sheet_columns:
+ self.sheet.addElement(odf.table.TableColumn(
+ stylename=self.COLUMN_STYLES.get(col_name, 'col_1_25'),
+ ))
+ self.add_row(*(
+ self.string_cell(col_name, stylename=self.style_bold)
+ for col_name in self.sheet_columns
+ ))
+ self.lock_first_row()
+ def _report_section_balance(self, key: data.Account, date_key: str) -> None:
+ uses_opening = key.is_under('Assets', 'Equity', 'Liabilities')
+ if date_key == 'start':
+ if not uses_opening:
+ return
+ date = self.date_range.start
+ description = "Opening Balance"
+ else:
+ date = self.date_range.stop
+ description = "Ending Balance" if uses_opening else "Period Total"
+ balance = self.norm_func(
+ self.account_groups[key].balance_at_cost_by_date(date)
+ )
+ self.add_row(
+ self.date_cell(date, stylename=self.merge_styles(
+ self.style_bold, self.style_date,
+ )),
+ odf.table.TableCell(),
+ self.string_cell(description, stylename=self.style_bold),
+ odf.table.TableCell(),
+ self.balance_cell(balance, stylename=self.style_bold),
+ )
+ def start_section(self, key: data.Account) -> None:
+ self.add_row()
+ self.add_row(
+ odf.table.TableCell(),
+ self.string_cell(
+ f"{key} Ledger"
+ f" From {self.date_range.start.isoformat()}"
+ f" To {self.date_range.stop.isoformat()}",
+ stylename=self.style_bold,
+ numbercolumnsspanned=len(self.sheet_columns) - 1,
+ ),
+ )
+ self.norm_func = core.normalize_amount_func(key)
+ self._report_section_balance(key, 'start')
+ def end_section(self, key: data.Account) -> None:
+ self._report_section_balance(key, 'stop')
+ def write_row(self, row: data.Posting) -> None:
+ if row.meta.date not in self.date_range:
+ return
+ elif row.cost is None:
+ amount_cell = odf.table.TableCell()
+ else:
+ amount_cell = self.currency_cell(self.norm_func(row.units))
+ self.add_row(
+ self.date_cell(row.meta.date),
+ self.string_cell(row.meta.get('entity') or ''),
+ self.string_cell(row.meta.txn.narration),
+ amount_cell,
+ self.currency_cell(self.norm_func(row.at_cost())),
+ *(self.meta_links_cell(row.meta.report_links(key))
+ if key in data.LINK_METADATA
+ else self.string_cell(row.meta.get(key, ''))
+ for key in self.metadata_columns),
+ )
+ def _combined_balance_row(self,
+ date: datetime.date,
+ balance_accounts: Sequence[str],
+ ) -> None:
+ balance = -sum((
+ related.balance_at_cost_by_date(date)
+ for account, related in self.account_groups.items()
+ if account.is_under(*balance_accounts)
+ ), core.MutableBalance())
+ self.add_row(
+ self.string_cell(
+ f"Balance as of {date.isoformat()}",
+ stylename=self.merge_styles(self.style_bold, self.style_endtext),
+ ),
+ self.balance_cell(balance, stylename=self.style_bold),
+ )
+ def write_balance_sheet(self) -> None:
+ balance_accounts = ['Equity', 'Income', 'Expenses']
+ # FIXME: This is a hack to exclude non-project Equity accounts from
+ # project reports.
+ if balance_accounts[0] not in self.required_sheet_names:
+ balance_accounts[0] = 'Equity:Funds'
+ self.use_sheet("Balance")
+ column_style = self.replace_child(
+ self.document.automaticstyles, odf.style.Style, name='col_3',
+ )
+ column_style.setAttribute('family', 'table-column')
+ column_style.addElement(odf.style.TableColumnProperties(columnwidth='3in'))
+ for _ in range(2):
+ self.sheet.addElement(odf.table.TableColumn(stylename=column_style))
+ self.add_row(
+ self.string_cell("Account", stylename=self.style_bold),
+ self.string_cell("Balance", stylename=self.style_bold),
+ )
+ self.lock_first_row()
+ self.add_row()
+ self.add_row(self.string_cell(
+ f"Ledger From {self.date_range.start.isoformat()}"
+ f" To {self.date_range.stop.isoformat()}",
+ stylename=self.merge_styles(self.style_centertext, self.style_bold),
+ numbercolumnsspanned=2,
+ ))
+ self.add_row()
+ self._combined_balance_row(self.date_range.start, balance_accounts)
+ for _, account in self._sort_and_filter_accounts(
+ self.account_groups, balance_accounts,
+ ):
+ related = self.account_groups[account]
+ # start_bal - stop_bal == -(stop_bal - start_bal)
+ balance = related.balance_at_cost_by_date(self.date_range.start)
+ balance -= related.balance_at_cost_by_date(self.date_range.stop)
+ if not balance.is_zero():
+ self.add_row(
+ self.string_cell(account, stylename=self.style_endtext),
+ self.balance_cell(balance),
+ )
+ self._combined_balance_row(self.date_range.stop, balance_accounts)
+ def write(self, rows: Iterable[data.Posting]) -> None:
+ self.account_groups = dict(core.RelatedPostings.group_by_account(rows))
+ self.write_balance_sheet()
+ tally_by_account_iter = (
+ (account, sum(1 for post in related if post.meta.date in self.date_range))
+ for account, related in self.account_groups.items()
+ )
+ tally_by_account = {
+ account: count
+ for account, count in tally_by_account_iter
+ if count
+ }
+ sheet_names = self.plan_sheets(
+ tally_by_account, self.required_sheet_names, self.sheet_size,
+ )
+ using_sheet_index = -1
+ for sheet_index, account in self._sort_and_filter_accounts(
+ tally_by_account, sheet_names,
+ ):
+ while using_sheet_index < sheet_index:
+ using_sheet_index += 1
+ self.start_sheet(sheet_names[using_sheet_index])
+ super().write(self.account_groups[account])
+ for index in range(using_sheet_index + 1, len(sheet_names)):
+ self.start_sheet(sheet_names[index])
+class ReturnFlag(enum.IntFlag):
+def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
+ parser = argparse.ArgumentParser(prog=PROGNAME)
+ cliutil.add_version_argument(parser)
+ parser.add_argument(
+ '--begin', '--start', '-b',
+ dest='start_date',
+ metavar='DATE',
+ type=cliutil.date_arg,
+ help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
+The default is the beginning of the last full fiscal year.
+ parser.add_argument(
+ '--end', '--stop', '-e',
+ dest='stop_date',
+ metavar='DATE',
+ type=cliutil.date_arg,
+ help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
+The default is the end of the begin date's fiscal year.
+ parser.add_argument(
+ '--account', '-a',
+ dest='sheet_names',
+ metavar='ACCOUNT',
+ action='append',
+ help="""Show this account in the report. You can specify this option
+multiple times. If not specified, the default set adapts to your search
+ parser.add_argument(
+ '--sheet-size', '--size',
+ metavar='SIZE',
+ type=int,
+ default=LedgerODS.SHEET_SIZE,
+ help="""Try to limit sheets to this many rows. The report will
+automatically create new sheets to make this happen. When that's not possible,
+it will issue a warning.
+ parser.add_argument(
+ '--output-file', '-O',
+ metavar='PATH',
+ type=Path,
+ help="""Write the report to this file, or stdout when PATH is `-`.
+The default is stdout for the balance and outgoing reports, and a generated
+filename for other reports.
+ cliutil.add_loglevel_argument(parser)
+ parser.add_argument(
+ 'search_terms',
+ metavar='FILTER',
+ type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
+ nargs=argparse.ZERO_OR_MORE,
+ help="""Report on postings that match this criteria. The format is
+NAME=TERM. TERM is a link or word that must exist in a posting's NAME
+metadata to match. A single ticket number is a shortcut for
+`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
+ args = parser.parse_args(arglist)
+ if args.sheet_names is None:
+ if any(term.meta_key == 'project' for term in args.search_terms):
+ args.sheet_names = ['Income', 'Expenses', 'Assets:Receivable', 'Liabilities:Payable']
+ else:
+ args.sheet_names = list(LedgerODS.ACCOUNT_COLUMNS)
+ return args
+def main(arglist: Optional[Sequence[str]]=None,
+ stdout: TextIO=sys.stdout,
+ stderr: TextIO=sys.stderr,
+ config: Optional[configmod.Config]=None,
+) -> int:
+ args = parse_arguments(arglist)
+ cliutil.set_loglevel(logger, args.loglevel)
+ if config is None:
+ config = configmod.Config()
+ config.load_file()
+ fy = config.fiscal_year_begin()
+ if args.start_date is None:
+ args.start_date = fy.first_date(fy.for_date() - 1)
+ if args.stop_date is None:
+ args.stop_date = fy.next_fy_date(args.start_date)
+ returncode = 0
+ books_loader = config.books_loader()
+ if books_loader is None:
+ entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
+ else:
+ entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
+ for error in load_errors:
+ bc_printer.print_error(error, file=stderr)
+ returncode |= ReturnFlag.LOAD_ERRORS
+ postings = data.Posting.from_entries(entries)
+ for search_term in args.search_terms:
+ postings = search_term.filter_postings(postings)
+ rt_wrapper = config.rt_wrapper()
+ if rt_wrapper is None:
+ logger.warning("could not initialize RT client; spreadsheet links will be broken")
+ report = LedgerODS(
+ args.start_date,
+ args.stop_date,
+ args.sheet_names,
+ rt_wrapper,
+ args.sheet_size,
+ )
+ report.write(postings)
+ if not report.account_groups:
+ logger.warning("no matching postings found to report")
+ returncode |= ReturnFlag.NOTHING_TO_REPORT
+ if args.output_file is None:
+ out_dir_path = config.repository_path() or Path()
+ args.output_file = out_dir_path / 'LedgerReport_{}_{}.ods'.format(
+ args.start_date.isoformat(), args.stop_date.isoformat(),
+ )
+ logger.info("Writing report to %s", args.output_file)
+ ods_file = cliutil.bytes_output(args.output_file, stdout)
+ report.save_file(ods_file)
+ return 0 if returncode == 0 else 16 + returncode
+entry_point = cliutil.make_entry_point(__name__, PROGNAME)
+if __name__ == '__main__':
+ exit(entry_point())
diff --git a/setup.py b/setup.py
index 1a8b240..7627870 100755
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ from setuptools import setup
description="Plugin, library, and reports for reading Conservancy's books",
- version='1.1.13',
+ version='1.2.0',
author='Software Freedom Conservancy',
license='GNU AGPLv3+',
@@ -36,6 +36,7 @@ setup(
'console_scripts': [
'accrual-report = conservancy_beancount.reports.accrual:entry_point',
+ 'ledger-report = conservancy_beancount.reports.ledger:entry_point',
diff --git a/tests/books/ledger.beancount b/tests/books/ledger.beancount
new file mode 100644
index 0000000..a941379
--- /dev/null
+++ b/tests/books/ledger.beancount
@@ -0,0 +1,40 @@
+2018-01-01 open Equity:OpeningBalance
+2018-01-01 open Assets:Checking
+2018-01-01 open Assets:Receivable:Accounts
+2018-01-01 open Expenses:Other
+2018-01-01 open Income:Other
+2018-01-01 open Liabilities:CreditCard
+2018-01-01 open Liabilities:Payable:Accounts
+2018-02-28 * "Opening balance"
+ Equity:OpeningBalance -10,000 USD
+ Assets:Checking 10,000 USD
+2018-06-06 * "Accrued expense"
+ project: "eighteen"
+ Liabilities:Payable:Accounts -60 USD
+ Expenses:Other 60 USD
+2018-09-09 * "Paid expense"
+ Liabilities:Payable:Accounts 60 USD
+ project: "eighteen"
+ Assets:Checking -60 USD
+2018-12-12 * "Accrued income"
+ project: "eighteen"
+ Assets:Receivable:Accounts 120 USD
+ Income:Other -120 USD
+2019-03-03 * "Paid income"
+ Assets:Receivable:Accounts -120 USD
+ project: "eighteen"
+ Assets:Checking 120 USD
+2019-06-06 * "Credit card expense"
+ Liabilities:CreditCard -65 USD
+ Expenses:Other 65 USD
+ project: "nineteen"
+2019-09-09 * "Credit card paid"
+ Liabilities:CreditCard 65 USD
+ Assets:Checking -65 USD
diff --git a/tests/test_reports_ledger.py b/tests/test_reports_ledger.py
new file mode 100644
index 0000000..00df81a
--- /dev/null
+++ b/tests/test_reports_ledger.py
@@ -0,0 +1,317 @@
+"""test_reports_ledger.py - Unit tests for general ledger report"""
+# Copyright © 2020 Brett Smith
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+import collections
+import copy
+import datetime
+import io
+import re
+import pytest
+from . import testutil
+import odf.table
+import odf.text
+from beancount.core import data as bc_data
+from beancount import loader as bc_loader
+from conservancy_beancount import data
+from conservancy_beancount.reports import core
+from conservancy_beancount.reports import ledger
+Acct = data.Account
+_ledger_load = bc_loader.load_file(testutil.test_path('books/ledger.beancount'))
+ 'Balance',
+ 'Income',
+ 'Expenses',
+ 'Equity',
+ 'Assets:Receivable',
+ 'Liabilities:Payable',
+ 'Assets:PayPal',
+ 'Assets',
+ 'Liabilities',
+ 'Balance',
+ 'Income',
+ 'Expenses',
+ 'Assets:Receivable',
+ 'Liabilities:Payable',
+OVERSIZE_RE = re.compile(
+ r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
+START_DATE = datetime.date(2018, 3, 1)
+MID_DATE = datetime.date(2019, 3, 1)
+STOP_DATE = datetime.date(2020, 3, 1)
+def ledger_entries():
+ return copy.deepcopy(_ledger_load[0])
+class NotFound(Exception): pass
+class NoSheet(NotFound): pass
+class NoHeader(NotFound): pass
+class ExpectedPostings(core.RelatedPostings):
+ def slice_date_range(self, start_date, end_date):
+ postings = enumerate(self)
+ for start_index, post in postings:
+ if start_date <= post.meta.date:
+ break
+ else:
+ start_index += 1
+ if end_date <= post.meta.date:
+ end_index = start_index
+ else:
+ for end_index, post in postings:
+ if end_date <= post.meta.date:
+ break
+ else:
+ end_index = None
+ return (self[:start_index].balance_at_cost(),
+ self[start_index:end_index])
+ def check_report(self, ods, start_date, end_date):
+ account = self[0].account
+ norm_func = core.normalize_amount_func(account)
+ open_bal, expect_posts = self.slice_date_range(start_date, end_date)
+ open_bal = norm_func(open_bal)
+ for sheet in ods.getElementsByType(odf.table.Table):
+ sheet_account = sheet.getAttribute('name').replace(' ', ':')
+ if sheet_account and account.is_under(sheet_account):
+ break
+ else:
+ raise NoSheet(account)
+ rows = iter(sheet.getElementsByType(odf.table.TableRow))
+ for row in rows:
+ cells = row.childNodes
+ if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
+ break
+ else:
+ if expect_posts:
+ raise NoHeader(account)
+ else:
+ return
+ if account.is_under('Assets', 'Equity', 'Liabilities'):
+ opening_row = testutil.ODSCell.from_row(next(rows))
+ assert opening_row[0].value == start_date
+ assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
+ for expected in expect_posts:
+ cells = iter(testutil.ODSCell.from_row(next(rows)))
+ assert next(cells).value == expected.meta.date
+ assert next(cells).text == (expected.meta.get('entity') or '')
+ assert next(cells).text == (expected.meta.txn.narration or '')
+ if expected.cost is None:
+ assert not next(cells).text
+ assert next(cells).value == norm_func(expected.units.number)
+ else:
+ assert next(cells).value == norm_func(expected.units.number)
+ assert next(cells).value == norm_func(expected.at_cost().number)
+ closing_row = testutil.ODSCell.from_row(next(rows))
+ closing_bal = open_bal + norm_func(expect_posts.balance_at_cost())
+ assert closing_row[0].value == end_date
+ assert closing_row[4].text == closing_bal.format(None, empty='0', sep='\0')
+def get_sheet_names(ods):
+ return [sheet.getAttribute('name').replace(' ', ':')
+ for sheet in ods.getElementsByType(odf.table.Table)]
+def check_oversize_logs(caplog, accounts, sheet_size):
+ actual = {}
+ for log in caplog.records:
+ match = OVERSIZE_RE.match(log.message)
+ if match:
+ assert int(match.group(3).replace(',', '')) == sheet_size
+ actual[match.group(1)] = int(match.group(2).replace(',', ''))
+ expected = {name: size for name, size in accounts.items() if size > sheet_size}
+ assert actual == expected
+def test_plan_sheets_no_change():
+ have = {
+ Acct('Assets:Cash'): 10,
+ Acct('Income:Donations'): 20,
+ }
+ want = ['Assets', 'Income']
+ actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
+ assert actual == want
+@pytest.mark.parametrize('have', [
+ {},
+ {Acct('Income:Other'): 10},
+ {Acct('Assets:Checking'): 20, Acct('Expenses:Other'): 15},
+def test_plan_sheets_includes_accounts_without_transactions(have):
+ want = ['Assets', 'Income', 'Expenses']
+ actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
+ assert actual == want
+def test_plan_sheets_single_split():
+ have = {
+ Acct('Assets:Cash'): 60,
+ Acct('Assets:Checking'): 80,
+ Acct('Income:Donations'): 50,
+ Acct('Expenses:Travel'): 90,
+ Acct('Expenses:FilingFees'): 25,
+ }
+ want = ['Assets', 'Income', 'Expenses']
+ actual = ledger.LedgerODS.plan_sheets(have, want, 100)
+ assert actual == [
+ 'Assets:Checking',
+ 'Assets',
+ 'Income',
+ 'Expenses:Travel',
+ 'Expenses',
+ ]
+def test_plan_sheets_split_subtree():
+ have = {
+ Acct('Assets:Bank1:Checking'): 80,
+ Acct('Assets:Bank1:Savings'): 10,
+ Acct('Assets:Cash:USD'): 20,
+ Acct('Assets:Cash:EUR'): 15,
+ }
+ actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
+ assert actual == ['Assets:Bank1', 'Assets']
+def test_plan_sheets_ambiguous_split():
+ have = {
+ Acct('Assets:Bank1:Checking'): 80,
+ Acct('Assets:Bank1:Savings'): 40,
+ Acct('Assets:Receivable:Accounts'): 40,
+ Acct('Assets:Cash'): 10,
+ }
+ actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
+ # :Savings cannot fit with :Checking, so it's important that the return
+ # value disambiguate that.
+ assert actual == ['Assets:Bank1:Checking', 'Assets']
+def test_plan_sheets_oversize(caplog):
+ have = {
+ Acct('Assets:Checking'): 150,
+ Acct('Assets:Cash'): 50,
+ }
+ actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
+ assert actual == ['Assets:Checking', 'Assets']
+ check_oversize_logs(caplog, have, 100)
+def test_plan_sheets_all_oversize(caplog):
+ have = {
+ Acct('Assets:Checking'): 150,
+ Acct('Assets:Cash'): 150,
+ }
+ actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
+ # In this case, each account should appear in alphabetical order.
+ assert actual == ['Assets:Cash', 'Assets:Checking']
+ check_oversize_logs(caplog, have, 100)
+def test_plan_sheets_full_split_required(caplog):
+ have = {
+ Acct('Assets:Bank:Savings'): 98,
+ Acct('Assets:Bank:Checking'): 96,
+ Acct('Assets:Bank:Investment'): 94,
+ }
+ actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
+ assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
+ assert not caplog.records
+@pytest.mark.parametrize('start_date,stop_date', [
+ (START_DATE.replace(month=6), START_DATE.replace(month=12)),
+ (STOP_DATE, STOP_DATE.replace(month=12)),
+def test_date_range_report(ledger_entries, start_date, stop_date):
+ postings = list(data.Posting.from_entries(ledger_entries))
+ report = ledger.LedgerODS(start_date, stop_date)
+ report.write(iter(postings))
+ for _, expected in ExpectedPostings.group_by_account(postings):
+ expected.check_report(report.document, start_date, stop_date)
+@pytest.mark.parametrize('sheet_names', [
+ ('Income', 'Expenses'),
+ ('Assets:Receivable', 'Liabilities:Payable'),
+def test_account_names_report(ledger_entries, sheet_names):
+ postings = list(data.Posting.from_entries(ledger_entries))
+ report = ledger.LedgerODS(START_DATE, STOP_DATE, sheet_names=sheet_names)
+ report.write(iter(postings))
+ for key, expected in ExpectedPostings.group_by_account(postings):
+ should_find = key.startswith(sheet_names)
+ try:
+ expected.check_report(report.document, START_DATE, STOP_DATE)
+ except NotFound:
+ assert not should_find
+ else:
+ assert should_find
+def run_main(arglist, config=None):
+ if config is None:
+ config = testutil.TestConfig(
+ books_path=testutil.test_path('books/ledger.beancount'),
+ rt_client=testutil.RTClient(),
+ )
+ arglist.insert(0, '--output-file=-')
+ output = io.BytesIO()
+ errors = io.StringIO()
+ retcode = ledger.main(arglist, output, errors, config)
+ output.seek(0)
+ return retcode, output, errors
+def test_main(ledger_entries):
+ retcode, output, errors = run_main([
+ '-b', START_DATE.isoformat(),
+ '-e', STOP_DATE.isoformat(),
+ ])
+ assert not errors.getvalue()
+ assert retcode == 0
+ ods = odf.opendocument.load(output)
+ assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
+ postings = data.Posting.from_entries(ledger_entries)
+ for _, expected in ExpectedPostings.group_by_account(postings):
+ expected.check_report(ods, START_DATE, STOP_DATE)
+@pytest.mark.parametrize('project,start_date,stop_date', [
+ ('eighteen', START_DATE, MID_DATE.replace(day=30)),
+ ('nineteen', MID_DATE, STOP_DATE),
+def test_main_project_report(ledger_entries, project, start_date, stop_date):
+ postings = data.Posting.from_entries(ledger_entries)
+ for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
+ if key == project:
+ break
+ assert key == project
+ retcode, output, errors = run_main([
+ f'--begin={start_date.isoformat()}',
+ f'--end={stop_date.isoformat()}',
+ project,
+ ])
+ assert not errors.getvalue()
+ assert retcode == 0
+ ods = odf.opendocument.load(output)
+ assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
+ for _, expected in ExpectedPostings.group_by_account(related):
+ expected.check_report(ods, start_date, stop_date)
+def test_main_no_postings(caplog):
+ retcode, output, errors = run_main(['NonexistentProject'])
+ assert retcode == 24
+ assert any(log.levelname == 'WARNING' for log in caplog.records)
diff --git a/tests/testutil.py b/tests/testutil.py
index 110369b..33f2150 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -237,6 +237,7 @@ class TestBooksLoader(books.Loader):
class TestConfig:
def __init__(self, *,
+ fiscal_year=(3, 1),
@@ -245,6 +246,7 @@ class TestConfig:
self._books_loader = None
self._books_loader = TestBooksLoader(books_path)
+ self.fiscal_year = fiscal_year
self._payment_threshold = Decimal(payment_threshold)
self.repo_path = test_path(repo_path)
self._rt_client = rt_client
@@ -259,6 +261,9 @@ class TestConfig:
def config_file_path(self):
return test_path('userconfig/conservancy_beancount/config.ini')
+ def fiscal_year_begin(self):
+ return books.FiscalYear(*self.fiscal_year)
def payment_threshold(self):
return self._payment_threshold