diff --git a/conservancy_beancount/reconcile/prototype_amex_reconciler.py b/conservancy_beancount/reconcile/prototype_amex_reconciler.py index 5126875..5a9b012 100644 --- a/conservancy_beancount/reconcile/prototype_amex_reconciler.py +++ b/conservancy_beancount/reconcile/prototype_amex_reconciler.py @@ -1,5 +1,12 @@ """Reconcile an AMEX CSV statement against the books and print differences. +Beancount users often write importers to create bookkeeping entries direct from +a bank statement or similar. That approach automates data entry and +reconciliation in one step. In some cases though, it's useful to manually enter +transactions and reconcile them later on. This workflow is helpful in cases like +writing a paper check when there's a time lag between committing to making a +payment and the funds being debited. That's the workflow we're using here. + Run like this: $ python3 -m pip install thefuzz @@ -36,18 +43,31 @@ TODO/ISSUES: """ import argparse +import collections import csv import datetime import decimal +import io import os -from typing import Dict, List, Tuple +import sys +from typing import Callable, Dict, List, Tuple, TextIO from beancount import loader from beancount.query.query import run_query + +if not sys.warnoptions: + import warnings + # Disable annoying warning from thefuzz prompting for a C extension. The + # current pure-Python implementation isn't a bottleneck for us. + warnings.filterwarnings('ignore', category=UserWarning, module='thefuzz.fuzz') from thefuzz import fuzz # type: ignore # NOTE: Statement doesn't seem to give us a running balance or a final total. 
def read_transactions_from_csv(f: TextIO, standardize_statement_record: Callable) -> list:
    """Read a CSV statement and return its rows as sorted, standardized records.

    Args:
        f: Open text file containing the CSV statement (with a header row).
        standardize_statement_record: Callable taking ``(row, line_number)``
            and returning a standard transaction dict.

    Returns:
        The standardized records, ordered by ``sort_records``.
    """
    reader = csv.DictReader(f)
    # reader.line_num is read while each row is produced, so it records the
    # CSV line the row came from.
    return sort_records([standardize_statement_record(row, reader.line_num) for row in reader])


def standardize_fr_record(row: Dict, line: int) -> Dict:
    """Turn a row of the non-AMEX ("fr") CSV format into a standard transaction dict.

    Reads the ``Date`` (MM/DD/YYYY), ``Amount``, ``Detail`` and ``Description``
    columns; ``Description`` is used as the payee when ``Detail`` is empty.
    """
    return {
        'date': datetime.datetime.strptime(row['Date'], '%m/%d/%Y').date(),
        'amount': decimal.Decimal(row['Amount']),
        'payee': row['Detail'] or row['Description'],
        'line': line,
    }


def format_record(record: Dict) -> str:
    """Format a standard record as ``date: amount payee`` for the report table."""
    return f"{record['date'].isoformat()}: {record['amount']:12,.2f} {record['payee'][:20]:<20}"


def sort_records(records: List) -> List:
    """Return the records ordered by date, then by amount."""
    return sorted(records, key=lambda x: (x['date'], x['amount']))


def match_statement_and_books(
        statement_trans: list,
        books_trans: list,
        show_reconciled_matches: bool,
        csv_statement: str,
) -> Tuple[list, list, decimal.Decimal, decimal.Decimal, decimal.Decimal]:
    """Pair up statement transactions with books transactions.

    Runs through all the statement transactions to find a matching transaction
    in the books (as judged by ``records_match``). If found, the books
    transaction is marked off so that it can only be matched once. Some
    transactions will be matched, some will be on the statement but not the
    books and some on the books but not the statement.

    Doesn't currently find exact matches when there are a bunch of transactions
    for the same amount on the same day. Probably ought to do a first pass
    through the books transactions to find a closely matching payee, then do
    another pass disregarding payee.

    Args:
        statement_trans: Standard records from the statement; each needs
            'date', 'amount', 'payee' and 'line'.
        books_trans: Standard records from the books; each additionally needs
            'filename' and 'statement'. The list is not modified.
        show_reconciled_matches: Also report matches whose books transaction
            already carries statement metadata.
        csv_statement: Path of the CSV statement; only its base name is used.

    Returns:
        A 5-tuple ``(matches, metadata_to_apply, total_matched,
        total_missing_from_books, total_missing_from_statement)``. ``matches``
        holds ``[date, report_line]`` pairs and ``metadata_to_apply`` holds
        ``(filename, books_line, metadata_line)`` tuples for books
        transactions that have no statement metadata yet.
    """
    statement_name = os.path.basename(csv_statement)
    # Cross matched entries off a private copy rather than the caller's list.
    unmatched_books = list(books_trans)
    matches = []
    metadata_to_apply = []
    total_matched = decimal.Decimal(0)
    total_missing_from_books = decimal.Decimal(0)
    total_missing_from_statement = decimal.Decimal(0)

    # TODO: (note left unfinished in the original: "What if th...")
    for r1 in statement_trans:
        for r2 in unmatched_books:
            match, note = records_match(r1, r2)
            if match:
                if not r2['statement'] or show_reconciled_matches:
                    matches.append([r2['date'], f'{format_record(r1)} → {format_record(r2)} ✓ {note}'])
                    total_matched += r2['amount']
                if not r2['statement']:
                    # The reference points at the statement row (r1), while the
                    # edit location is the books transaction (r2).
                    metadata_to_apply.append((r2['filename'], r2['line'], f' bank-statement: "{statement_name}:{r1["line"]}"\n'))
                unmatched_books.remove(r2)
                break
        else:
            # No books transaction matched this statement row.
            matches.append([r1['date'], f'{format_record(r1)} → {" ":^45} ✗ Not in books ({statement_name}:{r1["line"]})'])
            total_missing_from_books += r1['amount']
    # Whatever is left in the books had no counterpart on the statement.
    for r2 in unmatched_books:
        matches.append([r2['date'], f'{" ":^45} → {format_record(r2)} ✗ Not on statement ({os.path.basename(r2["filename"])}:{r2["line"]})'])
        total_missing_from_statement += r2['amount']
    return matches, metadata_to_apply, total_matched, total_missing_from_books, total_missing_from_statement

# TODO: Time for some test cases I think.

# TODO: Could potentially return a score so that we can find the best match from
# a pool of candidates. How would we then remove that candidate from the global
# pool?
# TODO: Is there a way to pull the side-effecting code out of this function?

def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> None:
    """Insert reconciliation metadata in the books files.

    Takes a list of edits to make as tuples of form (filename, lineno, metadata):

        [
            ('2021/main.beancount', 4245, ' bank-statement: statement.pdf'),
            ('2021/main.beancount', 1057, ' bank-statement: statement.pdf'),
            ('2021/payroll.beancount', 257, ' bank-statement: statement.pdf'),
            ...,
        ]

    Each metadata string becomes a new line inserted at list index ``lineno``
    of the file's lines, i.e. directly after line ``lineno`` when line numbers
    are 1-based. A trailing newline is added to the metadata when missing so
    the inserted text never runs into the following line.

    Every affected file is read fully, rewritten in place as UTF-8, and a
    "Wrote <filename>." line is printed for it. An empty list does nothing.
    """
    file_contents: Dict[str, list] = {}
    file_offsets: Dict[str, int] = collections.defaultdict(int)
    # Load each books file into memory and insert the relevant metadata lines.
    # Line numbers change as we do this, so we keep track of the offset for each
    # file. Changes must be sorted by line number first or else the offsets will
    # break because we're jumping around making edits.
    for filename, line, metadata in sorted(metadata_to_apply):
        if filename not in file_contents:
            with open(filename, 'r', encoding='utf-8') as f:
                file_contents[filename] = f.readlines()
        if not metadata.endswith('\n'):
            metadata += '\n'
        # Insert is inefficient, but fast enough for now in practice.
        file_contents[filename].insert(line + file_offsets[filename], metadata)
        file_offsets[filename] += 1
    # Write each updated file back to disk.
    for filename, contents in file_contents.items():
        with open(filename, 'w', encoding='utf-8') as f:
            f.writelines(contents)
        print(f'Wrote {filename}.')
def _decimal_argument(value: str) -> decimal.Decimal:
    """Convert a command-line amount to Decimal, reporting bad input as a usage error."""
    try:
        return decimal.Decimal(value)
    except decimal.InvalidOperation:
        # argparse only turns ArgumentTypeError/TypeError/ValueError into a
        # usage message; InvalidOperation would escape as a traceback.
        raise argparse.ArgumentTypeError(f'invalid decimal amount: {value!r}') from None


def parse_args(argv: List[str]) -> argparse.Namespace:
    """Parse the command line for the reconciliation helper.

    Args:
        argv: Full argument vector including the program name (as in
            ``sys.argv``); the first element is skipped.

    Returns:
        The parsed namespace with ``beancount_file``, ``csv_statement``,
        ``account``, ``grep_output_filename``, ``show_reconciled_matches``,
        ``statement_balance`` (a ``decimal.Decimal``) and ``non_interactive``.

    Raises:
        SystemExit: On missing or invalid arguments, as raised by argparse.
    """
    parser = argparse.ArgumentParser(description='Reconciliation helper')
    parser.add_argument('--beancount-file', required=True)
    parser.add_argument('--csv-statement', required=True)
    parser.add_argument('--account', required=True, help='eg. Liabilities:CreditCard:AMEX')
    parser.add_argument('--grep-output-filename')
    # parser.add_argument('--report-group-regex')
    parser.add_argument('--show-reconciled-matches', action='store_true')
    parser.add_argument('--statement-balance', type=_decimal_argument, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
    parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books")
    return parser.parse_args(args=argv[1:])
def main(args: argparse.Namespace) -> None:
    """Reconcile a CSV statement against the books and print a report.

    Reads the statement named by ``args.csv_statement``, loads the Beancount
    file ``args.beancount_file``, queries the balances and transactions for
    ``args.account`` over the statement period, matches the two sets of
    transactions and prints the comparison table and totals. Unless
    ``args.non_interactive`` is set, offers to write "bank-statement" metadata
    for newly matched transactions back to the books.

    Raises:
        SystemExit: If the statement contains no transactions, since the
            reconciliation period cannot be determined.
    """
    # TODO: Should put in a sanity check to make sure the statement you're feeding
    # in matches the account you've provided.
    if 'AMEX' in args.account:
        standardize_statement_record = standardize_amex_record
    else:
        standardize_statement_record = standardize_fr_record

    with open(args.csv_statement) as f:
        statement_trans = read_transactions_from_csv(f, standardize_statement_record)

    if not statement_trans:
        sys.exit(f'No transactions found in {args.csv_statement}; nothing to reconcile.')

    # Records are sorted by date, so the first and last give the period.
    begin_date = statement_trans[0]['date']
    end_date = statement_trans[-1]['date']

    # Do we traverse and filter the in-memory entries list and filter that, or do we
    # use Beancount Query Language (BQL) to get a list of transactions? Currently
    # using BQL.
    #
    # beancount.query.query_compile.compile() and
    # beancount.query.query_execute.filter_entries() look useful in this respect,
    # but I'm not clear on how to use compile(). An example would help.
    entries, _, options = loader.load_file(args.beancount_file)

    books_balance_query = f"""SELECT sum(COST(position)) AS aa WHERE account = "{args.account}"
        AND date <= {end_date.isoformat()}"""
    result_types, result_rows = run_query(entries, options, books_balance_query, numberify=True)
    books_balance = result_rows[0][0] if result_rows else 0

    books_balance_reconciled_query = f"""SELECT sum(COST(position)) AS aa WHERE account = "{args.account}"
        AND date <= {end_date.isoformat()} AND META('bank-statement') != NULL"""
    result_types, result_rows = run_query(entries, options, books_balance_reconciled_query, numberify=True)
    books_balance_reconciled = result_rows[0][0] if result_rows else 0

    # String concatenation looks bad, but there's no SQL injection possible here
    # because BQL can't write back to the Beancount files. I hope!
    query = f"SELECT filename, META('lineno') AS posting_line, META('bank-statement') AS posting_statement, date, number(cost(position)), payee, narration where account = '{args.account}' and date >= {begin_date} and date <= {end_date}"
    result_types, result_rows = run_query(entries, options, query)

    books_trans = sort_records([standardize_beancount_record(row) for row in result_rows])

    matches, metadata_to_apply, total_matched, total_missing_from_books, total_missing_from_statement = match_statement_and_books(
        statement_trans, books_trans, args.show_reconciled_matches, args.csv_statement)

    print('-' * 155)
    print(f'{"Statement transaction":<38} {"Books transaction":<44} Notes')
    print('-' * 155)
    # Each match is [date, text], so sorting orders the report by date.
    for _, output in sorted(matches):
        print(output)
    print('-' * 155)
    print(f'Period: {begin_date} to {end_date}')
    print(f'Statement/cleared balance: {args.statement_balance:12,.2f} (as provided by you)')
    print(f'Books balance (all): {books_balance:12,.2f} (all transactions, includes unreconciled)')
    print(f'Books balance (reconciled): {books_balance_reconciled:12,.2f} (transactions with "bank-statement" tag only)')
    print(f'Matched above: {total_matched:12,.2f} ("bank-statement" tag yet to be applied)')
    print(f'On statement only: {total_missing_from_books:12,.2f} (no match in books)')
    print(f'On books only: {total_missing_from_statement:12,.2f} (no match on statement)')
    print('-' * 155)
    # print(f'Remaining to reconcile: {books_balance - books_balance_reconciled - total_matched:12,.2f}')
    # print(f'Total reconciled inc. above: {books_balance_reconciled + total_matched:12,.2f}')
    # print('-' * 155)

    # Write statement metadata back to books
    if metadata_to_apply and not args.non_interactive:
        print('Mark matched transactions as reconciled in the books? (y/N) ', end='')
        if input().lower() == 'y':
            write_metadata_to_books(metadata_to_apply)


if __name__ == '__main__':
    args = parse_args(sys.argv)
    main(args)

# Local Variables:
# python-shell-interpreter: "/home/ben/\.virtualenvs/conservancy-beancount-py39/bin/python"