reconcile: New statement reconciler.

For now, this is basically just a specialized ledger report. It highlights
rows that already appear reconciled, and reports different balances, with
appropriate formulas to assist interactive reconciliation.

In the future I hope we can extend this to read various CSV statements and
then highlight rows different based on discrepancies between the statement
and the books, sort of like the PayPal reconciler does now.
This commit is contained in:
Brett Smith 2021-01-31 17:06:40 -05:00
parent ca38e45178
commit f23e0431e2
2 changed files with 455 additions and 1 deletions

View file

@ -0,0 +1,453 @@
"""statement.py - Reconciliation report based on statement metadata"""
# Copyright © 2021 Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.
import argparse
import collections
import datetime
import enum
import logging
import os
import sys
from typing import (
Any,
Collection,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Union,
)
from ..beancount_types import (
MetaKey,
)
from pathlib import Path
import odf.table # type:ignore[import]
from beancount.parser import printer as bc_printer
from .. import books
from .. import cliutil
from .. import config as configmod
from .. import data
from .. import rtutil
from ..ranges import DateRange
from ..reports import core
PROGNAME = 'reconcile-statement'
logger = logging.getLogger('conservancy_beancount.reconcile.statement')
class StatementReconciliation(core.BaseODS[data.Posting, data.Account]):
COLUMNS: Mapping[str, Union[None, int, float]] = collections.OrderedDict([
("Date", None),
(data.Metadata.human_name('entity'), 1.5),
("Description", 2),
("Amount", 1),
("ID", 1.5),
("Statement", 2),
])
ID_META_MAP: Mapping[str, MetaKey] = collections.OrderedDict([
('Assets,PayPal', 'paypal-id'),
('Assets', 'check-id'),
])
STATEMENT_META_MAP: Mapping[str, MetaKey] = {
'Assets': 'bank-statement',
'Expenses': 'tax-statement',
}
class Display(enum.Enum):
RECONCILED = '#00ffff'
NORMAL = '<normal>'
HIDDEN = '<hidden>'
def __init__(
self,
rt_wrapper: Optional[rtutil.RT],
pre_range: DateRange,
rec_range: DateRange,
post_range: DateRange,
accounts: Collection[data.Account],
statement_metakey: Optional[str]=None,
id_metakey: Optional[str]=None,
) -> None:
super().__init__(rt_wrapper)
self.pre_range = pre_range
self.rec_range = rec_range
self.post_range = post_range
self.accounts = sorted(accounts)
self.statement_metakey = statement_metakey
self.id_metakey = id_metakey
def section_key(self, row: data.Posting) -> data.Account:
return row.account
def init_styles(self) -> None:
super().init_styles()
self.style_baldate = self.merge_styles(self.style_date, self.style_bold)
self.style_balance = self.merge_styles(self.style_bold, self.style_endtotal)
def start_section(self, account: str) -> None:
self.use_sheet(account.replace(':', ' '))
for width in self.COLUMNS.values():
col_style = None if width is None else self.column_style(width)
self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
self.add_row(*(
self.string_cell(text, stylename=self.style_bold)
for text in self.COLUMNS
))
self.lock_first_row()
def _date_range_posts(
self,
postings: Sequence[data.Posting],
date_range: DateRange,
) -> core.RelatedPostings:
return core.RelatedPostings(p for p in postings if p.meta.date in date_range)
def _most_common_first_link(self, posts: core.RelatedPostings, key: MetaKey) -> str:
counter = collections.Counter(post.meta.first_link(key, '') for post in posts)
for value, _ in counter.most_common(2):
if value:
return value
else:
return ''
def write_posts(
self,
posts: core.RelatedPostings,
statement_metakey: MetaKey,
id_metakey: MetaKey,
common_display: Display=Display.NORMAL,
common_statement: Optional[str]=None,
) -> Tuple[int, int]:
if common_statement is None:
common_statement = self._most_common_first_link(posts, statement_metakey)
if common_display.value.startswith('#'):
common_style: Optional[odf.style.Style] = self.border_style(
core.Border.LEFT, '10pt', 'solid', common_display.value,
)
else:
common_style = None
self.add_row()
start_row = self.row_count()
for posting in posts:
if posting.meta.first_link(statement_metakey) == common_statement:
if common_display is self.Display.HIDDEN:
continue
row_style = self.merge_styles(self.style_date, common_style)
else:
row_style = self.style_date
self.add_row(
self.date_cell(posting.meta.date, stylename=row_style),
self.string_cell(posting.meta.get('entity', '<none>')),
self.string_cell(posting.meta.txn.narration),
self.currency_cell(posting.units),
self.string_cell(posting.meta.get(id_metakey, ''), stylename=self.style_endtext),
self.meta_links_cell(posting.meta.report_links(statement_metakey)),
)
end_row = self.row_count()
if start_row == end_row:
self.sheet.childNodes.pop()
else:
start_row += 1
return (start_row, end_row)
def write_pre_posts(
self,
posts: core.RelatedPostings,
statement_metakey: MetaKey,
id_metakey: MetaKey,
) -> None:
posts = self._date_range_posts(posts, self.pre_range)
header_cell = self.string_cell(
"",
stylename=self.merge_styles(self.style_centertext, self.style_bold),
numbercolumnsspanned=len(self.COLUMNS),
)
self.add_row()
self.add_row(header_cell)
start_row, end_row = self.write_posts(
posts, statement_metakey, id_metakey, self.Display.HIDDEN,
)
if start_row == end_row:
text = "No unreconciled postings found in prior period"
else:
text = "Unreconciled postings in prior period"
header_cell.firstChild.addText(text)
def write_bal_row(
self,
description: str,
balance: core.Balance,
balance_style: Union[None, str, odf.style.Style]=None,
date: Optional[datetime.date]=None,
) -> odf.table.TableRow:
balance_style = self.merge_styles(self.style_bold, balance_style)
if date is None:
date_cell = odf.table.TableCell()
else:
date_cell = self.date_cell(date, stylename=self.style_baldate)
return self.add_row(
date_cell,
odf.table.TableCell(),
self.string_cell(description, stylename=self.style_bold),
self.balance_cell(balance, stylename=balance_style),
)
def _address(
self,
start_col: str,
start_row: Optional[int]=None,
end_row: Optional[int]=None,
end_col: Optional[str]=None,
) -> str:
if '.' not in start_col:
start_col = f'.{start_col}'
if start_row is None:
start_row = 1
if end_row is None:
end_row = 2 ** 20
if end_row is None:
return f'[{start_col}{start_row}]'
if end_col is None:
end_col = start_col
elif '.' not in end_col:
end_col = f'.{end_col}'
return f'[{start_col}{start_row}:{end_col}{end_row}]'
def write_rec_posts(
self,
posts: core.RelatedPostings,
statement_metakey: MetaKey,
id_metakey: MetaKey,
open_balance: core.Balance,
) -> None:
posts = self._date_range_posts(posts, self.rec_range)
period_bal = posts.balance()
common_statement = self._most_common_first_link(posts, statement_metakey)
self.add_row()
self.write_bal_row(
"Opening Balance", open_balance, self.style_endtotal, self.rec_range.start,
)
start_row, end_row = self.write_posts(
posts, statement_metakey, id_metakey, self.Display.RECONCILED, common_statement,
)
post_range = self._address('D', start_row, end_row)
rec_addr = self._address('D', end_row + 3)
self.add_row()
row = self.write_bal_row("Unreconciled Total", core.Balance(
p.units for p in posts if p.meta.first_link(statement_metakey) != common_statement
), self.style_total)
row.lastChild.setAttribute('formula', f'of:=SUM({post_range})-{rec_addr}')
row = self.write_bal_row("Reconciled Total", core.Balance(
p.units for p in posts if p.meta.first_link(statement_metakey) == common_statement
))
stmt_range = self._address('F', start_row, end_row)
if common_statement:
_, stmt_op = next(self._meta_link_pairs((common_statement,)))
stmt_op += '*'
else:
stmt_op = '<>'
row.lastChild.setAttribute(
'formula',
f'of:=SUMIFS({post_range};{stmt_range};"{stmt_op}")',
)
row = self.write_bal_row("Period Total", period_bal, self.style_total)
row.lastChild.setAttribute('formula', f'of:=SUM({post_range})')
row = self.write_bal_row(
"Ending Balance",
open_balance + period_bal,
self.style_bottomline,
self.rec_range.stop - datetime.timedelta(days=1),
)
bal_addr = self._address('D', start_row - 2)
row.lastChild.setAttribute('formula', f'of:=SUM({post_range})+{bal_addr}')
def _get_metakey(self, account: data.Account, key: str, default: str) -> str:
user_spec: Optional[str] = getattr(self, f'{key.lower()}_metakey')
if user_spec is not None:
return user_spec
account_map: Mapping[str, MetaKey] = getattr(self, f'{key.upper()}_META_MAP')
acct_key = account.is_under(*account_map)
if acct_key is None:
return default
else:
return account_map[acct_key]
def write(self, rows: Iterable[data.Posting]) -> None:
acct_posts = dict(core.RelatedPostings.group_by_account(
post for post in rows
if post.meta.date < self.post_range.stop
and post.account.is_under(*self.accounts)
))
for account in self.accounts:
try:
postings = acct_posts[account]
except KeyError:
postings = core.RelatedPostings()
statement_metakey = self._get_metakey(account, 'statement', 'statement')
id_metakey = self._get_metakey(account, 'id', 'lineno')
open_balance = core.Balance()
for post, open_balance in postings.iter_with_balance():
if post.meta.date >= self.rec_range.start:
break
after_posts = self._date_range_posts(postings, self.post_range)
self.start_section(account)
self.write_pre_posts(postings, statement_metakey, id_metakey)
self.write_rec_posts(postings, statement_metakey, id_metakey, open_balance)
self.write_posts(after_posts, statement_metakey, id_metakey)
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
parser = argparse.ArgumentParser(prog=PROGNAME)
cliutil.add_version_argument(parser)
cliutil.add_loglevel_argument(parser)
parser.add_argument(
'--begin', '--start', '-b',
dest='start_date',
metavar='DATE',
type=cliutil.date_arg,
required=True,
help="""Date to start reconciliation, inclusive, in YYYY-MM-DD format.
""")
parser.add_argument(
'--end', '--stop', '-e',
dest='stop_date',
metavar='DATE',
type=cliutil.date_arg,
help="""Date to end reconciliation, exclusive, in YYYY-MM-DD format.
The default is one month after the start date.
""")
parser.add_argument(
'--account', '-a',
dest='accounts',
metavar='ACCOUNT',
action='append',
help="""Reconcile this account. You can specify this option
multiple times. You can specify a part of the account hierarchy, or an account
classification from metadata. Default 'Cash'.
""")
parser.add_argument(
'--id-metadata-key', '-i',
metavar='METAKEY',
help="""Show the named metadata as a posting identifier.
Default varies by account.
""")
parser.add_argument(
'--statement-metadata-key', '-s',
metavar='METAKEY',
help="""Use the named metadata to determine which postings have already
been reconciled. Default varies by account.
""")
parser.add_argument(
'--output-file', '-O',
metavar='PATH',
type=Path,
help="""Write the report to this file, or stdout when PATH is `-`.
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
""")
args = parser.parse_args(arglist)
if not args.accounts:
args.accounts = ['Cash']
return args
def main(arglist: Optional[Sequence[str]]=None,
stdout: TextIO=sys.stdout,
stderr: TextIO=sys.stderr,
config: Optional[configmod.Config]=None,
) -> int:
args = parse_arguments(arglist)
cliutil.set_loglevel(logger, args.loglevel)
if config is None:
config = configmod.Config()
config.load_file()
if args.stop_date is None:
year_diff, smonth = divmod(args.start_date.month, 12)
syear = args.start_date.year + year_diff
smonth += 1
try:
args.stop_date = datetime.date(syear, smonth, args.start_date.day)
except ValueError:
year_diff, smonth = divmod(smonth, 12)
syear += year_diff
smonth += 1
args.stop_date = datetime.date(syear, smonth, 1)
days_diff = args.stop_date - args.start_date
pre_range = DateRange(args.start_date - days_diff, args.start_date)
rec_range = DateRange(args.start_date, args.stop_date)
post_range = DateRange(args.stop_date, args.stop_date + days_diff)
returncode = os.EX_OK
books_loader = config.books_loader()
if books_loader is None:
entries, load_errors, options = books.Loader.load_none(config.config_file_path())
returncode = cliutil.ExitCode.NoConfiguration
else:
entries, load_errors, options = books_loader.load_fy_range(pre_range.start, post_range.stop)
if load_errors:
returncode = cliutil.ExitCode.BeancountErrors
elif not entries:
returncode = cliutil.ExitCode.NoDataLoaded
data.Account.load_from_books(entries, options)
real_accounts: Set[data.Account] = set()
for account_spec in args.accounts:
new_accounts = frozenset(
account
for account in data.Account.iter_accounts(account_spec)
if account.is_open_on_date(args.start_date) is not False
)
if new_accounts:
real_accounts.update(new_accounts)
else:
logger.critical("account %r did not match any open accounts", account_spec)
return 2
for error in load_errors:
bc_printer.print_error(error, file=stderr)
rt_wrapper = config.rt_wrapper()
if rt_wrapper is None:
logger.warning("could not initialize RT client; spreadsheet links will be broken")
report = StatementReconciliation(
rt_wrapper,
pre_range,
rec_range,
post_range,
real_accounts,
args.statement_metadata_key,
args.id_metadata_key,
)
report.write(data.Posting.from_entries(entries))
if args.output_file is None:
out_dir_path = config.repository_path() or Path()
args.output_file = out_dir_path / 'ReconciliationReport_{}_{}.ods'.format(
args.start_date.isoformat(),
args.stop_date.isoformat(),
)
logger.info("Writing report to %s", args.output_file)
ods_file = cliutil.bytes_output(args.output_file, stdout)
report.save_file(ods_file)
return returncode
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
if __name__ == '__main__':
exit(entry_point())

View file

@ -5,7 +5,7 @@ from setuptools import setup
setup(
name='conservancy_beancount',
description="Plugin, library, and reports for reading Conservancy's books",
version='1.16.2',
version='1.17.0',
author='Software Freedom Conservancy',
author_email='info@sfconservancy.org',
license='GNU AGPLv3+',
@ -54,6 +54,7 @@ setup(
'pdfform-extract-irs990scheduleA = conservancy_beancount.pdfforms.extract.irs990scheduleA:entry_point',
'pdfform-fill = conservancy_beancount.pdfforms.fill:entry_point',
'reconcile-paypal = conservancy_beancount.reconcile.paypal:entry_point',
'reconcile-statement = conservancy_beancount.reconcile.statement:entry_point',
'split-ods-links = conservancy_beancount.tools.split_ods_links:entry_point',
],
},