conservancy_beancount/conservancy_beancount/reports/balance_sheet.py

656 lines
24 KiB
Python

"""balance_sheet.py - Balance sheet report"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import collections
import datetime
import enum
import logging
import operator
import os
import sys
from decimal import Decimal
from pathlib import Path
from typing import (
Any,
Callable,
Collection,
Dict,
Hashable,
Iterable,
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
TextIO,
Tuple,
Union,
)
import odf.style # type:ignore[import]
import odf.table # type:ignore[import]
from beancount.parser import printer as bc_printer
from . import core
from .. import books
from .. import cliutil
from .. import config as configmod
from .. import data
from .. import ranges
# Root accounts whose balances are summed to report net assets.
EQUITY_ACCOUNTS = frozenset(('Equity', 'Income', 'Expenses'))

# Program name passed to argparse and to the entry point factory.
PROGNAME = 'balance-sheet-report'

logger = logging.getLogger('conservancy_beancount.tools.balance_sheet')

# Keyword arguments that get forwarded to Balances.total().
KWArgs = Mapping[str, Any]
class Fund(enum.IntFlag):
    """Bit flags telling whether a posting belongs to a restricted fund."""
    RESTRICTED = 1 << 0
    UNRESTRICTED = 1 << 1
    # Mask matching postings in either kind of fund.
    ANY = RESTRICTED | UNRESTRICTED
class Period(enum.IntFlag):
    """Bit flags for the reporting period a posting's date falls in."""
    OPENING = 1 << 0  # before the prior period
    PRIOR = 1 << 1  # the period one year before the reporting period
    PERIOD = 1 << 2  # the reporting period itself
    # Masks for combining periods when totaling balances.
    BEFORE_PERIOD = OPENING | PRIOR
    ANY = OPENING | PRIOR | PERIOD
class BalanceKey(NamedTuple):
    """Key under which Balances accumulates posting amounts."""
    account: data.Account  # account the posting was made to
    classification: data.Account  # account's classification, or the account itself
    period: Period  # period the posting's date falls in
    fund: Fund  # restricted or unrestricted
    post_type: Optional[str]  # expense-type metadata; None outside Expenses
class Balances:
    """Posting balances grouped by account, classification, period, and fund.

    Each posting is sorted into the reporting period, the prior period (the
    same dates one year earlier), or the opening period (anything before the
    prior period). Postings that fall in none of these are ignored. Use
    total() to sum the balances that match some criteria, and
    classifications() to list the classifications under a root account.
    """

    def __init__(self,
                 postings: Iterable[data.Posting],
                 start_date: datetime.date,
                 stop_date: datetime.date,
                 fund_key: str='project',
                 unrestricted_fund_value: str='Conservancy',
    ) -> None:
        """Group ``postings`` into balances.

        ``start_date`` and ``stop_date`` bound the reporting period.
        A posting is in the unrestricted fund when its ``fund_key`` metadata
        equals ``unrestricted_fund_value``; every other posting is restricted.
        """
        prior_start = cliutil.diff_year(start_date, -1)
        prior_stop = cliutil.diff_year(stop_date, -1)
        self.prior_range = ranges.DateRange(prior_start, prior_stop)
        # The prior period must not overlap the reporting period.
        assert self.prior_range.stop <= start_date
        self.period_range = ranges.DateRange(start_date, stop_date)
        self.balances: Mapping[BalanceKey, core.MutableBalance] \
            = collections.defaultdict(core.MutableBalance)
        for post in postings:
            when = post.meta.date
            if when in self.period_range:
                period = Period.PERIOD
            elif when in self.prior_range:
                period = Period.PRIOR
            elif when < self.prior_range.start:
                period = Period.OPENING
            else:
                continue

            # Currency conversion expenses are reported under Income.
            account = post.account
            if account == 'Expenses:CurrencyConversion':
                account = data.Account('Income:CurrencyConversion')

            is_unrestricted = post.meta.get(fund_key) == unrestricted_fund_value
            fund = Fund.UNRESTRICTED if is_unrestricted else Fund.RESTRICTED

            # Fall back to the account itself when it has no usable
            # classification metadata.
            try:
                classification_s = account.meta['classification']
            except (KeyError, TypeError):
                classification_s = None
            if isinstance(classification_s, str):
                classification = data.Account(classification_s)
            else:
                classification = account

            is_expense = account.root_part() == 'Expenses'
            post_type = post.meta.get('expense-type') if is_expense else None

            key = BalanceKey(account, classification, period, fund, post_type)
            self.balances[key] += post.at_cost()

    def total(self,
              account: Union[None, str, Collection[str]]=None,
              classification: Optional[str]=None,
              period: int=Period.ANY,
              fund: int=Fund.ANY,
              post_type: Optional[str]=None,
    ) -> core.Balance:
        """Return the sum of all balances that match every given criterion.

        ``account`` may be one account name or a collection of them; the
        names are passed to ``Account.is_under()``. ``classification`` is
        matched the same way. ``period`` and ``fund`` are bit masks of Period
        and Fund flags. ``post_type`` must equal the key's post type exactly.
        A criterion left at its default matches everything.
        """
        if isinstance(account, str):
            account = (account,)
        result = core.MutableBalance()
        for key, balance in self.balances.items():
            if account is not None and not key.account.is_under(*account):
                continue
            if (classification is not None
                and not key.classification.is_under(classification)):
                continue
            if not period & key.period:
                continue
            if not fund & key.fund:
                continue
            if post_type is not None and post_type != key.post_type:
                continue
            result += balance
        return result

    def classifications(self,
                        account: str,
                        sort_period: Optional[int]=None,
    ) -> Sequence[data.Account]:
        """Return the classifications of all accounts under ``account``.

        The result is sorted by parent classification, then by largest
        normalized balance first. Only balances in ``sort_period`` count
        toward that ordering. When ``sort_period`` is None, it defaults to
        Period.PERIOD for the accounts in EQUITY_ACCOUNTS and Period.ANY for
        all others.
        """
        if sort_period is None:
            in_equity = account in EQUITY_ACCOUNTS
            sort_period = Period.PERIOD if in_equity else Period.ANY
        class_bals: Mapping[data.Account, core.MutableBalance] \
            = collections.defaultdict(core.MutableBalance)
        for key, balance in self.balances.items():
            if not key.account.is_under(account):
                continue
            # Looking up the key makes sure the classification is listed even
            # when none of its balances fall in sort_period.
            class_bals[key.classification]
            if key.period & sort_period:
                class_bals[key.classification] += balance

        norm_func = core.normalize_amount_func(f'{account}:RootsOK')

        def sortkey(acct: data.Account) -> Hashable:
            parent = acct.rpartition(':')[0]
            normalized = norm_func(class_bals[acct])
            # An empty balance sorts as zero.
            largest = max(
                (amount.number for amount in normalized.values()),
                default=Decimal(0),
            )
            return parent, -largest

        return sorted(class_bals, key=sortkey)
class Report(core.BaseODS[Sequence[None], None]):
    """ODS spreadsheet of financial statements built from a Balances object.

    write_all() writes four sheets: Financial Position, Activities,
    Functional Expenses, and Cash Flows. The last two columns of every sheet
    are headed with the closing dates of the reporting and prior periods.
    """
    # Classification names that get special treatment in the statements.
    C_CASH = 'Cash'
    C_SATISFIED = 'Satisfaction of program restrictions'
    # Placeholder for a column that has no amount to show.
    NO_BALANCE = core.Balance()
    # One level of indentation for classification names.
    SPACE = ' ' * 4

    def __init__(self,
                 balances: Balances,
                 *,
                 date_fmt: str='%B %d, %Y',
    ) -> None:
        """Prepare a report over ``balances``.

        ``date_fmt`` is the strftime format for dates in sheet headers.
        """
        super().__init__()
        self.balances = balances
        self.date_fmt = date_fmt
        one_day = datetime.timedelta(days=1)
        # Headers name the day before each range's stop date.
        date = balances.period_range.stop - one_day
        self.period_name = date.strftime(date_fmt)
        date = balances.prior_range.stop - one_day
        self.opening_name = date.strftime(date_fmt)
        # A fresh row that belongs to no sheet, so the "previous row was a
        # totals row" check in write_totals_row starts out false.
        self.last_totals_row = odf.table.TableRow()

    def section_key(self, row: Sequence[None]) -> None:
        """Not supported by this report; always raises NotImplementedError."""
        raise NotImplementedError("balance_sheet.Report.section_key")

    def init_styles(self) -> None:
        """Create the header and totals-line cell styles the sheets use."""
        super().init_styles()
        self.style_header = self.merge_styles(self.style_bold, self.style_centertext)
        self.style_huline = self.merge_styles(
            self.style_header,
            self.border_style(core.Border.BOTTOM, '1pt'),
        )
        self.style_subtotline = self.border_style(core.Border.TOP, '1pt')
        self.style_totline = self.border_style(core.Border.TOP | core.Border.BOTTOM, '1pt')
        self.style_bottomline = self.merge_styles(
            self.style_subtotline,
            self.border_style(core.Border.BOTTOM, '2pt', 'double'),
        )

    def write_all(self) -> None:
        """Write all four statement sheets, in order."""
        self.write_financial_position()
        self.write_activities()
        self.write_functional_expenses()
        self.write_cash_flows()

    def walk_classifications(self, cseq: Iterable[data.Account]) \
        -> Iterator[Tuple[str, Optional[data.Account]]]:
        """Yield ``(text, classification)`` row labels for ``cseq``.

        ``text`` is the last part of the classification name, indented one
        SPACE per parent level. Whenever the parent path changes, a header
        pair ``(parent name, None)`` is yielded first. Only the immediate
        parent is announced, one level less indented than its children.
        """
        last_prefix: Sequence[str] = []
        for classification in cseq:
            parts = classification.split(':')
            tail = parts.pop()
            space = self.SPACE * len(parts)
            if parts != last_prefix:
                # A top-level classification has no parent to announce.
                # Without this check, parts[-1] raised IndexError when one
                # followed a nested classification.
                if parts:
                    yield f'{space[len(self.SPACE):]}{parts[-1]}', None
                last_prefix = parts
            yield f'{space}{tail}', classification

    def walk_classifications_by_account(
            self,
            account: str,
            sort_period: Optional[int]=None,
    ) -> Iterator[Tuple[str, Optional[data.Account]]]:
        """Walk the sorted classifications under ``account``.

        Both arguments are passed to Balances.classifications().
        """
        return self.walk_classifications(self.balances.classifications(
            account, sort_period,
        ))

    def start_sheet(self,
                    sheet_name: str,
                    *headers: Iterable[str],
                    totals_prefix: Sequence[str]=(),
                    first_width: Union[float, str]=3,
                    width: Union[float, str]=1.5,
    ) -> None:
        """Start a new sheet and write its title and column headers.

        Each item of ``headers`` is the lines of one column header. Two more
        columns are always added after them, headed with the period and
        prior period closing dates, each preceded by the ``totals_prefix``
        lines. ``first_width`` sizes the label column and ``width`` every
        other column. Sets ``self.col_count`` to the total number of columns.
        """
        header_cells: Sequence[odf.table.TableCell] = [
            odf.table.TableCell(),
            *(self.multiline_cell(header_lines, stylename=self.style_huline)
              for header_lines in headers),
            *(self.multiline_cell([*totals_prefix, date_s], stylename=self.style_huline)
              for date_s in [self.period_name, self.opening_name]),
        ]
        self.col_count = len(header_cells)
        self.use_sheet(sheet_name)
        for index in range(self.col_count):
            col_style = self.column_style(width if index else first_width)
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
        start_date = self.balances.period_range.start.strftime(self.date_fmt)
        self.add_row(
            self.multiline_cell([
                f"DRAFT Statement of {sheet_name}",
                # Separate the two dates; they used to be run together.
                f"{start_date}—{self.period_name}",
            ], numbercolumnsspanned=self.col_count, stylename=self.style_header)
        )
        self.add_row()
        self.add_row(*header_cells)

    def write_classifications_by_account(
            self,
            account: str,
            balance_kwargs: Sequence[KWArgs],
            exclude_classifications: Collection[str]=frozenset(),
            text_prefix: str='',
            norm_func: Optional[Callable[[core.Balance], core.Balance]]=None,
    ) -> Sequence[core.Balance]:
        """Write one row per classification under ``account``.

        ``balance_kwargs`` holds one dict of Balances.total() arguments per
        amount column; there must be exactly one for each column after the
        label column. Classifications in ``exclude_classifications`` get no
        row and are left out of the totals. ``text_prefix`` is prepended to
        every label. ``norm_func`` normalizes each balance before it is
        written; the default comes from core.normalize_amount_func().

        Returns the total of the written balances for each column.
        """
        if norm_func is None:
            norm_func = core.normalize_amount_func(f'{account}:RootsOK')
        assert len(balance_kwargs) + 1 == self.col_count, \
            "called write_classifications with wrong number of balance_kwargs"
        retval = [core.MutableBalance() for _ in balance_kwargs]
        for text, classification in self.walk_classifications_by_account(account):
            text_cell = self.string_cell(text_prefix + text)
            if classification is None:
                # Header row: put a blank row above unindented headers.
                if not text[0].isspace():
                    self.add_row()
                self.add_row(text_cell)
            elif classification in exclude_classifications:
                pass
            else:
                row = self.add_row(text_cell)
                for kwargs, total_bal in zip(balance_kwargs, retval):
                    balance = norm_func(self.balances.total(
                        classification=classification, **kwargs,
                    ))
                    row.addElement(self.balance_cell(balance))
                    total_bal += balance
        return retval

    def write_totals_row(
            self,
            text: str,
            *balances: Sequence[core.Balance],
            stylename: Union[None, str, odf.style.Style]=None,
            leading_rows: Optional[int]=None,
    ) -> odf.table.TableRow:
        """Write a row labeled ``text`` with the column-wise sum of ``balances``.

        Each item of ``balances`` is a sequence with one balance per amount
        column. ``stylename`` is applied to the amount cells.
        ``leading_rows`` is the number of blank rows to add first; by default
        it is 1 when the sheet's last row is the previous totals row or when
        the bottom line style is used, and 0 otherwise.

        Returns the new row, which is also saved as ``self.last_totals_row``.
        """
        if leading_rows is None:
            if (self.sheet.lastChild is self.last_totals_row
                or stylename is self.style_bottomline):
                leading_rows = 1
            else:
                leading_rows = 0
        expect_len = self.col_count - 1
        assert all(len(seq) == expect_len for seq in balances), \
            "called write_totals_row with the wrong length of balance columns"
        for _ in range(leading_rows):
            self.add_row()
        self.last_totals_row = self.add_row(
            self.string_cell(text),
            *(self.balance_cell(
                sum(sum_bals, core.MutableBalance()),
                stylename=stylename,
            ) for sum_bals in zip(*balances)),
        )
        return self.last_totals_row

    def write_financial_position(self) -> None:
        """Write the Financial Position sheet.

        Lists assets, liabilities, and net assets (split by donor
        restriction) as of the end of the period and the end of the prior
        period.
        """
        self.start_sheet("Financial Position")
        balance_kwargs: Sequence[KWArgs] = [
            {'period': Period.ANY},
            {'period': Period.BEFORE_PERIOD},
        ]

        asset_totals = self.write_classifications_by_account('Assets', balance_kwargs)
        self.write_totals_row(
            "Total Assets", asset_totals, stylename=self.style_bottomline,
        )
        self.add_row()
        self.add_row()

        liabilities = self.write_classifications_by_account('Liabilities', balance_kwargs)
        self.write_totals_row(
            "Total Liabilities", liabilities, stylename=self.style_totline,
        )
        self.add_row()
        self.add_row()

        equity_totals = [core.MutableBalance() for _ in balance_kwargs]
        self.add_row(self.string_cell("Net Assets", stylename=self.style_bold))
        self.add_row()
        for fund in [Fund.UNRESTRICTED, Fund.RESTRICTED]:
            preposition = "Without" if fund is Fund.UNRESTRICTED else "With"
            row = self.add_row(self.string_cell(f"{preposition} donor restrictions"))
            for kwargs, total_bal in zip(balance_kwargs, equity_totals):
                # Negated so the equity accounts' total shows with the
                # opposite sign on the statement.
                balance = -self.balances.total(account=EQUITY_ACCOUNTS, fund=fund, **kwargs)
                row.addElement(self.balance_cell(balance))
                total_bal += balance
        self.write_totals_row(
            "Total Net Assets", equity_totals, stylename=self.style_subtotline,
        )
        self.write_totals_row(
            "Total Liabilities and Net Assets",
            liabilities, equity_totals,
            stylename=self.style_bottomline,
        )

    def write_activities(self) -> None:
        """Write the Activities sheet.

        Columns are unrestricted, restricted, period total, and prior period
        total. Rows cover income by classification, net assets released from
        restrictions, expenses by functional type, and the change from
        beginning to ending net assets. Logs a warning when the functional
        expense rows do not add up to the total of all expenses.
        """
        self.start_sheet(
            "Activities",
            ["Without Donor", "Restrictions"],
            ["With Donor", "Restrictions"],
            totals_prefix=["Total Year Ended"],
        )
        bal_kwargs: Sequence[Dict[str, Any]] = [
            {'period': Period.PERIOD, 'fund': Fund.UNRESTRICTED},
            {'period': Period.PERIOD, 'fund': Fund.RESTRICTED},
            {'period': Period.PERIOD},
            {'period': Period.PRIOR},
        ]

        self.add_row(self.string_cell("Support and Revenue", stylename=self.style_bold))
        self.add_row()
        income_totals = self.write_classifications_by_account(
            'Income', bal_kwargs, (self.C_SATISFIED,),
        )
        self.write_totals_row("", income_totals, stylename=self.style_subtotline)
        self.add_row()
        self.add_row(
            self.string_cell("Net Assets released from restrictions:"),
        )
        released = self.balances.total(
            account='Expenses', period=Period.PERIOD, fund=Fund.RESTRICTED,
        ) - self.balances.total(
            classification=self.C_SATISFIED, period=Period.PERIOD, fund=Fund.RESTRICTED,
        )
        # The release moves the amount from the restricted column to the
        # unrestricted one, so the two total columns stay unchanged.
        other_totals = [core.MutableBalance() for _ in bal_kwargs]
        other_totals[0] += released
        other_totals[1] -= released
        self.write_totals_row(self.C_SATISFIED, other_totals)
        self.write_totals_row(
            "Total Support and Revenue",
            income_totals, other_totals,
            stylename=self.style_totline,
        )

        period_expenses = core.MutableBalance()
        prior_expenses = core.MutableBalance()
        self.add_row()
        self.add_row(self.string_cell("Expenses", stylename=self.style_bold))
        self.add_row()
        for text, type_value in [
                ("Program services", 'program'),
                ("Management and administrative services", 'management'),
                ("Fundraising", 'fundraising'),
        ]:
            period_bal = self.balances.total(
                account='Expenses', period=Period.PERIOD, post_type=type_value,
            )
            prior_bal = self.balances.total(
                account='Expenses', period=Period.PRIOR, post_type=type_value,
            )
            # Expenses are all shown in the unrestricted column.
            self.write_totals_row(text, [
                period_bal,
                self.NO_BALANCE,
                period_bal,
                prior_bal,
            ], leading_rows=0)
            period_expenses += period_bal
            prior_expenses += prior_bal

        # Check the sum of the functional rows against the total of all
        # expenses, which also includes postings with any other expense type.
        period_bal = self.balances.total(account='Expenses', period=Period.PERIOD)
        if (period_expenses - period_bal).clean_copy(1).is_zero():
            period_bal = period_expenses
        else:
            logger.warning("Period functional expenses do not match total; math in columns B+D is wrong")
        prior_bal = self.balances.total(account='Expenses', period=Period.PRIOR)
        if (prior_expenses - prior_bal).clean_copy(1).is_zero():
            prior_bal = prior_expenses
        else:
            logger.warning("Prior functional expenses do not match total; math in column E is wrong")
        self.write_totals_row("Total Expenses", [
            period_bal,
            self.NO_BALANCE,
            period_bal,
            prior_bal,
        ], stylename=self.style_totline, leading_rows=0)

        other_totals[0] -= period_bal
        other_totals[2] -= period_bal
        other_totals[3] -= prior_bal
        self.write_totals_row("Change in Net Assets", income_totals, other_totals)

        # Shift every column back to the time before it started, to get the
        # net assets each column began with.
        for kwargs in bal_kwargs:
            if kwargs['period'] is Period.PERIOD:
                kwargs['period'] = Period.BEFORE_PERIOD
            else:
                kwargs['period'] = Period.OPENING
        equity_totals = [
            -self.balances.total(account=EQUITY_ACCOUNTS, **kwargs)
            for kwargs in bal_kwargs
        ]
        self.write_totals_row("Beginning Net Assets", equity_totals)
        self.write_totals_row(
            "Ending Net Assets",
            income_totals, other_totals, equity_totals,
            stylename=self.style_bottomline,
        )

    def write_functional_expenses(self) -> None:
        """Write the Functional Expenses sheet.

        Lists each expense classification with one column per functional
        type, then the period total and the prior period total.
        """
        self.start_sheet(
            "Functional Expenses",
            ["Program", "Services"],
            ["Management and", "Administrative"],
            ["Fundraising"],
            totals_prefix=["Total Year Ended"],
        )
        totals = self.write_classifications_by_account('Expenses', [
            {'period': Period.PERIOD, 'post_type': 'program'},
            {'period': Period.PERIOD, 'post_type': 'management'},
            {'period': Period.PERIOD, 'post_type': 'fundraising'},
            {'period': Period.PERIOD},
            {'period': Period.PRIOR},
        ])
        self.write_totals_row(
            "Total Expenses",
            totals,
            stylename=self.style_bottomline,
        )

    def write_cash_flows(self) -> None:
        """Write the Cash Flows sheet.

        Starts from the change in net assets, adds the negated changes in
        assets (other than cash) and liabilities, and reconciles the result
        with beginning and ending cash for the period and the prior period.
        """
        self.start_sheet("Cash Flows")
        bal_kwargs: Sequence[Dict[str, Any]] = [
            {'period': Period.PERIOD},
            {'period': Period.PRIOR},
        ]
        norm_func = operator.neg

        self.add_row(self.string_cell(
            "Cash Flows from Operating Activities",
            stylename=self.style_bold,
        ))
        equity_totals = [
            -self.balances.total(account=EQUITY_ACCOUNTS, **kwargs)
            for kwargs in bal_kwargs
        ]
        self.write_totals_row("Change in Net Assets", equity_totals, leading_rows=1)

        self.add_row(self.string_cell(
            "(Increase) decrease in operating assets:",
        ))
        asset_totals = self.write_classifications_by_account(
            'Assets', bal_kwargs, (self.C_CASH,), self.SPACE, norm_func,
        )
        self.add_row(self.string_cell(
            "Increase (decrease) in operating liabilities:",
        ))
        liabilities = self.write_classifications_by_account(
            'Liabilities', bal_kwargs, (), self.SPACE, norm_func,
        )
        period_totals = [
            sum(bals, core.MutableBalance())
            for bals in zip(equity_totals, asset_totals, liabilities)
        ]
        self.write_totals_row(
            # Fixed the misspelling "activites" in this label.
            "Net cash provided by operating activities",
            period_totals,
            stylename=self.style_totline,
        )
        self.write_totals_row("Net Increase in Cash", period_totals)
        begin_totals = [
            self.balances.total(classification=self.C_CASH, period=period)
            for period in [Period.BEFORE_PERIOD, Period.OPENING]
        ]
        self.write_totals_row("Beginning Cash", begin_totals)
        self.write_totals_row(
            "Ending Cash",
            period_totals, begin_totals,
            stylename=self.style_bottomline,
        )
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse the command line for the balance sheet report.

    ``arglist`` is the list of arguments to parse; None makes argparse read
    them from ``sys.argv``. Returns the parsed namespace, with attributes
    ``start_date``, ``stop_date``, ``fund_metadata_key``,
    ``unrestricted_fund``, and ``output_file``, plus whatever the cliutil
    version and loglevel helpers add.
    """
    arg_parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(arg_parser)

    # The two options that bound the reporting period.
    date_options = [
        (('--begin', '--start', '-b'), 'start_date', (
            "Date to start reporting entries, inclusive, in YYYY-MM-DD format.\n"
            "The default is one year ago.\n"
        )),
        (('--end', '--stop', '-e'), 'stop_date', (
            "Date to stop reporting entries, exclusive, in YYYY-MM-DD format.\n"
            "The default is a year after the start date.\n"
        )),
    ]
    for flags, dest, help_text in date_options:
        arg_parser.add_argument(
            *flags,
            dest=dest,
            metavar='DATE',
            type=cliutil.date_arg,
            help=help_text,
        )

    # The two options that tell restricted funds from the unrestricted one.
    fund_options = [
        (('--fund-metadata-key', '-m'), 'KEY', 'project',
         "Name of the fund metadata key. Default %(default)s.\n"),
        (('--unrestricted-fund', '-u'), 'PROJECT', 'Conservancy',
         "Name of the unrestricted fund. Default %(default)s.\n"),
    ]
    for flags, metavar, default, help_text in fund_options:
        arg_parser.add_argument(
            *flags,
            metavar=metavar,
            default=default,
            help=help_text,
        )

    arg_parser.add_argument(
        '--output-file', '-O',
        metavar='PATH',
        type=Path,
        help="Write the report to this file, or stdout when PATH is `-`.\n",
    )
    cliutil.add_loglevel_argument(arg_parser)
    return arg_parser.parse_args(arglist)
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run the balance sheet report from the command line.

    ``arglist`` is the command line arguments to parse. ``stdout`` is passed
    to cliutil.bytes_output() along with the output path, and ``stderr``
    receives Beancount load errors. ``config`` is the configuration to use;
    when None, a new one is created and loaded from file.

    Returns 0, or a cliutil.ExitCode value when there is no books
    configuration, the books had load errors, or no entries were loaded.
    The report is written in all of those cases.
    """
    args = parse_arguments(arglist)
    cliutil.set_loglevel(logger, args.loglevel)
    if config is None:
        config = configmod.Config()
        config.load_file()

    # Fill in whichever end of the reporting period was not given.
    if args.stop_date is None:
        args.stop_date = (
            datetime.date.today() if args.start_date is None
            else cliutil.diff_year(args.start_date, 1)
        )
    if args.start_date is None:
        args.start_date = cliutil.diff_year(args.stop_date, -1)

    returncode = 0
    books_loader = config.books_loader()
    if books_loader is not None:
        # Start loading one fiscal year before the one the start date is in.
        start_fy = config.fiscal_year_begin().for_date(args.start_date) - 1
        entries, load_errors, options_map = books_loader.load_fy_range(start_fy, args.stop_date)
        if load_errors:
            returncode = cliutil.ExitCode.BeancountErrors
        elif not entries:
            returncode = cliutil.ExitCode.NoDataLoaded
    else:
        entries, load_errors, options_map = books.Loader.load_none(config.config_file_path())
        returncode = cliutil.ExitCode.NoConfiguration
    for error in load_errors:
        bc_printer.print_error(error, file=stderr)

    data.Account.load_from_books(entries, options_map)
    report = Report(Balances(
        data.Posting.from_entries(entries),
        args.start_date,
        args.stop_date,
        args.fund_metadata_key,
        args.unrestricted_fund,
    ))
    report.set_common_properties(config.books_repo())
    report.write_all()

    if args.output_file is None:
        # Default to a dated file name in the repository directory, or the
        # current directory when there is no repository path.
        out_dir_path = config.repository_path() or Path()
        args.output_file = out_dir_path / 'BalanceSheet_{}_{}.ods'.format(
            args.start_date.isoformat(), args.stop_date.isoformat(),
        )
        logger.info("Writing report to %s", args.output_file)
    ods_file = cliutil.bytes_output(args.output_file, stdout)
    report.save_file(ods_file)
    return returncode
# Entry point callable that cliutil builds for this module and program name.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    # Use sys.exit rather than the bare exit() builtin: exit() is added by
    # the site module for interactive use and is missing under `python -S`.
    sys.exit(entry_point())