conservancy_beancount/conservancy_beancount/reports/balance_sheet.py
Brett Smith e05f55659a balance_sheet: Balance only considers post_type for Expenses.
This simplifies the code and slightly optimizes it, since now Balance
won't store and keep re-summing income-type breakdowns that nothing
needs.
2020-08-18 01:28:37 -04:00

782 lines
30 KiB
Python

"""balance_sheet.py - Balance sheet report"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import collections
import datetime
import enum
import logging
import os
import sys
from decimal import Decimal
from pathlib import Path
from typing import (
Any,
Dict,
Hashable,
Iterable,
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
TextIO,
Tuple,
)
import odf.table # type:ignore[import]
from beancount.parser import printer as bc_printer
from . import core
from .. import books
from .. import cliutil
from .. import config as configmod
from .. import data
from .. import ranges
# Root accounts that the report sums (and negates) to compute net assets.
EQUITY_ACCOUNTS = frozenset({'Equity', 'Income', 'Expenses'})
# Program name handed to argparse and to the entry point factory.
PROGNAME = 'balance-sheet-report'
logger = logging.getLogger('conservancy_beancount.tools.balance_sheet')
class Fund(enum.IntFlag):
    """Bit flags selecting which kind of fund a balance belongs to.

    Balances tags a posting UNRESTRICTED when its fund metadata equals the
    configured unrestricted fund value, and RESTRICTED otherwise. ANY matches
    both and is the default filter for Balances.total().
    """
    RESTRICTED = 1 << 0
    UNRESTRICTED = 1 << 1
    ANY = RESTRICTED | UNRESTRICTED
class Period(enum.IntFlag):
    """Bit flags selecting which date range a balance falls in.

    Balances tags a posting PERIOD when it is dated inside the reporting
    range, PRIOR when it is inside the prior range, and OPENING when it is
    dated before the prior range starts.
    """
    OPENING = 1 << 0
    PRIOR = 1 << 1
    PERIOD = 1 << 2
    # Everything dated before the reporting range.
    BEFORE_PERIOD = OPENING | PRIOR
    ANY = BEFORE_PERIOD | PERIOD
class BalanceKey(NamedTuple):
    """Grouping key under which Balances accumulates posting amounts."""
    # Account the posting counts toward. Balances files
    # Expenses:CurrencyConversion postings under Income:CurrencyConversion.
    account: data.Account
    # The account's 'classification' metadata as an Account, or the account
    # itself when that metadata is missing or not a string.
    classification: data.Account
    # Which date range the posting falls in.
    period: Period
    # Whether the posting belongs to the unrestricted fund or a restricted one.
    fund: Fund
    # 'expense-type' metadata for postings under Expenses; None otherwise.
    post_type: Optional[str]
class Balances:
    """Posting totals grouped by account, classification, period, fund, and
    expense type.

    The reporting period runs from start_date to stop_date; the prior period
    is the same range shifted back one year. Postings dated after the prior
    range but outside the reporting range are ignored.
    """

    def __init__(self,
                 postings: Iterable[data.Posting],
                 start_date: datetime.date,
                 stop_date: datetime.date,
                 fund_key: str='project',
                 unrestricted_fund_value: str='Conservancy',
    ) -> None:
        """Accumulate every posting's cost into self.balances.

        fund_key names the posting metadata that identifies the fund;
        postings whose value equals unrestricted_fund_value are unrestricted
        and all others are restricted.
        """
        self.prior_range = ranges.DateRange(
            cliutil.diff_year(start_date, -1),
            cliutil.diff_year(stop_date, -1),
        )
        # The prior range must end on or before the reporting range starts.
        assert self.prior_range.stop <= start_date
        self.period_range = ranges.DateRange(start_date, stop_date)
        self.balances: Mapping[BalanceKey, core.MutableBalance] \
            = collections.defaultdict(core.MutableBalance)
        for post in postings:
            period = self._period_of(post.meta.date)
            if period is None:
                continue
            # Currency conversion expenses are reported alongside income.
            account = (
                data.Account('Income:CurrencyConversion')
                if post.account == 'Expenses:CurrencyConversion'
                else post.account
            )
            is_unrestricted = post.meta.get(fund_key) == unrestricted_fund_value
            fund = Fund.UNRESTRICTED if is_unrestricted else Fund.RESTRICTED
            try:
                class_name = account.meta['classification']
                if not isinstance(class_name, str):
                    raise TypeError()
                classification = data.Account(class_name)
            except (KeyError, TypeError):
                # No usable classification metadata: the account stands in.
                classification = account
            post_type = (
                post.meta.get('expense-type')
                if account.root_part() == 'Expenses'
                else None
            )
            group = BalanceKey(account, classification, period, fund, post_type)
            self.balances[group] += post.at_cost()

    def _period_of(self, post_date: datetime.date) -> Optional[Period]:
        """Return the Period flag for post_date, or None if it is not reported."""
        if post_date in self.period_range:
            return Period.PERIOD
        if post_date in self.prior_range:
            return Period.PRIOR
        if post_date < self.prior_range.start:
            return Period.OPENING
        return None

    def total(self,
              account: Optional[str]=None,
              classification: Optional[str]=None,
              period: int=Period.ANY,
              fund: int=Fund.ANY,
              post_type: Optional[str]=None,
    ) -> core.Balance:
        """Return the sum of all balances matching every given filter.

        account and classification match anything under the named account;
        period and fund are flag masks; post_type must match exactly. A filter
        left at its default matches everything.
        """
        result = core.MutableBalance()
        for key, balance in self.balances.items():
            if account is not None and not key.account.is_under(account):
                continue
            if (classification is not None
                and not key.classification.is_under(classification)):
                continue
            if not period & key.period:
                continue
            if not fund & key.fund:
                continue
            if post_type is not None and post_type != key.post_type:
                continue
            result += balance
        return result

    def classifications(self,
                        account: str,
                        sort_period: Optional[int]=None,
    ) -> Sequence[data.Account]:
        """Return the classifications used under account, in report order.

        Classifications are sorted by their parent name, then by largest
        normalized amount first. Only balances in sort_period count toward
        that amount; by default that is the reporting period for equity
        accounts and every period otherwise.
        """
        if sort_period is None:
            sort_period = Period.PERIOD if account in EQUITY_ACCOUNTS else Period.ANY
        class_bals: Mapping[data.Account, core.MutableBalance] \
            = collections.defaultdict(core.MutableBalance)
        for key, balance in self.balances.items():
            if not key.account.is_under(account):
                continue
            if key.period & sort_period:
                class_bals[key.classification] += balance
            else:
                # Looking the key up makes the defaultdict create an entry, so
                # the classification is still listed.
                class_bals[key.classification]
        norm_func = core.normalize_amount_func(f'{account}:RootsOK')

        def sortkey(acct: data.Account) -> Hashable:
            parent = acct.rpartition(':')[0]
            normalized = norm_func(class_bals[acct])
            largest = max(
                (amount.number for amount in normalized.values()),
                default=Decimal(0),
            )
            return parent, -largest

        return sorted(class_bals, key=sortkey)
class Report(core.BaseODS[Sequence[None], None]):
    """Spreadsheet report built from a Balances instance.

    write_all() fills four sheets in order: Financial Position, Activities,
    Functional Expenses, and Cash Flows.
    """
    # Classification names that get special treatment in the sheets.
    C_CASH = 'Cash'
    C_SATISFIED = 'Satisfaction of program restrictions'
    # Shared empty balance for cells that carry no amount.
    NO_BALANCE = core.Balance()
    # One level of indentation for nested classification labels.
    SPACE = ' ' * 4

    def __init__(self,
                 balances: Balances,
                 *,
                 date_fmt: str='%B %d, %Y',
    ) -> None:
        """Store balances and derive the date labels used in column headers.

        period_name and opening_name are the day before the stop date of the
        reporting range and the prior range respectively, formatted with
        date_fmt.
        """
        super().__init__()
        self.balances = balances
        one_day = datetime.timedelta(days=1)
        date = balances.period_range.stop - one_day
        self.period_name = date.strftime(date_fmt)
        date = balances.prior_range.stop - one_day
        self.opening_name = date.strftime(date_fmt)

    def section_key(self, row: Sequence[None]) -> None:
        """Always raise NotImplementedError; this report does not use it."""
        raise NotImplementedError("balance_sheet.Report.section_key")

    def init_styles(self) -> None:
        """Define the header and total-line cell styles used by the sheets."""
        super().init_styles()
        self.style_header = self.merge_styles(self.style_bold, self.style_centertext)
        self.style_huline = self.merge_styles(
            self.style_header,
            self.border_style(core.Border.BOTTOM, '1pt'),
        )
        self.style_subtotline = self.border_style(core.Border.TOP, '1pt')
        self.style_totline = self.border_style(core.Border.TOP | core.Border.BOTTOM, '1pt')
        self.style_bottomline = self.merge_styles(
            self.style_subtotline,
            self.border_style(core.Border.BOTTOM, '2pt', 'double'),
        )

    def write_all(self) -> None:
        """Write every sheet of the report."""
        self.write_financial_position()
        self.write_activities()
        self.write_functional_expenses()
        self.write_cash_flows()

    def walk_classifications(self, cseq: Iterable[data.Account]) \
            -> Iterator[Tuple[str, Optional[data.Account]]]:
        """Yield (label, classification) pairs for the rows of a listing.

        Each classification yields its last name part, indented one SPACE per
        parent part. Whenever the parent parts differ from the previous
        classification's, a header pair (parent label, None) is yielded
        first. Classifications with no parent never produce a header.
        """
        last_prefix: Sequence[str] = []
        for classification in cseq:
            parts = classification.split(':')
            tail = parts.pop()
            space = self.SPACE * len(parts)
            if parts != last_prefix:
                # A classification with no parent has no header to announce.
                # Checking also avoids indexing an empty list when such a
                # classification follows a nested one.
                if parts:
                    yield f'{space[len(self.SPACE):]}{parts[-1]}', None
                last_prefix = parts
            yield f'{space}{tail}', classification

    def walk_classifications_by_account(
            self,
            account: str,
            sort_period: Optional[int]=None,
    ) -> Iterator[Tuple[str, Optional[data.Account]]]:
        """Walk the sorted classifications found under account.

        Both arguments are passed through to Balances.classifications().
        """
        return self.walk_classifications(self.balances.classifications(
            account, sort_period,
        ))

    def write_financial_position(self) -> None:
        """Write the "Financial Position" sheet.

        Lists assets, liabilities, and net assets, with one column as of the
        end of the reporting period and one as of the end of the prior period.
        """
        self.use_sheet("Financial Position")
        for width in [3, 1.5, 1.5]:
            col_style = self.column_style(width)
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
        self.add_row(
            self.multiline_cell([
                "DRAFT Statement of Financial Position",
                self.period_name,
            ], numbercolumnsspanned=3, stylename=self.style_header)
        )
        self.add_row()
        self.add_row(
            odf.table.TableCell(),
            self.string_cell(self.period_name, stylename=self.style_huline),
            self.string_cell(self.opening_name, stylename=self.style_huline),
        )

        prior_assets = core.MutableBalance()
        period_assets = core.MutableBalance()
        self.add_row(self.string_cell("Assets", stylename=self.style_bold))
        self.add_row()
        for text, classification in self.walk_classifications_by_account('Assets'):
            text_cell = self.string_cell(text)
            if classification is None:
                self.add_row(text_cell)
            else:
                # The total over every period is the closing balance; removing
                # this period's activity leaves the prior closing balance.
                period_bal = self.balances.total(classification=classification)
                prior_bal = period_bal - self.balances.total(
                    classification=classification, period=Period.PERIOD,
                )
                self.add_row(
                    text_cell,
                    self.balance_cell(period_bal),
                    self.balance_cell(prior_bal),
                )
                prior_assets += prior_bal
                period_assets += period_bal
        self.add_row()
        self.add_row(
            self.string_cell("Total Assets"),
            self.balance_cell(period_assets, stylename=self.style_bottomline),
            self.balance_cell(prior_assets, stylename=self.style_bottomline),
        )
        self.add_row()
        self.add_row()

        prior_liabilities = core.MutableBalance()
        period_liabilities = core.MutableBalance()
        self.add_row(self.string_cell("Liabilities and Net Assets",
                                      stylename=self.style_bold))
        self.add_row()
        self.add_row(self.string_cell("Liabilities", stylename=self.style_bold))
        self.add_row()
        for text, classification in self.walk_classifications_by_account('Liabilities'):
            text_cell = self.string_cell(text)
            if classification is None:
                self.add_row(text_cell)
            else:
                # Liabilities are shown with their sign flipped, so this
                # period's activity is added back rather than subtracted.
                period_bal = -self.balances.total(classification=classification)
                prior_bal = period_bal + self.balances.total(
                    classification=classification, period=Period.PERIOD,
                )
                self.add_row(
                    text_cell,
                    self.balance_cell(period_bal),
                    self.balance_cell(prior_bal),
                )
                prior_liabilities += prior_bal
                period_liabilities += period_bal
        self.add_row(
            self.string_cell("Total Liabilities"),
            self.balance_cell(period_liabilities, stylename=self.style_totline),
            self.balance_cell(prior_liabilities, stylename=self.style_totline),
        )
        self.add_row()
        self.add_row()

        prior_net = core.MutableBalance()
        period_net = core.MutableBalance()
        self.add_row(self.string_cell("Net Assets", stylename=self.style_bold))
        self.add_row()
        for fund in [Fund.UNRESTRICTED, Fund.RESTRICTED]:
            preposition = "Without" if fund is Fund.UNRESTRICTED else "With"
            # Net assets are the negated sum of every equity-type root account.
            period_bal = -sum(
                (self.balances.total(account=account, fund=fund)
                 for account in EQUITY_ACCOUNTS), core.MutableBalance(),
            )
            prior_bal = period_bal + sum(
                (self.balances.total(account=account, fund=fund, period=Period.PERIOD)
                 for account in EQUITY_ACCOUNTS), core.MutableBalance(),
            )
            self.add_row(
                self.string_cell(f"{preposition} donor restrictions"),
                self.balance_cell(period_bal),
                self.balance_cell(prior_bal),
            )
            prior_net += prior_bal
            period_net += period_bal
        self.add_row(
            self.string_cell("Total Net Assets"),
            self.balance_cell(period_net, stylename=self.style_subtotline),
            self.balance_cell(prior_net, stylename=self.style_subtotline),
        )
        self.add_row()
        self.add_row(
            self.string_cell("Total Liabilities and Net Assets"),
            self.balance_cell(period_liabilities + period_net,
                              stylename=self.style_bottomline),
            self.balance_cell(prior_liabilities + prior_net,
                              stylename=self.style_bottomline),
        )

    def write_activities(self) -> None:
        """Write the "Activities" sheet.

        Amount columns are: unrestricted this period, restricted this period,
        total this period, and total prior period. Rows cover income by
        classification, net assets released from restrictions, expenses by
        functional type, and beginning and ending net assets.
        """
        self.use_sheet("Activities")
        # One dict of Balances.total() filters per amount column.
        bal_kwargs: Sequence[Dict[str, Any]] = [
            {'period': Period.PERIOD, 'fund': Fund.UNRESTRICTED},
            {'period': Period.PERIOD, 'fund': Fund.RESTRICTED},
            {'period': Period.PERIOD},
            {'period': Period.PRIOR},
        ]
        col_count = len(bal_kwargs) + 1
        for index in range(col_count):
            # The first (label) column is wider than the amount columns.
            col_style = self.column_style(1.5 if index else 3)
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
        self.add_row(
            self.multiline_cell([
                "DRAFT Statement of Activities",
                self.period_name,
            ], numbercolumnsspanned=col_count, stylename=self.style_header)
        )
        self.add_row()
        self.add_row(
            odf.table.TableCell(),
            self.multiline_cell(["Without Donor", "Restrictions"],
                                stylename=self.style_huline),
            self.multiline_cell(["With Donor", "Restrictions"],
                                stylename=self.style_huline),
            self.multiline_cell(["Total Year Ended", self.period_name],
                                stylename=self.style_huline),
            self.multiline_cell(["Total Year Ended", self.opening_name],
                                stylename=self.style_huline),
        )

        totals = [core.MutableBalance() for _ in bal_kwargs]
        self.add_row(self.string_cell("Support and Revenue", stylename=self.style_bold))
        self.add_row()
        for text, classification in self.walk_classifications_by_account('Income'):
            text_cell = self.string_cell(text)
            if classification is None:
                self.add_row(text_cell)
            elif classification == self.C_SATISFIED:
                # Reported on its own row under "released from restrictions".
                continue
            else:
                balances = [
                    -self.balances.total(classification=classification, **kwargs)
                    for kwargs in bal_kwargs
                ]
                self.add_row(
                    text_cell,
                    *(self.balance_cell(bal) for bal in balances),
                )
                # In-place addition updates the balances held in totals.
                for total, bal in zip(totals, balances):
                    total += bal
        self.add_row(
            odf.table.TableCell(),
            *(self.balance_cell(total, stylename=self.style_subtotline)
              for total in totals),
        )
        self.add_row()
        self.add_row(
            self.string_cell("Net Assets released from restrictions:"),
        )
        released = self.balances.total(
            account='Expenses', period=Period.PERIOD, fund=Fund.RESTRICTED,
        ) - self.balances.total(
            classification=self.C_SATISFIED, period=Period.PERIOD, fund=Fund.RESTRICTED,
        )
        # The release moves value from the restricted column to the
        # unrestricted one, so the combined totals are unchanged.
        totals[0] += released
        totals[1] -= released
        self.add_row(
            self.string_cell(self.C_SATISFIED),
            self.balance_cell(released),
            self.balance_cell(-released),
            self.balance_cell(self.NO_BALANCE),
            self.balance_cell(self.NO_BALANCE),
        )
        self.add_row()
        self.add_row(
            self.string_cell("Total Support and Revenue"),
            *(self.balance_cell(total, stylename=self.style_totline)
              for total in totals),
        )

        period_expenses = core.MutableBalance()
        prior_expenses = core.MutableBalance()
        self.add_row()
        self.add_row(self.string_cell("Expenses", stylename=self.style_bold))
        self.add_row()
        for text, type_value in [
                ("Program services", 'program'),
                ("Management and administrative services", 'management'),
                ("Fundraising", 'fundraising'),
        ]:
            period_bal = self.balances.total(
                account='Expenses', period=Period.PERIOD, post_type=type_value,
            )
            prior_bal = self.balances.total(
                account='Expenses', period=Period.PRIOR, post_type=type_value,
            )
            # Expenses appear only in the unrestricted and total columns.
            self.add_row(
                self.string_cell(text),
                self.balance_cell(period_bal),
                self.balance_cell(self.NO_BALANCE),
                self.balance_cell(period_bal),
                self.balance_cell(prior_bal),
            )
            period_expenses += period_bal
            prior_expenses += prior_bal
        # Compare the sum of the functional rows with all expenses. When they
        # agree, show the row sum so the column adds up; otherwise show the
        # true total and warn.
        # NOTE(review): clean_copy(1) presumably discards differences smaller
        # than 1 -- confirm against core.Balance.
        period_bal = self.balances.total(account='Expenses', period=Period.PERIOD)
        if (period_expenses - period_bal).clean_copy(1).is_zero():
            period_bal = period_expenses
        else:
            logger.warning("Period functional expenses do not match total; math in columns B+D is wrong")
        prior_bal = self.balances.total(account='Expenses', period=Period.PRIOR)
        if (prior_expenses - prior_bal).clean_copy(1).is_zero():
            prior_bal = prior_expenses
        else:
            logger.warning("Prior functional expenses do not match total; math in column E is wrong")
        self.add_row(
            self.string_cell("Total Expenses"),
            self.balance_cell(period_bal, stylename=self.style_totline),
            self.balance_cell(self.NO_BALANCE, stylename=self.style_totline),
            self.balance_cell(period_bal, stylename=self.style_totline),
            self.balance_cell(prior_bal, stylename=self.style_totline),
        )

        totals[0] -= period_bal
        totals[2] -= period_bal
        totals[3] -= prior_bal
        self.add_row()
        self.add_row(
            self.string_cell("Change in Net Assets"),
            *(self.balance_cell(total) for total in totals),
        )

        # Reuse the column filters for beginning balances: each column now
        # selects everything dated before the range it reported on.
        for kwargs in bal_kwargs:
            if kwargs['period'] is Period.PERIOD:
                kwargs['period'] = Period.BEFORE_PERIOD
            else:
                kwargs['period'] = Period.OPENING
        beginnings = [
            -sum((self.balances.total(account=account, **kwargs)
                  for account in EQUITY_ACCOUNTS), core.MutableBalance())
            for kwargs in bal_kwargs
        ]
        self.add_row()
        self.add_row(
            self.string_cell("Beginning Net Assets"),
            *(self.balance_cell(beg_bal) for beg_bal in beginnings),
        )

        self.add_row()
        self.add_row(
            self.string_cell("Ending Net Assets"),
            *(self.balance_cell(beg_bal + tot_bal, stylename=self.style_bottomline)
              for beg_bal, tot_bal in zip(beginnings, totals)),
        )

    def write_functional_expenses(self) -> None:
        """Write the "Functional Expenses" sheet.

        Each expense classification gets one row, with columns for program,
        management, and fundraising amounts this period, the period total,
        and the prior period total. A warning is logged for any row whose
        three functional amounts do not add up to its period total.
        """
        self.use_sheet("Functional Expenses")
        # One dict of Balances.total() filters per amount column.
        bal_kwargs: Sequence[Dict[str, Any]] = [
            {'period': Period.PERIOD, 'post_type': 'program'},
            {'period': Period.PERIOD, 'post_type': 'management'},
            {'period': Period.PERIOD, 'post_type': 'fundraising'},
            {'period': Period.PERIOD},
            {'period': Period.PRIOR},
        ]
        col_count = len(bal_kwargs) + 1
        for index in range(col_count):
            col_style = self.column_style(1.5 if index else 3)
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
        self.add_row(
            self.multiline_cell([
                "DRAFT Statement of Functional Expenses",
                self.period_name,
            ], numbercolumnsspanned=col_count, stylename=self.style_header)
        )
        self.add_row()
        self.add_row(
            odf.table.TableCell(),
            self.multiline_cell(["Program", "Services"],
                                stylename=self.style_huline),
            self.multiline_cell(["Management and", "Administrative"],
                                stylename=self.style_huline),
            self.multiline_cell(["Fundraising"],
                                stylename=self.style_huline),
            self.multiline_cell(["Total Year Ended", self.period_name],
                                stylename=self.style_huline),
            self.multiline_cell(["Total Year Ended", self.opening_name],
                                stylename=self.style_huline),
        )

        totals = [core.MutableBalance() for _ in bal_kwargs]
        for text, classification in self.walk_classifications_by_account('Expenses'):
            text_cell = self.string_cell(text)
            if classification is None:
                # Put a blank row ahead of each unindented header.
                if not text[0].isspace():
                    self.add_row()
                self.add_row(text_cell)
            else:
                balances = [
                    self.balances.total(classification=classification, **kwargs)
                    for kwargs in bal_kwargs
                ]
                self.add_row(
                    text_cell,
                    *(self.balance_cell(bal) for bal in balances),
                )
                # The three functional columns should add up to the period
                # total in the fourth.
                break_bal = sum(balances[:3], core.MutableBalance())
                if not (break_bal - balances[3]).clean_copy(1).is_zero():
                    logger.warning(
                        "Functional expenses breakdown does not match total on row %s",
                        len(self.sheet.childNodes) - col_count,
                    )
                for total, bal in zip(totals, balances):
                    total += bal
        self.add_row()
        self.add_row(
            self.string_cell("Total Expenses"),
            *(self.balance_cell(tot_bal, stylename=self.style_bottomline)
              for tot_bal in totals),
        )

    def write_cash_flows(self) -> None:
        """Write the "Cash Flows" sheet.

        Starts from the change in net assets, adjusts it by the change in
        each non-cash asset classification and each liability classification,
        and reports the result as the net increase in cash alongside
        beginning and ending cash. Columns are the reporting period and the
        prior period.
        """
        self.use_sheet("Cash Flows")
        bal_kwargs: Sequence[Dict[str, Any]] = [
            {'period': Period.PERIOD},
            {'period': Period.PRIOR},
        ]
        col_count = len(bal_kwargs) + 1
        for index in range(col_count):
            col_style = self.column_style(1.5 if index else 3)
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
        self.add_row(
            self.multiline_cell([
                "DRAFT Statement of Cash Flows",
                self.period_name,
            ], numbercolumnsspanned=col_count, stylename=self.style_header)
        )
        self.add_row()
        self.add_row(
            odf.table.TableCell(),
            self.string_cell(self.period_name, stylename=self.style_huline),
            self.string_cell(self.opening_name, stylename=self.style_huline),
        )
        self.add_row(self.string_cell(
            "Cash Flows from Operating Activities",
            stylename=self.style_bold,
        ))
        self.add_row()

        # Running totals start at the change in net assets for each column
        # and are adjusted in place by the asset and liability rows below.
        totals = [
            -sum((self.balances.total(account=account, **kwargs)
                  for account in EQUITY_ACCOUNTS), core.MutableBalance())
            for kwargs in bal_kwargs
        ]
        self.add_row(
            self.string_cell("Change in Net Assets"),
            *(self.balance_cell(bal) for bal in totals),
        )
        self.add_row(self.string_cell(
            "(Increase) decrease in operating assets:",
        ))
        for text, classification in self.walk_classifications_by_account('Assets'):
            text_cell = self.string_cell(self.SPACE + text)
            if classification is None:
                self.add_row(text_cell)
            elif classification == self.C_CASH:
                # Cash is what this statement solves for, not an adjustment.
                continue
            else:
                balances = [
                    -self.balances.total(classification=classification, **kwargs)
                    for kwargs in bal_kwargs
                ]
                self.add_row(
                    text_cell,
                    *(self.balance_cell(bal) for bal in balances),
                )
                for total, bal in zip(totals, balances):
                    total += bal
        self.add_row(self.string_cell(
            "Increase (decrease) in operating liabilities:",
        ))
        for text, classification in self.walk_classifications_by_account('Liabilities'):
            text_cell = self.string_cell(self.SPACE + text)
            if classification is None:
                self.add_row(text_cell)
            else:
                balances = [
                    -self.balances.total(classification=classification, **kwargs)
                    for kwargs in bal_kwargs
                ]
                self.add_row(
                    text_cell,
                    *(self.balance_cell(bal) for bal in balances),
                )
                for total, bal in zip(totals, balances):
                    total += bal
        self.add_row(
            self.string_cell("Net cash provided by operating activities"),
            *(self.balance_cell(tot_bal, stylename=self.style_totline)
              for tot_bal in totals),
        )
        self.add_row()
        self.add_row(
            self.string_cell("Net Increase in Cash"),
            *(self.balance_cell(tot_bal) for tot_bal in totals),
        )
        self.add_row()
        # Beginning cash is everything before the range each column reports.
        balances = [
            self.balances.total(classification=self.C_CASH, period=period)
            for period in [Period.BEFORE_PERIOD, Period.OPENING]
        ]
        self.add_row(
            self.string_cell("Beginning Cash"),
            *(self.balance_cell(bal) for bal in balances),
        )
        self.add_row()
        self.add_row(
            self.string_cell("Ending Cash"),
            *(self.balance_cell(tot + bal, stylename=self.style_bottomline)
              for tot, bal in zip(totals, balances)),
        )
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse the report's command-line options.

    Registers the version and log-level options from cliutil plus the date
    range, fund, and output file options, then returns the namespace parsed
    from arglist.
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    add_option = parser.add_argument
    cliutil.add_version_argument(parser)
    add_option(
        '--begin', '--start', '-b',
        metavar='DATE',
        dest='start_date',
        type=cliutil.date_arg,
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
The default is one year ago.
""")
    add_option(
        '--end', '--stop', '-e',
        metavar='DATE',
        dest='stop_date',
        type=cliutil.date_arg,
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
The default is a year after the start date.
""")
    add_option(
        '--fund-metadata-key', '-m',
        default='project',
        metavar='KEY',
        help="""Name of the fund metadata key. Default %(default)s.
""")
    add_option(
        '--unrestricted-fund', '-u',
        default='Conservancy',
        metavar='PROJECT',
        help="""Name of the unrestricted fund. Default %(default)s.
""")
    add_option(
        '--output-file', '-O',
        type=Path,
        metavar='PATH',
        help="""Write the report to this file, or stdout when PATH is `-`.
""")
    cliutil.add_loglevel_argument(parser)
    return parser.parse_args(arglist)
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run the balance sheet report and return the process exit code.

    Parses arglist, loads the books for the requested date range, prints any
    load errors to stderr, and saves the report spreadsheet. When no output
    file is given, the report goes to BalanceSheet_<start>_<stop>.ods in the
    configured repository directory, or the current directory if there is
    none. A Config is created and loaded when the caller does not supply one.
    """
    args = parse_arguments(arglist)
    cliutil.set_loglevel(logger, args.loglevel)
    if config is None:
        config = configmod.Config()
        config.load_file()

    # Fill in whichever end of the date range was not given on the command
    # line, so the range always spans one year.
    if args.stop_date is None:
        args.stop_date = (
            datetime.date.today()
            if args.start_date is None
            else cliutil.diff_year(args.start_date, 1)
        )
    if args.start_date is None:
        args.start_date = cliutil.diff_year(args.stop_date, -1)

    exit_code = 0
    loader = config.books_loader()
    if loader is None:
        entries, load_errors, options_map = books.Loader.load_none(config.config_file_path())
        exit_code = cliutil.ExitCode.NoConfiguration
    else:
        # Start loading one fiscal year before the start date's fiscal year.
        first_fy = config.fiscal_year_begin().for_date(args.start_date) - 1
        entries, load_errors, options_map = loader.load_fy_range(first_fy, args.stop_date)
        if load_errors:
            exit_code = cliutil.ExitCode.BeancountErrors
        elif not entries:
            exit_code = cliutil.ExitCode.NoDataLoaded
    for error in load_errors:
        bc_printer.print_error(error, file=stderr)

    data.Account.load_from_books(entries, options_map)
    report = Report(Balances(
        data.Posting.from_entries(entries),
        args.start_date,
        args.stop_date,
        args.fund_metadata_key,
        args.unrestricted_fund,
    ))
    report.set_common_properties(config.books_repo())
    report.write_all()

    if args.output_file is None:
        report_dir = config.repository_path() or Path()
        report_name = (
            f'BalanceSheet_{args.start_date.isoformat()}'
            f'_{args.stop_date.isoformat()}.ods'
        )
        args.output_file = report_dir / report_name
        logger.info("Writing report to %s", args.output_file)
    ods_file = cliutil.bytes_output(args.output_file, stdout)
    report.save_file(ods_file)
    return exit_code
# Callable built by cliutil for running this tool under PROGNAME.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    # sys.exit() is always available; the bare exit() helper is installed by
    # the site module for interactive use and is missing under `python -S`.
    sys.exit(entry_point())