reports: Move Balance class and friends to core.

This will be used for the fund report and the upcoming budget variance
report.
This commit is contained in:
Brett Smith 2020-10-16 10:53:15 -04:00
parent 73bbc1e4ec
commit ffc20b6899
4 changed files with 318 additions and 241 deletions

View file

@ -59,168 +59,9 @@ from .. import ranges
PROGNAME = 'balance-sheet-report'
logger = logging.getLogger('conservancy_beancount.reports.balance_sheet')
Fund = core.Fund
KWArgs = Mapping[str, Any]
class Fund(enum.IntFlag):
RESTRICTED = enum.auto()
UNRESTRICTED = enum.auto()
ANY = RESTRICTED | UNRESTRICTED
class Period(enum.IntFlag):
OPENING = enum.auto()
PRIOR = enum.auto()
MIDDLE = enum.auto()
PERIOD = enum.auto()
THRU_PRIOR = OPENING | PRIOR
THRU_MIDDLE = THRU_PRIOR | MIDDLE
ANY = THRU_MIDDLE | PERIOD
class BalanceKey(NamedTuple):
account: data.Account
classification: data.Account
period: Period
fund: Fund
post_type: Optional[str]
class Balances:
def __init__(self,
postings: Iterable[data.Posting],
start_date: datetime.date,
stop_date: datetime.date,
fund_key: str='project',
unrestricted_fund_value: str='Conservancy',
) -> None:
year_diff = (stop_date - start_date).days // 365
if year_diff == 0:
self.prior_range = ranges.DateRange(
cliutil.diff_year(start_date, -1),
cliutil.diff_year(stop_date, -1),
)
self.period_desc = "Period"
else:
self.prior_range = ranges.DateRange(
cliutil.diff_year(start_date, -year_diff),
start_date,
)
self.period_desc = f"Year{'s' if year_diff > 1 else ''}"
self.middle_range = ranges.DateRange(self.prior_range.stop, start_date)
self.period_range = ranges.DateRange(start_date, stop_date)
self.balances: Mapping[BalanceKey, core.MutableBalance] \
= collections.defaultdict(core.MutableBalance)
for post in postings:
post_date = post.meta.date
if post_date >= stop_date:
continue
elif post_date in self.period_range:
period = Period.PERIOD
elif post_date in self.middle_range:
period = Period.MIDDLE
elif post_date in self.prior_range:
period = Period.PRIOR
else:
period = Period.OPENING
if post.meta.get(fund_key) == unrestricted_fund_value:
fund = Fund.UNRESTRICTED
else:
fund = Fund.RESTRICTED
try:
classification_s = post.account.meta['classification']
if isinstance(classification_s, str):
classification = data.Account(classification_s)
else:
raise TypeError()
except (KeyError, TypeError):
classification = post.account
if post.account.root_part() == 'Expenses':
post_type = post.meta.get('expense-type')
else:
post_type = None
key = BalanceKey(post.account, classification, period, fund, post_type)
self.balances[key] += post.at_cost()
def total(self,
account: Union[None, str, Collection[str]]=None,
classification: Optional[str]=None,
period: int=Period.ANY,
fund: int=Fund.ANY,
post_type: Optional[str]=None,
*,
account_exact: bool=False,
) -> core.Balance:
if isinstance(account, str):
account = (account,)
acct_pred: Callable[[data.Account], bool]
if account is None:
acct_pred = lambda acct: True
elif account_exact:
# At this point, between this isinstance() above and the earlier
# `account is None` check, we've collapsed the type of `account` to
# `Collection[str]`. Unfortunately the logic is too involved for
# mypy to follow, so ignore the type problem.
acct_pred = lambda acct: acct in account # type:ignore[operator]
else:
acct_pred = lambda acct: acct.is_under(*account) is not None # type:ignore[misc]
retval = core.MutableBalance()
for key, balance in self.balances.items():
if not acct_pred(key.account):
pass
elif not (classification is None
or key.classification.is_under(classification)):
pass
elif not period & key.period:
pass
elif not fund & key.fund:
pass
elif not (post_type is None or post_type == key.post_type):
pass
else:
retval += balance
return retval
def classifications(self,
account: str,
sort_period: Optional[int]=None,
) -> Sequence[data.Account]:
if sort_period is None:
if account in data.EQUITY_ACCOUNTS:
sort_period = Period.PERIOD
else:
sort_period = Period.ANY
class_bals: Mapping[data.Account, core.MutableBalance] \
= collections.defaultdict(core.MutableBalance)
for key, balance in self.balances.items():
if not key.account.is_under(account):
pass
elif key.period & sort_period:
class_bals[key.classification] += balance
else:
# Ensure the balance exists in the mapping
class_bals[key.classification]
norm_func = core.normalize_amount_func(f'{account}:RootsOK')
def sortkey(acct: data.Account) -> Hashable:
prefix, _, _ = acct.rpartition(':')
balance = norm_func(class_bals[acct])
try:
max_bal = max(amount.number for amount in balance.values())
except ValueError:
max_bal = Decimal(0)
return prefix, -max_bal
return sorted(class_bals, key=sortkey)
def iter_accounts(self, root: Optional[str]=None) -> Sequence[data.Account]:
start_date = self.period_range.start
stop_date = self.period_range.stop
return sorted(
account
for account in data.Account.iter_accounts(root)
if account.meta.open_date < stop_date
and (account.meta.close_date is None
or account.meta.close_date > start_date)
)
Period = core.Period
class Report(core.BaseODS[Sequence[None], None]):
C_CASH = 'Cash'
@ -228,7 +69,7 @@ class Report(core.BaseODS[Sequence[None], None]):
SPACE = ' ' * 4
def __init__(self,
balances: Balances,
balances: core.Balances,
*,
date_fmt: str='%B %d, %Y',
) -> None:
@ -717,7 +558,7 @@ def main(arglist: Optional[Sequence[str]]=None,
return cliutil.ExitCode.RewriteRulesError
postings = ruleset.rewrite(postings)
balances = Balances(
balances = core.Balances(
postings,
args.start_date,
args.stop_date,

View file

@ -46,9 +46,10 @@ from pathlib import Path
from beancount.core import amount as bc_amount
from odf.namespaces import TOOLSVERSION # type:ignore[import]
from ..cliutil import VERSION
from .. import cliutil
from .. import data
from .. import filters
from .. import ranges
from .. import rtutil
from typing import (
@ -57,13 +58,16 @@ from typing import (
Any,
BinaryIO,
Callable,
Collection,
Dict,
Generic,
Hashable,
Iterable,
Iterator,
List,
Mapping,
MutableMapping,
NamedTuple,
Optional,
Sequence,
Set,
@ -258,6 +262,214 @@ class MutableBalance(Balance):
return self
class Fund(enum.IntFlag):
RESTRICTED = enum.auto()
UNRESTRICTED = enum.auto()
ANY = RESTRICTED | UNRESTRICTED
class Period(enum.IntFlag):
"""Constants to represent reporting periods
Chronologically the periods go::
OPENINGPRIORMIDDLE (may be empty)PERIOD
PERIOD is the time period being reportedi.e., the time period covered
by the user's ``--begin`` and ``--end`` options.
PRIOR is the same time period one year prior (for reporting periods up to
one whole year, inclusive), or the same time duration as PERIOD going
backward from the beginning of the reporting period.
MIDDLE is the time period between PRIOR and PERIOD. This is empty if
PERIOD is a year or more.
OPENING is all time before PRIOR.
"""
OPENING = enum.auto()
PRIOR = enum.auto()
MIDDLE = enum.auto()
PERIOD = enum.auto()
THRU_PRIOR = OPENING | PRIOR
THRU_MIDDLE = THRU_PRIOR | MIDDLE
ANY = THRU_MIDDLE | PERIOD
class BalanceKey(NamedTuple):
account: data.Account
classification: data.Account
period: Period
fund: Fund
post_type: Optional[str]
class Balances:
"""Queryable database of balances
Given an iterable of Postings and a reporting period, this class tallies up
balances divided by account, time period, fund, and posting metadata.
You can then use the ``total()`` method to get a single balance for postings
that match different criteria you specify along those divisions. For reports
that don't need to report posting-level data, just high-level balances,
this class is usually best to use, providing good performance with a
just-powerful-enough access API.
"""
def __init__(self,
postings: Iterable[data.Posting],
start_date: datetime.date,
stop_date: datetime.date,
fund_key: str='project',
unrestricted_fund_value: str='Conservancy',
) -> None:
year_diff = (stop_date - start_date).days // 365
if year_diff == 0:
self.prior_range = ranges.DateRange(
cliutil.diff_year(start_date, -1),
cliutil.diff_year(stop_date, -1),
)
self.period_desc = "Period"
else:
self.prior_range = ranges.DateRange(
cliutil.diff_year(start_date, -year_diff),
start_date,
)
self.period_desc = f"Year{'s' if year_diff > 1 else ''}"
self.middle_range = ranges.DateRange(self.prior_range.stop, start_date)
self.period_range = ranges.DateRange(start_date, stop_date)
self.balances: Mapping[BalanceKey, MutableBalance] \
= collections.defaultdict(MutableBalance)
for post in postings:
post_date = post.meta.date
if post_date >= stop_date:
continue
elif post_date in self.period_range:
period = Period.PERIOD
elif post_date in self.middle_range:
period = Period.MIDDLE
elif post_date in self.prior_range:
period = Period.PRIOR
else:
period = Period.OPENING
if post.meta.get(fund_key) == unrestricted_fund_value:
fund = Fund.UNRESTRICTED
else:
fund = Fund.RESTRICTED
try:
classification_s = post.account.meta['classification']
if isinstance(classification_s, str):
classification = data.Account(classification_s)
else:
raise TypeError()
except (KeyError, TypeError):
classification = post.account
if post.account.root_part() == 'Expenses':
post_type = post.meta.get('expense-type')
else:
post_type = None
key = BalanceKey(post.account, classification, period, fund, post_type)
self.balances[key] += post.at_cost()
def total(self,
account: Union[None, str, Collection[str]]=None,
classification: Optional[str]=None,
period: int=Period.ANY,
fund: int=Fund.ANY,
post_type: Optional[str]=None,
*,
account_exact: bool=False,
) -> Balance:
"""Return the balance of postings that match given criteria
Given ``account`` and/or ``classification`` criteria, returns the total
balance of postings *under* that account and/or classification. If you
pass ``account_exact=True``, the postings must have exactly the
``account`` you specify instead.
Given ``period``, ``fund``, or ``post_type`` criteria, limits to
reporting the balance of postings that match that reporting period,
fund type, or metadata value, respectively.
"""
if isinstance(account, str):
account = (account,)
acct_pred: Callable[[data.Account], bool]
if account is None:
acct_pred = lambda acct: True
elif account_exact:
# At this point, between this isinstance() above and the earlier
# `account is None` check, we've collapsed the type of `account` to
# `Collection[str]`. Unfortunately the logic is too involved for
# mypy to follow, so ignore the type problem.
acct_pred = lambda acct: acct in account # type:ignore[operator]
else:
acct_pred = lambda acct: acct.is_under(*account) is not None # type:ignore[misc]
retval = MutableBalance()
for key, balance in self.balances.items():
if not acct_pred(key.account):
pass
elif not (classification is None
or key.classification.is_under(classification)):
pass
elif not period & key.period:
pass
elif not fund & key.fund:
pass
elif not (post_type is None or post_type == key.post_type):
pass
else:
retval += balance
return retval
def classifications(self,
account: str,
sort_period: Optional[int]=None,
) -> Sequence[data.Account]:
"""Return a sequence of seen account classifications
Given an account name, returns a sequence of all the account
classifications seen in the postings under that part of the account
hierarchy. The classifications are sorted in descending order by the
balance of postings under them for the ``sort_period`` time period.
"""
if sort_period is None:
if account in data.EQUITY_ACCOUNTS:
sort_period = Period.PERIOD
else:
sort_period = Period.ANY
class_bals: Mapping[data.Account, MutableBalance] \
= collections.defaultdict(MutableBalance)
for key, balance in self.balances.items():
if not key.account.is_under(account):
pass
elif key.period & sort_period:
class_bals[key.classification] += balance
else:
# Ensure the balance exists in the mapping
class_bals[key.classification]
norm_func = normalize_amount_func(f'{account}:RootsOK')
def sortkey(acct: data.Account) -> Hashable:
prefix, _, _ = acct.rpartition(':')
balance = norm_func(class_bals[acct])
try:
max_bal = max(amount.number for amount in balance.values())
except ValueError:
max_bal = Decimal(0)
return prefix, -max_bal
return sorted(class_bals, key=sortkey)
def iter_accounts(self, root: Optional[str]=None) -> Sequence[data.Account]:
"""Return a sequence of accounts open during the reporting period
The sequence is sorted by account name.
"""
start_date = self.period_range.start
stop_date = self.period_range.stop
return sorted(
account
for account in data.Account.iter_accounts(root)
if account.meta.open_date < stop_date
and (account.meta.close_date is None
or account.meta.close_date > start_date)
)
class RelatedPostings(Sequence[data.Posting]):
"""Collect and query related postings
@ -1106,7 +1318,7 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
created_elem.addText(created.isoformat())
generator_elem = self.ensure_child(self.document.meta, odf.meta.Generator)
generator_elem.childNodes.clear()
generator_elem.addText(f'{generator}/{VERSION} {TOOLSVERSION}')
generator_elem.addText(f'{generator}/{cliutil.VERSION} {TOOLSVERSION}')
### Rows and cells

View file

@ -18,92 +18,16 @@ import datetime
import io
import itertools
from decimal import Decimal
import pytest
from . import testutil
import odf.opendocument
from beancount.core.data import Open
from conservancy_beancount import data
from conservancy_beancount.reports import balance_sheet
Fund = balance_sheet.Fund
Period = balance_sheet.Period
clean_account_meta = pytest.fixture(scope='module')(testutil.clean_account_meta)
@pytest.fixture(scope='module')
def income_expense_balances():
txns = []
prior_date = datetime.date(2019, 2, 2)
period_date = datetime.date(2019, 4, 4)
for (acct, post_type), fund in itertools.product([
('Income:Donations', 'Donations'),
('Income:Sales', 'RBI'),
('Expenses:Postage', 'fundraising'),
('Expenses:Postage', 'management'),
('Expenses:Postage', 'program'),
('Expenses:Services', 'fundraising'),
('Expenses:Services', 'program'),
], ['Conservancy', 'Alpha']):
root_acct, _, classification = acct.partition(':')
try:
data.Account(acct).meta
except KeyError:
data.Account.load_opening(Open(
{'classification': classification},
datetime.date(2000, 1, 1),
acct, None, None,
))
meta = {
'project': fund,
f'{root_acct.lower().rstrip("s")}-type': post_type,
}
sign = '' if root_acct == 'Expenses' else '-'
txns.append(testutil.Transaction(date=prior_date, postings=[
(acct, f'{sign}2.40', meta),
]))
txns.append(testutil.Transaction(date=period_date, postings=[
(acct, f'{sign}2.60', meta),
]))
return balance_sheet.Balances(
data.Posting.from_entries(txns),
datetime.date(2019, 3, 1),
datetime.date(2020, 3, 1),
)
@pytest.mark.parametrize('kwargs,expected', [
({'account': 'Income:Donations'}, -10),
({'account': 'Income'}, -20),
({'account': 'Income:Nonexistent'}, None),
({'classification': 'Postage'}, 30),
({'classification': 'Services'}, 20),
({'classification': 'Nonexistent'}, None),
({'period': Period.PRIOR, 'account': 'Income'}, '-9.60'),
({'period': Period.PERIOD, 'account': 'Expenses'}, 26),
({'fund': Fund.RESTRICTED, 'account': 'Income'}, -10),
({'fund': Fund.UNRESTRICTED, 'account': 'Expenses'}, 25),
({'post_type': 'fundraising'}, 20),
({'post_type': 'management'}, 10),
({'post_type': 'Nonexistent'}, None),
({'period': Period.PRIOR, 'post_type': 'fundraising'}, '9.60'),
({'fund': Fund.RESTRICTED, 'post_type': 'program'}, 10),
({'period': Period.PRIOR, 'fund': Fund.RESTRICTED, 'post_type': 'program'}, '4.80'),
({'period': Period.PERIOD, 'fund': Fund.RESTRICTED, 'post_type': 'ø'}, None),
({'account': ('Income', 'Expenses')}, 30),
({'account': ('Income', 'Expenses'), 'fund': Fund.UNRESTRICTED}, 15),
])
def test_balance_total(income_expense_balances, kwargs, expected):
actual = income_expense_balances.total(**kwargs)
if expected is None:
assert not actual
else:
assert actual == {'USD': testutil.Amount(expected)}
def run_main(arglist=[], config=None):
if config is None:
config = testutil.TestConfig(books_path=testutil.test_path('books/fund.beancount'))

View file

@ -0,0 +1,100 @@
"""test_reports_balances.py - Unit tests for Balances class"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
import itertools
import pytest
from . import testutil
from beancount.core.data import Open
from conservancy_beancount import data
from conservancy_beancount.reports import core
Fund = core.Fund
Period = core.Period
clean_account_meta = pytest.fixture(scope='module')(testutil.clean_account_meta)
@pytest.fixture(scope='module')
def income_expense_balances():
txns = []
prior_date = datetime.date(2019, 2, 2)
period_date = datetime.date(2019, 4, 4)
for (acct, post_type), fund in itertools.product([
('Income:Donations', 'Donations'),
('Income:Sales', 'RBI'),
('Expenses:Postage', 'fundraising'),
('Expenses:Postage', 'management'),
('Expenses:Postage', 'program'),
('Expenses:Services', 'fundraising'),
('Expenses:Services', 'program'),
], ['Conservancy', 'Alpha']):
root_acct, _, classification = acct.partition(':')
try:
data.Account(acct).meta
except KeyError:
data.Account.load_opening(Open(
{'classification': classification},
datetime.date(2000, 1, 1),
acct, None, None,
))
meta = {
'project': fund,
f'{root_acct.lower().rstrip("s")}-type': post_type,
}
sign = '' if root_acct == 'Expenses' else '-'
txns.append(testutil.Transaction(date=prior_date, postings=[
(acct, f'{sign}2.40', meta),
]))
txns.append(testutil.Transaction(date=period_date, postings=[
(acct, f'{sign}2.60', meta),
]))
return core.Balances(
data.Posting.from_entries(txns),
datetime.date(2019, 3, 1),
datetime.date(2020, 3, 1),
)
@pytest.mark.parametrize('kwargs,expected', [
({'account': 'Income:Donations'}, -10),
({'account': 'Income'}, -20),
({'account': 'Income:Nonexistent'}, None),
({'classification': 'Postage'}, 30),
({'classification': 'Services'}, 20),
({'classification': 'Nonexistent'}, None),
({'period': Period.PRIOR, 'account': 'Income'}, '-9.60'),
({'period': Period.PERIOD, 'account': 'Expenses'}, 26),
({'fund': Fund.RESTRICTED, 'account': 'Income'}, -10),
({'fund': Fund.UNRESTRICTED, 'account': 'Expenses'}, 25),
({'post_type': 'fundraising'}, 20),
({'post_type': 'management'}, 10),
({'post_type': 'Nonexistent'}, None),
({'period': Period.PRIOR, 'post_type': 'fundraising'}, '9.60'),
({'fund': Fund.RESTRICTED, 'post_type': 'program'}, 10),
({'period': Period.PRIOR, 'fund': Fund.RESTRICTED, 'post_type': 'program'}, '4.80'),
({'period': Period.PERIOD, 'fund': Fund.RESTRICTED, 'post_type': 'ø'}, None),
({'account': ('Income', 'Expenses')}, 30),
({'account': ('Income', 'Expenses'), 'fund': Fund.UNRESTRICTED}, 15),
])
def test_balance_total(income_expense_balances, kwargs, expected):
actual = income_expense_balances.total(**kwargs)
if expected is None:
assert not actual
else:
assert actual == {'USD': testutil.Amount(expected)}