257 lines
10 KiB
Python
257 lines
10 KiB
Python
"""Tool to help identify unreconciled postings.
|
|
|
|
Run like this:
|
|
|
|
python conservancy_beancount/reconcile/helper.py \
|
|
--beancount-file=$HOME/conservancy/beancount/books/2021.beancount \
|
|
--prev-end-date=2021-05-13 --cur-end-date=2021-12-31 \
|
|
--account="Liabilities:CreditCard:AMEX"
|
|
|
|
In the spirit of bc-reconcile-helper.plx (the original Perl code)
|
|
|
|
Not implemented:
|
|
- --report-group-regex
|
|
- git branch selection from bean-query-goofy-daemon.plx
|
|
|
|
"""
|
|
import argparse
|
|
import csv
|
|
from dateutil.relativedelta import relativedelta
|
|
import datetime
|
|
import decimal
|
|
import io
|
|
import sys
|
|
import tempfile
|
|
import textwrap
|
|
import typing
|
|
from typing import List
|
|
import os
|
|
|
|
from beancount import loader
|
|
from beancount.query.query import run_query
|
|
|
|
|
|
def end_of_month(date: datetime.date) -> datetime.date:
|
|
"""Given a date, return the last day of the month."""
|
|
# Using 'day' replaces, rather than adds.
|
|
return date + relativedelta(day=31)
|
|
|
|
|
|
def format_record_for_grep(row: typing.List, homedir: str) -> typing.List:
|
|
"""Return a line in a grep-style.
|
|
|
|
This is so the line can be fed into Emacs grep-mode for quickly jumping to
|
|
the relevant lines in the books.
|
|
"""
|
|
file = row[0].replace(homedir, '~')
|
|
return [f'{file}:{row[1]}:'] + row[2:]
|
|
|
|
|
|
def max_column_widths(rows: List) -> List[int]:
|
|
"""Return the max width for each column in a table of data."""
|
|
if not rows:
|
|
return []
|
|
else:
|
|
maxes = [0] * len(rows[0])
|
|
for row in rows:
|
|
for i, val in enumerate(row):
|
|
length = len(str(val))
|
|
maxes[i] = max(maxes[i], length)
|
|
return maxes
|
|
|
|
|
|
def tabulate(rows: List, headers: List = None) -> str:
|
|
"""Format a table of data as a string.
|
|
|
|
Implemented here to avoid adding dependency on "tabulate" package.
|
|
"""
|
|
output = io.StringIO()
|
|
if headers:
|
|
rows = [headers] + rows
|
|
widths = max_column_widths(rows)
|
|
for row in rows:
|
|
for i, col in enumerate(row):
|
|
width = widths[i]
|
|
if col is None:
|
|
print(' ' * width, end=' ', file=output)
|
|
elif isinstance(col, str):
|
|
print((str(col)).ljust(width), end=' ', file=output)
|
|
else:
|
|
print((str(col)).rjust(width), end=' ', file=output)
|
|
print('', file=output)
|
|
return output.getvalue().strip()
|
|
|
|
|
|
def reconciliation_report(account, end_date, bank_account_balance, uncleared, prev_end_date, our_account_balance, prev_uncleared):
|
|
*_, account_name = account.rpartition(':')
|
|
# end_date_iso = end_date.isoformat()
|
|
# prev_end_date_iso = prev_end_date.isoformat()
|
|
output = io.StringIO()
|
|
w = csv.writer(output)
|
|
w.writerow([f'title:{end_date}: {account_name}'])
|
|
w.writerow(['BANK RECONCILIATION', 'ENDING', end_date])
|
|
w.writerow([f' {account}'])
|
|
w.writerow([])
|
|
w.writerow(['DATE', 'CHECK NUM', 'PAYEE', 'AMOUNT'])
|
|
w.writerow([])
|
|
w.writerow([end_date, '', 'BANK ACCOUNT BALANCE', bank_account_balance])
|
|
w.writerow([])
|
|
for trans in uncleared:
|
|
w.writerow(trans)
|
|
w.writerow([])
|
|
w.writerow([end_date, '', 'OUR ACCOUNT BALANCE', our_account_balance])
|
|
if prev_uncleared:
|
|
w.writerow([])
|
|
w.writerow(['Items Still', 'Outstanding On', prev_end_date, 'Appeared By', end_date])
|
|
w.writerow([])
|
|
for trans in prev_uncleared:
|
|
w.writerow(trans)
|
|
return output.getvalue()
|
|
|
|
|
|
def reconciliation_report_path(account, end_date):
|
|
*_, account_name = account.rpartition(':')
|
|
return f'Financial/Controls/Reports-for-Treasurer/{end_date}_{account_name}_bank-reconciliation.csv'
|
|
|
|
|
|
def parse_args(argv):
|
|
parser = argparse.ArgumentParser(description='Reconciliation helper')
|
|
parser.add_argument('--beancount-file', required=True)
|
|
parser.add_argument('--account', help='Full account name, e.g. "Liabilities:CreditCard:AMEX"', required=True)
|
|
parser.add_argument('--prev-end-date', type=datetime.date.fromisoformat)
|
|
parser.add_argument('--cur-end-date', type=datetime.date.fromisoformat)
|
|
parser.add_argument('--month', help='YYYY-MM of ending month. Use with --period.')
|
|
parser.add_argument('--period', help='Months in the past to consider. Use with --month.', type=int, choices=[1, 3, 12])
|
|
parser.add_argument('--statement-match')
|
|
parser.add_argument('--cost-function', default='COST')
|
|
parser.add_argument('--grep-output-filename')
|
|
# parser.add_argument('--report-group-regex')
|
|
args = parser.parse_args(args=argv[1:])
|
|
if args.month or args.period:
|
|
if not (args.month and args.period):
|
|
parser.error('--month and --period must be used together')
|
|
else:
|
|
if not (args.cur_end_date and args.prev_end_date):
|
|
parser.error(' --prev-end-date and --cur-end-date must be used together')
|
|
return args
|
|
|
|
|
|
def beancount_file_exists(path):
|
|
return os.path.isfile(path)
|
|
|
|
|
|
def main(args):
|
|
if not beancount_file_exists(args.beancount_file):
|
|
sys.exit(f'Beancount file does not exist: {args.beancount_file}')
|
|
if args.month or args.period:
|
|
parsed_date = datetime.datetime.strptime(args.month, '%Y-%m').date()
|
|
preDate = end_of_month(parsed_date - relativedelta(months=args.period)).isoformat()
|
|
lastDateInPeriod = end_of_month(parsed_date).isoformat()
|
|
month = args.month
|
|
else:
|
|
preDate = args.prev_end_date
|
|
lastDateInPeriod = args.cur_end_date.isoformat()
|
|
month = args.cur_end_date.strftime('%Y-%m')
|
|
grep_output_file: typing.IO
|
|
if args.grep_output_filename:
|
|
grep_output_file = open(args.grep_output_filename, 'w')
|
|
else:
|
|
grep_output_file = tempfile.NamedTemporaryFile(prefix='bc-reconcile-grep-output_', mode='w', delete=False)
|
|
beancount_file = args.beancount_file
|
|
account = args.account
|
|
cost_function = args.cost_function
|
|
statement_match = args.statement_match if args.statement_match else month
|
|
|
|
QUERIES = {
|
|
f"00: CLEARED BAL ENDING DAY BEFORE {preDate}":
|
|
# $CONLEDGER -V -C -e "$preDate" bal "/$acct/"
|
|
f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
|
|
AND date < {preDate} AND META('bank-statement') != NULL""",
|
|
|
|
f"01: ALL TRANSACTION BAL ENDING DAY BEFORE {preDate}":
|
|
# $CONLEDGER -V -e "$preDate" bal "/$acct/"
|
|
f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
|
|
AND date < {preDate}""",
|
|
|
|
f"02: ALL TRANSACTION BAL, ending {lastDateInPeriod}":
|
|
# $CONLEDGER -V -e "$date" bal "/$acct/"
|
|
f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
|
|
AND date <= {lastDateInPeriod}""",
|
|
|
|
f"03: UNCLEARED TRANSACTIONS, ending {lastDateInPeriod}":
|
|
f"""SELECT date, {cost_function}(position) as amt, ANY_META('check-id') as chknum, narration, payee, ENTRY_META('code') as code
|
|
WHERE account = "{account}"
|
|
AND date <= {lastDateInPeriod} AND META('bank-statement') = NULL
|
|
ORDER BY date, payee, narration""",
|
|
|
|
"04: UNCLEARED TRANSACTION FILE, SUITABLE FOR GREP":
|
|
# $CONLEDGER -w -F "%(filename):%(beg_line): %(date) %(code) %(payee) %(amount)\n" --sort d -U -e "$date" reg "/$acct/" > "$TMPDIR/unreconciled-lines"
|
|
f"""SELECT ENTRY_META('filename') as file, META('lineno') as line, date,
|
|
{cost_function}(position) as amt, ANY_META('check-id') as chknum, narration, payee, ANY_META("entity") as entity, ENTRY_META('code') as c
|
|
WHERE account = "{account}"
|
|
AND date <= {lastDateInPeriod} AND META('bank-statement') = NULL
|
|
ORDER BY date, payee, narration""",
|
|
|
|
f"05: CLEARED BALANCE ending {lastDateInPeriod}":
|
|
# $CONLEDGER -V -C -e "$date" bal "/$acct/"
|
|
f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
|
|
AND date <= {lastDateInPeriod} AND META('bank-statement') != NULL""",
|
|
|
|
f"06: CLEARED SUBTRACTIONS on {month}'s statement":
|
|
# $CONLEDGER -V -C --limit "a > 0 and tag(\"Statement\") =~ /$statementSearchString/" bal "/$acct/"
|
|
f"""SELECT sum(number({cost_function}(position))) AS aa
|
|
WHERE account = "{account}"
|
|
and META("bank-statement") ~ "{statement_match}" and number({cost_function}(position)) < 0""",
|
|
|
|
f"07: CLEARED ADDITIONS on {month}'s statement":
|
|
# $CONLEDGER -V -C --limit "a < 0 and tag(\"Statement\") =~ /$statementSearchString/" bal "/$acct/"
|
|
f"""SELECT sum(number({cost_function}(position))) AS aa
|
|
WHERE account = "{account}"
|
|
and META("bank-statement") ~ "{statement_match}" and number({cost_function}(position)) > 0""",
|
|
}
|
|
|
|
# Run Beancount queries.
|
|
print(f"START RECONCILIATION FOR {account} ENDING {lastDateInPeriod} (previous end date {preDate})")
|
|
entries, _, options = loader.load_file(beancount_file)
|
|
uncleared_rows = [] # Hack to capture results of query 03.
|
|
cleared_balance = decimal.Decimal('0')
|
|
all_trans_balance = decimal.Decimal('0')
|
|
for desc, query in QUERIES.items():
|
|
rtypes, rrows = run_query(entries, options, query, numberify=True)
|
|
if not rrows:
|
|
print(f'{desc:<55} {"N/A":>11}')
|
|
elif desc.startswith('04'):
|
|
homedir = os.getenv('HOME', '')
|
|
print(f'{desc}\n See {grep_output_file.name}')
|
|
grep_rows = [format_record_for_grep(row, homedir) for row in rrows]
|
|
print(tabulate(grep_rows), file=grep_output_file)
|
|
elif len(rrows) == 1 and isinstance(rrows[0][0], decimal.Decimal):
|
|
result = rrows[0][0]
|
|
print(f'{desc:<55} {result:11,.2f}')
|
|
if desc.startswith('02'):
|
|
all_trans_balance = result
|
|
if desc.startswith('05'):
|
|
cleared_balance = result
|
|
else:
|
|
headers = [c[0].capitalize() for c in rtypes]
|
|
if desc.startswith('03'):
|
|
uncleared_rows = rrows
|
|
print(desc)
|
|
print(textwrap.indent(tabulate(rrows, headers=headers), ' '))
|
|
|
|
uncleared = [(r[0], r[2], r[4] or r[3], r[1]) for r in uncleared_rows]
|
|
report_path = os.path.join(os.getenv('CONSERVANCY_REPOSITORY', ''), reconciliation_report_path(account, lastDateInPeriod))
|
|
# TODO: Make the directory if it doesn't exist.
|
|
with open(report_path, 'w') as f:
|
|
f.write(reconciliation_report(account, lastDateInPeriod, cleared_balance, uncleared, '1900-01-01', all_trans_balance, []))
|
|
print(f'Wrote reconciliation report: {report_path}.')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
args = parse_args(sys.argv)
|
|
main(args)
|
|
|
|
|
|
def entry_point():
|
|
args = parse_args(sys.argv)
|
|
main(args)
|