diff --git a/conservancy_beancount/reports/balance_sheet.py b/conservancy_beancount/reports/balance_sheet.py
new file mode 100644
index 0000000..2a21406
--- /dev/null
+++ b/conservancy_beancount/reports/balance_sheet.py
@@ -0,0 +1,425 @@
+"""balance_sheet.py - Balance sheet report"""
+# Copyright © 2020 Brett Smith
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import argparse
+import collections
+import datetime
+import enum
+import logging
+import os
+import sys
+
+from pathlib import Path
+
+from typing import (
+ Hashable,
+ Iterable,
+ Iterator,
+ Mapping,
+ NamedTuple,
+ Optional,
+ Sequence,
+ TextIO,
+ Tuple,
+)
+
+import odf.table # type:ignore[import]
+
+from beancount.parser import printer as bc_printer
+
+from . import core
+from .. import books
+from .. import cliutil
+from .. import config as configmod
+from .. import data
+from .. import ranges
+
+PROGNAME = 'balance-sheet-report'
+logger = logging.getLogger('conservancy_beancount.tools.balance_sheet')
+
+class Fund(enum.Enum):
+ RESTRICTED = enum.auto()
+ UNRESTRICTED = enum.auto()
+
+
+class Period(enum.Enum):
+ OPENING = enum.auto()
+ PERIOD = enum.auto()
+
+
+class BalanceKey(NamedTuple):
+ account: data.Account
+ classification: data.Account
+ period: Period
+ fund: Fund
+ post_type: Optional[str]
+
+
+class Balances:
+ POST_TYPES = {
+ 'Income': 'income-type',
+ 'Expenses': 'expense-type',
+ }
+
+ def __init__(self,
+ postings: Iterable[data.Posting],
+ start_date: datetime.date,
+ stop_date: datetime.date,
+ fund_key: str='project',
+ unrestricted_fund_value: str='Conservancy',
+ ) -> None:
+ self.opening_range = ranges.DateRange(
+ cliutil.diff_year(start_date, -1),
+ cliutil.diff_year(stop_date, -1),
+ )
+ assert self.opening_range.stop <= start_date
+ self.period_range = ranges.DateRange(start_date, stop_date)
+ self.balances: Mapping[BalanceKey, core.MutableBalance] \
+ = collections.defaultdict(core.MutableBalance)
+ for post in postings:
+ post_date = post.meta.date
+ if post_date in self.period_range:
+ period = Period.PERIOD
+ elif post_date < self.period_range.start:
+ period = Period.OPENING
+ else:
+ continue
+ if post.meta.get(fund_key) == unrestricted_fund_value:
+ fund = Fund.UNRESTRICTED
+ else:
+ fund = Fund.RESTRICTED
+ try:
+ classification_s = post.account.meta['classification']
+ if isinstance(classification_s, str):
+ classification = data.Account(classification_s)
+ else:
+ raise TypeError()
+ except (KeyError, TypeError):
+ classification = post.account
+ try:
+ post_type = post.meta[self.POST_TYPES[post.account.root_part()]]
+ except KeyError:
+ post_type = None
+ key = BalanceKey(post.account, classification, period, fund, post_type)
+ self.balances[key] += post.at_cost()
+
+ def total(self,
+ account: Optional[str]=None,
+ classification: Optional[str]=None,
+ period: Optional[Period]=None,
+ fund: Optional[Fund]=None,
+ post_type: Optional[str]=None,
+ ) -> core.Balance:
+ retval = core.MutableBalance()
+ for key, balance in self.balances.items():
+ if not (account is None or key.account.is_under(account)):
+ pass
+ elif not (classification is None
+ or key.classification.is_under(classification)):
+ pass
+ elif not (period is None or period is key.period):
+ pass
+ elif not (fund is None or fund is key.fund):
+ pass
+ elif not (post_type is None or post_type == key.post_type):
+ pass
+ else:
+ retval += balance
+ return retval
+
+ def classifications(self, account: str) -> Sequence[data.Account]:
+ class_bals: Mapping[data.Account, core.MutableBalance] \
+ = collections.defaultdict(core.MutableBalance)
+ for key, balance in self.balances.items():
+ if key.account.is_under(account):
+ class_bals[key.classification] += balance
+ norm_func = core.normalize_amount_func(f'{account}:RootsOK')
+ def sortkey(acct: data.Account) -> Hashable:
+ prefix, _, _ = acct.rpartition(':')
+ balance = norm_func(class_bals[acct])
+ max_bal = max(amount.number for amount in balance.values())
+ return prefix, -max_bal
+ return sorted(class_bals, key=sortkey)
+
+
+class Report(core.BaseODS[Sequence[None], None]):
+ def __init__(self,
+ balances: Balances,
+ *,
+ date_fmt: str='%B %d, %Y',
+ ) -> None:
+ super().__init__()
+ self.balances = balances
+ one_day = datetime.timedelta(days=1)
+ date = balances.period_range.stop - one_day
+ self.period_name = date.strftime(date_fmt)
+ date = balances.opening_range.stop - one_day
+ self.opening_name = date.strftime(date_fmt)
+
+ def section_key(self, row: Sequence[None]) -> None:
+ raise NotImplementedError("balance_sheet.Report.section_key")
+
+ def init_styles(self) -> None:
+ super().init_styles()
+ self.style_header = self.merge_styles(self.style_bold, self.style_centertext)
+ self.style_huline = self.merge_styles(
+ self.style_header,
+ self.border_style(core.Border.BOTTOM, '1pt'),
+ )
+ self.style_subtotline = self.border_style(core.Border.TOP, '1pt')
+ self.style_totline = self.border_style(core.Border.TOP | core.Border.BOTTOM, '1pt')
+ self.style_bottomline = self.merge_styles(
+ self.style_subtotline,
+ self.border_style(core.Border.BOTTOM, '2pt', 'double'),
+ )
+
+ def write_all(self) -> None:
+ self.write_financial_position()
+
+ def walk_classifications(self, cseq: Iterable[data.Account]) \
+ -> Iterator[Tuple[str, Optional[data.Account]]]:
+ last_prefix: Sequence[str] = []
+ for classification in cseq:
+ parts = classification.split(':')
+ tail = parts.pop()
+ tabs = '\t' * len(parts)
+ if parts != last_prefix:
+ yield f'{tabs[1:]}{parts[-1]}', None
+ last_prefix = parts
+ yield f'{tabs}{tail}', classification
+
+ def walk_classifications_by_account(self, account: str) \
+ -> Iterator[Tuple[str, Optional[data.Account]]]:
+ return self.walk_classifications(self.balances.classifications(account))
+
+ def write_financial_position(self) -> None:
+ self.use_sheet("Financial Position")
+ for width in [3, 1.5, 1.5]:
+ col_style = self.column_style(width)
+ self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
+ self.add_row(
+ self.multiline_cell([
+ "DRAFT Statement of Financial Position",
+ self.period_name,
+ ], numbercolumnsspanned=3, stylename=self.style_header)
+ )
+ self.add_row()
+ self.add_row(
+ odf.table.TableCell(),
+ self.string_cell(self.period_name, stylename=self.style_huline),
+ self.string_cell(self.opening_name, stylename=self.style_huline),
+ )
+
+ prior_assets = core.MutableBalance()
+ period_assets = core.MutableBalance()
+ self.add_row(self.string_cell("Assets", stylename=self.style_bold))
+ self.add_row()
+ for text, classification in self.walk_classifications_by_account('Assets'):
+ text_cell = self.string_cell(text)
+ if classification is None:
+ self.add_row(text_cell)
+ else:
+ prior_bal = self.balances.total(
+ classification=classification, period=Period.OPENING,
+ )
+ period_bal = prior_bal + self.balances.total(
+ classification=classification, period=Period.PERIOD,
+ )
+ self.add_row(
+ text_cell,
+ self.balance_cell(period_bal),
+ self.balance_cell(prior_bal),
+ )
+ prior_assets += prior_bal
+ period_assets += period_bal
+ self.add_row()
+ self.add_row(
+ self.string_cell("Total Assets"),
+ self.balance_cell(period_assets, stylename=self.style_bottomline),
+ self.balance_cell(prior_assets, stylename=self.style_bottomline),
+ )
+ self.add_row()
+ self.add_row()
+
+ prior_liabilities = core.MutableBalance()
+ period_liabilities = core.MutableBalance()
+ self.add_row(self.string_cell("Liabilities and Net Assets",
+ stylename=self.style_bold))
+ self.add_row()
+ self.add_row(self.string_cell("Liabilities", stylename=self.style_bold))
+ self.add_row()
+ for text, classification in self.walk_classifications_by_account('Liabilities'):
+ text_cell = self.string_cell(text)
+ if classification is None:
+ self.add_row(text_cell)
+ else:
+ prior_bal = -self.balances.total(
+ classification=classification, period=Period.OPENING,
+ )
+ period_bal = prior_bal - self.balances.total(
+ classification=classification, period=Period.PERIOD,
+ )
+ self.add_row(
+ text_cell,
+ self.balance_cell(period_bal),
+ self.balance_cell(prior_bal),
+ )
+ prior_liabilities += prior_bal
+ period_liabilities += period_bal
+ self.add_row(
+ self.string_cell("Total Liabilities"),
+ self.balance_cell(period_liabilities, stylename=self.style_totline),
+ self.balance_cell(prior_liabilities, stylename=self.style_totline),
+ )
+ self.add_row()
+ self.add_row()
+
+ prior_net = core.MutableBalance()
+ period_net = core.MutableBalance()
+ self.add_row(self.string_cell("Net Assets", stylename=self.style_bold))
+ self.add_row()
+ accounts = ['Equity', 'Income', 'Expenses']
+ for fund in [Fund.UNRESTRICTED, Fund.RESTRICTED]:
+ preposition = "Without" if fund is Fund.UNRESTRICTED else "With"
+ prior_bal = -sum(
+ (self.balances.total(account=account, period=Period.OPENING, fund=fund)
+ for account in accounts), core.MutableBalance(),
+ )
+ period_bal = prior_bal - sum(
+ (self.balances.total(account=account, period=Period.PERIOD, fund=fund)
+ for account in accounts), core.MutableBalance(),
+ )
+ self.add_row(
+ self.string_cell(f"{preposition} donor restrictions"),
+ self.balance_cell(period_bal),
+ self.balance_cell(prior_bal),
+ )
+ prior_net += prior_bal
+ period_net += period_bal
+ self.add_row(
+ self.string_cell("Total Net Assets"),
+ self.balance_cell(period_net, stylename=self.style_subtotline),
+ self.balance_cell(prior_net, stylename=self.style_subtotline),
+ )
+ self.add_row()
+ self.add_row(
+ self.string_cell("Total Liabilities and Net Assets"),
+ self.balance_cell(period_liabilities + period_net,
+ stylename=self.style_bottomline),
+ self.balance_cell(prior_liabilities + prior_net,
+ stylename=self.style_bottomline),
+ )
+
+
+def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
+ parser = argparse.ArgumentParser(prog=PROGNAME)
+ cliutil.add_version_argument(parser)
+ parser.add_argument(
+ '--begin', '--start', '-b',
+ dest='start_date',
+ metavar='DATE',
+ type=cliutil.date_arg,
+ help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
+The default is one year ago.
+""")
+ parser.add_argument(
+ '--end', '--stop', '-e',
+ dest='stop_date',
+ metavar='DATE',
+ type=cliutil.date_arg,
+ help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
+The default is a year after the start date.
+""")
+ parser.add_argument(
+ '--fund-metadata-key', '-m',
+ metavar='KEY',
+ default='project',
+ help="""Name of the fund metadata key. Default %(default)s.
+""")
+ parser.add_argument(
+ '--unrestricted-fund', '-u',
+ metavar='PROJECT',
+ default='Conservancy',
+ help="""Name of the unrestricted fund. Default %(default)s.
+""")
+ parser.add_argument(
+ '--output-file', '-O',
+ metavar='PATH',
+ type=Path,
+ help="""Write the report to this file, or stdout when PATH is `-`.
+""")
+ cliutil.add_loglevel_argument(parser)
+ return parser.parse_args(arglist)
+
+def main(arglist: Optional[Sequence[str]]=None,
+ stdout: TextIO=sys.stdout,
+ stderr: TextIO=sys.stderr,
+ config: Optional[configmod.Config]=None,
+) -> int:
+ args = parse_arguments(arglist)
+ cliutil.set_loglevel(logger, args.loglevel)
+ if config is None:
+ config = configmod.Config()
+ config.load_file()
+
+ if args.stop_date is None:
+ if args.start_date is None:
+ args.stop_date = datetime.date.today()
+ else:
+ args.stop_date = cliutil.diff_year(args.start_date, 1)
+ if args.start_date is None:
+ args.start_date = cliutil.diff_year(args.stop_date, -1)
+
+ returncode = 0
+ books_loader = config.books_loader()
+ if books_loader is None:
+ entries, load_errors, options_map = books.Loader.load_none(config.config_file_path())
+ returncode = cliutil.ExitCode.NoConfiguration
+ else:
+ start_fy = config.fiscal_year_begin().for_date(args.start_date) - 1
+ entries, load_errors, options_map = books_loader.load_fy_range(start_fy, args.stop_date)
+ if load_errors:
+ returncode = cliutil.ExitCode.BeancountErrors
+ elif not entries:
+ returncode = cliutil.ExitCode.NoDataLoaded
+ for error in load_errors:
+ bc_printer.print_error(error, file=stderr)
+
+ data.Account.load_from_books(entries, options_map)
+ balances = Balances(
+ data.Posting.from_entries(entries),
+ args.start_date,
+ args.stop_date,
+ args.fund_metadata_key,
+ args.unrestricted_fund,
+ )
+ report = Report(balances)
+ report.set_common_properties(config.books_repo())
+ report.write_all()
+ if args.output_file is None:
+ out_dir_path = config.repository_path() or Path()
+ args.output_file = out_dir_path / 'BalanceSheet_{}_{}.ods'.format(
+ args.start_date.isoformat(), args.stop_date.isoformat(),
+ )
+ logger.info("Writing report to %s", args.output_file)
+ ods_file = cliutil.bytes_output(args.output_file, stdout)
+ report.save_file(ods_file)
+ return returncode
+
+entry_point = cliutil.make_entry_point(__name__, PROGNAME)
+
+if __name__ == '__main__':
+ exit(entry_point())
diff --git a/setup.py b/setup.py
index de8c6d6..50ae439 100755
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ from setuptools import setup
setup(
name='conservancy_beancount',
description="Plugin, library, and reports for reading Conservancy's books",
- version='1.7.1',
+ version='1.7.2',
author='Software Freedom Conservancy',
author_email='info@sfconservancy.org',
license='GNU AGPLv3+',
@@ -37,6 +37,7 @@ setup(
entry_points={
'console_scripts': [
'accrual-report = conservancy_beancount.reports.accrual:entry_point',
+ 'balance-sheet-report = conservancy_beancount.reports.balance_sheet:entry_point',
'extract-odf-links = conservancy_beancount.tools.extract_odf_links:entry_point',
'fund-report = conservancy_beancount.reports.fund:entry_point',
'ledger-report = conservancy_beancount.reports.ledger:entry_point',
diff --git a/tests/test_reports_balance_sheet.py b/tests/test_reports_balance_sheet.py
new file mode 100644
index 0000000..5239a76
--- /dev/null
+++ b/tests/test_reports_balance_sheet.py
@@ -0,0 +1,122 @@
+"""test_reports_balance_sheet.py - Unit tests for balance sheet report"""
+# Copyright © 2020 Brett Smith
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import datetime
+import io
+import itertools
+
+from decimal import Decimal
+
+import pytest
+
+from . import testutil
+
+import odf.opendocument
+
+from beancount.core.data import Open
+
+from conservancy_beancount import data
+from conservancy_beancount.reports import balance_sheet
+
+Fund = balance_sheet.Fund
+Period = balance_sheet.Period
+
+clean_account_meta = pytest.fixture(scope='module')(testutil.clean_account_meta)
+
+@pytest.fixture(scope='module')
+def income_expense_balances():
+ txns = []
+ prior_date = datetime.date(2019, 2, 2)
+ period_date = datetime.date(2019, 4, 4)
+ for (acct, post_type), fund in itertools.product([
+ ('Income:Donations', 'Donations'),
+ ('Income:Sales', 'RBI'),
+ ('Expenses:Postage', 'fundraising'),
+ ('Expenses:Postage', 'management'),
+ ('Expenses:Postage', 'program'),
+ ('Expenses:Services', 'fundraising'),
+ ('Expenses:Services', 'program'),
+ ], ['Conservancy', 'Alpha']):
+ root_acct, _, classification = acct.partition(':')
+ try:
+ data.Account(acct).meta
+ except KeyError:
+ data.Account.load_opening(Open(
+ {'classification': classification},
+ datetime.date(2000, 1, 1),
+ acct, None, None,
+ ))
+ meta = {
+ 'project': fund,
+ f'{root_acct.lower().rstrip("s")}-type': post_type,
+ }
+ sign = '' if root_acct == 'Expenses' else '-'
+ txns.append(testutil.Transaction(date=prior_date, postings=[
+ (acct, f'{sign}2.40', meta),
+ ]))
+ txns.append(testutil.Transaction(date=period_date, postings=[
+ (acct, f'{sign}2.60', meta),
+ ]))
+ return balance_sheet.Balances(
+ data.Posting.from_entries(txns),
+ datetime.date(2019, 3, 1),
+ datetime.date(2020, 3, 1),
+ )
+
+@pytest.mark.parametrize('kwargs,expected', [
+ ({'account': 'Income:Donations'}, -10),
+ ({'account': 'Income'}, -20),
+ ({'account': 'Income:Nonexistent'}, None),
+ ({'classification': 'Postage'}, 30),
+ ({'classification': 'Services'}, 20),
+ ({'classification': 'Nonexistent'}, None),
+ ({'period': Period.OPENING, 'account': 'Income'}, '-9.60'),
+ ({'period': Period.PERIOD, 'account': 'Expenses'}, 26),
+ ({'fund': Fund.RESTRICTED, 'account': 'Income'}, -10),
+ ({'fund': Fund.UNRESTRICTED, 'account': 'Expenses'}, 25),
+ ({'post_type': 'Donations'}, -10),
+ ({'post_type': 'fundraising'}, 20),
+ ({'post_type': 'management'}, 10),
+ ({'post_type': 'Nonexistent'}, None),
+ ({'period': Period.OPENING, 'post_type': 'RBI'}, '-4.80'),
+ ({'fund': Fund.RESTRICTED, 'post_type': 'program'}, 10),
+ ({'period': Period.PERIOD, 'fund': Fund.UNRESTRICTED, 'post_type': 'RBI'}, '-2.60'),
+ ({'period': Period.OPENING, 'fund': Fund.RESTRICTED, 'post_type': 'program'}, '4.80'),
+ ({'period': Period.PERIOD, 'fund': Fund.RESTRICTED, 'post_type': 'ø'}, None),
+])
+def test_balance_total(income_expense_balances, kwargs, expected):
+ actual = income_expense_balances.total(**kwargs)
+ if expected is None:
+ assert not actual
+ else:
+ assert actual == {'USD': testutil.Amount(expected)}
+
+def run_main(arglist=[], config=None):
+ if config is None:
+ config = testutil.TestConfig(books_path=testutil.test_path('books/fund.beancount'))
+ stdout = io.BytesIO()
+ stderr = io.StringIO()
+ retcode = balance_sheet.main(['-O', '-'] + arglist, stdout, stderr, config)
+ stdout.seek(0)
+ stderr.seek(0)
+ return retcode, stdout, stderr
+
+def test_main():
+ retcode, stdout, stderr = run_main()
+ assert retcode == 0
+ assert not stderr.getvalue()
+ report = odf.opendocument.load(stdout)
+ assert report.spreadsheet.childNodes