diff --git a/conservancy_beancount/reconcile/statement_reconciler.py b/conservancy_beancount/reconcile/statement_reconciler.py index 6343ed5..df665ed 100644 --- a/conservancy_beancount/reconcile/statement_reconciler.py +++ b/conservancy_beancount/reconcile/statement_reconciler.py @@ -58,7 +58,7 @@ from typing import Callable, Dict, List, Tuple, TextIO from beancount import loader from beancount.query.query import run_query -from colorama import Fore, Style +from colorama import Fore, Style # type: ignore if not sys.warnoptions: import warnings @@ -96,7 +96,7 @@ JUNK_WORDS = [ JUNK_WORDS_RES = [re.compile(word, re.IGNORECASE) for word in JUNK_WORDS] ZERO_RE = re.compile('^0+') -def remove_duplicate_words(text): +def remove_duplicate_words(text: str) -> str: unique_words = [] known_words = set() for word in text.split(): @@ -123,18 +123,19 @@ def remove_payee_junk(payee: str) -> str: payee.strip() return payee -# NOTE: Statement doesn't seem to give us a running balance or a final total. def read_transactions_from_csv(f: TextIO, standardize_statement_record: Callable) -> list: reader = csv.DictReader(f) - return sort_records([standardize_statement_record(row, reader.line_num) for row in reader]) + # The reader.line_num is the source line number, not the spreadsheet row + # number due to multi-line records. + return sort_records([standardize_statement_record(row, i) for i, row in enumerate(reader, 2)]) -# Does the account you entered match the CSV? -# Is the CSV in the format we expect? (ie. did they download through the right interface?) -# Logical CSV line numbers -# CSV reconciliation report + +# NOTE: Statement doesn't seem to give us a running balance or a final total. +# CSV reconciliation report. # Merge helper script. + def standardize_amex_record(row: Dict, line: int) -> Dict: """Turn an AMEX CSV row into a standard dict format representing a transaction.""" return { @@ -148,6 +149,20 @@ def standardize_amex_record(row: Dict, line: int) -> Dict: } +def validate_amex_csv(sample: str, account: str) -> None: + required_cols = {'Date', 'Amount', 'Description', 'Card Member'} + reader = csv.DictReader(io.StringIO(sample)) + if reader.fieldnames and not required_cols.issubset(reader.fieldnames): + sys.exit(f"This CSV doesn't seem to have the columns we're expecting, including: {', '.join(required_cols)}") + + +def validate_fr_csv(sample: str, account: str) -> None: + required_cols = {'Date', 'Amount', 'Detail', 'Serial Num'} + reader = csv.DictReader(io.StringIO(sample)) + if reader.fieldnames and not required_cols.issubset(reader.fieldnames): + sys.exit(f"This CSV doesn't seem to have the columns we're expecting, including: {', '.join(required_cols)}") + + def standardize_fr_record(row: Dict, line: int) -> Dict: return { 'date': datetime.datetime.strptime(row['Date'], '%m/%d/%Y').date(), @@ -181,7 +196,7 @@ def format_record(record: dict) -> str: return output -def format_multirecord(r1s, r2s, note): +def format_multirecord(r1s: list[dict], r2s: list[dict], note: str) -> list[list]: total = sum(x['amount'] for x in r2s) assert len(r1s) == 1 assert len(r2s) > 1 @@ -191,26 +206,29 @@ def format_multirecord(r1s, r2s, note): match_output.append([r1s[0]['date'], f'{r1s[0]["date"].isoformat()}: ↳ → {format_record(r2)} ✓ Matched{note}']) return match_output + def sort_records(records: List) -> List: return sorted(records, key=lambda x: (x['date'], x['amount'])) -def first_word_exact_match(a, b): +def first_word_exact_match(a: str, b: str) -> float: if len(a) == 0 or len(b) == 0: - return 0 + return 0.0 first_a = a.split()[0].strip() first_b = b.split()[0].strip() if first_a.casefold() == first_b.casefold(): return min(1.0, 0.2 * len(first_a)) else: - return 0; + return 0.0; -def payee_match(a, b): - fuzzy_match = fuzz.token_set_ratio(a, b) / 100.00 + +def payee_match(a: str, b: str) -> float: + fuzzy_match = float(fuzz.token_set_ratio(a, b) / 100.00) first_word_match = first_word_exact_match(a, b) return max(fuzzy_match, first_word_match) -def records_match(r1: Dict, r2: Dict) -> Tuple[bool, str]: + +def records_match(r1: Dict, r2: Dict) -> Tuple[float, List[str]]: """Do these records represent the same transaction?""" date_score = date_proximity(r1['date'], r2['date']) @@ -254,7 +272,7 @@ def records_match(r1: Dict, r2: Dict) -> Tuple[bool, str]: return overall_score, overall_message -def match_statement_and_books(statement_trans: list, books_trans: list): +def match_statement_and_books(statement_trans: List[Dict], books_trans: List[Dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]: """ Runs through all the statement transactions to find a matching transaction in the books. If found, the books transaction is marked off so that it can @@ -266,9 +284,9 @@ def match_statement_and_books(statement_trans: list, books_trans: list): remaining_statement_trans = [] for r1 in statement_trans: - best_match_score = 0 + best_match_score = 0.0 best_match_index = None - best_match_note = '' + best_match_note = [] matches_found = 0 for i, r2 in enumerate(books_trans): score, note = records_match(r1, r2) @@ -280,7 +298,8 @@ def match_statement_and_books(statement_trans: list, books_trans: list): if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8: matches.append(([r1], [books_trans[best_match_index]], best_match_note)) # Don't try to make a second match against this books entry. - del books_trans[best_match_index] + if best_match_index is not None: + del books_trans[best_match_index] else: remaining_statement_trans.append(r1) for r2 in books_trans: @@ -288,7 +307,9 @@ def match_statement_and_books(statement_trans: list, books_trans: list): return matches, remaining_statement_trans, remaining_books_trans -def format_matches(matches, csv_statement: str, show_reconciled_matches): +# TODO: Return list of tuples (instead of list of lists). + +def format_matches(matches: List, csv_statement: str, show_reconciled_matches: bool) -> List[List]: match_output = [] for r1s, r2s, note in matches: note = ', '.join(note) @@ -306,14 +327,15 @@ def format_matches(matches, csv_statement: str, show_reconciled_matches): return match_output -def date_proximity(d1, d2): - diff = abs((d1 - d2).days) +def date_proximity(d1: datetime.date, d2: datetime.date) -> float: + diff = abs(int((d1 - d2).days)) if diff > 60: - return 0 + return 0.0 else: return 1.0 - (diff / 60.0) -def metadata_for_match(match, statement_filename, csv_filename): + +def metadata_for_match(match: Tuple[List, List, List], statement_filename: str, csv_filename: str) -> List[Tuple[str, int, str]]: # Can we really ever have multiple statement entries? Probably not. statement_filename = get_repo_relative_path(statement_filename) csv_filename = get_repo_relative_path(csv_filename) @@ -361,15 +383,18 @@ def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> No f.writelines(contents) print(f'Wrote {filename}.') -def get_repo_relative_path(path): + +def get_repo_relative_path(path: str) -> str: return os.path.relpath(path, start=os.getenv('CONSERVANCY_REPOSITORY')) -def parse_path(path): + +def parse_path(path: str) -> str: if not os.path.exists(path): raise argparse.ArgumentTypeError(f'File {path} does not exist.') return path -def parse_repo_relative_path(path): + +def parse_repo_relative_path(path: str) -> str: if not os.path.exists(path): raise argparse.ArgumentTypeError(f'File {path} does not exist.') repo = os.getenv('CONSERVANCY_REPOSITORY') @@ -379,7 +404,8 @@ def parse_repo_relative_path(path): raise argparse.ArgumentTypeError(f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.') return path -def parse_args(argv): + +def parse_args(argv: List[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description='Reconciliation helper') parser.add_argument('--beancount-file', required=True, type=parse_path) parser.add_argument('--csv-statement', required=True, type=parse_repo_relative_path) @@ -392,7 +418,8 @@ def parse_args(argv): parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books") return parser.parse_args(args=argv[1:]) -def totals(matches): + +def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]: total_matched = decimal.Decimal(0) total_missing_from_books = decimal.Decimal(0) total_missing_from_statement = decimal.Decimal(0) @@ -406,16 +433,16 @@ def totals(matches): return total_matched, total_missing_from_books, total_missing_from_statement -def subset_match(statement_trans, books_trans): +def subset_match(statement_trans: List[dict], books_trans: List[dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]: matches = [] remaining_books_trans = [] remaining_statement_trans = [] groups = itertools.groupby(books_trans, key=lambda x: (x['date'], x['payee'])) for k, group in groups: - best_match_score = 0 + best_match_score = 0.0 best_match_index = None - best_match_note = '' + best_match_note = [] matches_found = 0 group_items = list(group) @@ -430,11 +457,11 @@ def subset_match(statement_trans, books_trans): best_match_index = i best_match_note = note if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8: - if best_match_score <= 0.8: - best_match_note.append('only one decent match') matches.append(([statement_trans[best_match_index]], group_items, best_match_note)) - del statement_trans[best_match_index] + if best_match_index is not None: + del statement_trans[best_match_index] for item in group_items: + # TODO: Why? books_trans.remove(item) else: remaining_books_trans.append(r2) @@ -442,26 +469,33 @@ def subset_match(statement_trans, books_trans): remaining_statement_trans.append(r1) return matches, remaining_statement_trans, remaining_books_trans -def process_unmatched(statement_trans, books_trans): - matches = [] + +def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> List[Tuple[List, List, List]]: + matches: List[Tuple[List, List, List]] = [] for r1 in statement_trans: matches.append(([r1], [], ['no match'])) for r2 in books_trans: matches.append(([], [r2], ['no match'])) return matches -def main(args): + +def main(args: argparse.Namespace) -> None: # TODO: Should put in a sanity check to make sure the statement you're feeding # in matches the account you've provided. # TODO: Can we open the files first, then pass the streams on to the rest of the program? if 'AMEX' in args.account: + validate_csv = validate_amex_csv standardize_statement_record = standardize_amex_record else: + validate_csv = validate_fr_csv standardize_statement_record = standardize_fr_record with open(args.csv_statement) as f: + sample = f.read(200) + validate_csv(sample, args.account) + f.seek(0) statement_trans = read_transactions_from_csv(f, standardize_statement_record) begin_date = statement_trans[0]['date'] diff --git a/tests/test_reconcile.py b/tests/test_reconcile.py index fc00ef5..b7085a3 100644 --- a/tests/test_reconcile.py +++ b/tests/test_reconcile.py @@ -218,7 +218,7 @@ def test_payee_mismatch_ok_when_only_one_that_amount_and_date(): statement = [S3] books = [B3_payee_mismatch_1] assert match_statement_and_books(statement, books) == ( - [([S3], [B3_payee_mismatch_1], ['payee mismatch', 'only one decent match'])], + [([S3], [B3_payee_mismatch_1], ['payee mismatch'])], [], [], ) @@ -255,8 +255,8 @@ def test_payee_matches_when_first_word_matches(): def test_metadata_for_match(monkeypatch): monkeypatch.setenv('CONSERVANCY_REPOSITORY', '.') assert metadata_for_match(([S1], [B1], []), 'statement.pdf', 'statement.csv') == [ - ('2022/imports.beancount', 777, ' bank-statement: statement.pdf'), - ('2022/imports.beancount', 777, ' bank-statement-csv: statement.csv:222'), + ('2022/imports.beancount', 777, ' bank-statement: "statement.pdf"'), + ('2022/imports.beancount', 777, ' bank-statement-csv: "statement.csv:222"'), ] def test_no_metadata_if_no_matches():