reconcile: Add further typing info; update tests.
commit 54d11f2437
parent 9ae36feed2

2 changed files with 75 additions and 41 deletions
@@ -58,7 +58,7 @@ from typing import Callable, Dict, List, Tuple, TextIO
 
 from beancount import loader
 from beancount.query.query import run_query
-from colorama import Fore, Style
+from colorama import Fore, Style  # type: ignore
 
 if not sys.warnoptions:
     import warnings
@@ -96,7 +96,7 @@ JUNK_WORDS = [
 JUNK_WORDS_RES = [re.compile(word, re.IGNORECASE) for word in JUNK_WORDS]
 ZERO_RE = re.compile('^0+')
 
-def remove_duplicate_words(text):
+def remove_duplicate_words(text: str) -> str:
     unique_words = []
     known_words = set()
     for word in text.split():
@@ -123,18 +123,19 @@ def remove_payee_junk(payee: str) -> str:
     payee.strip()
     return payee
 
-# NOTE: Statement doesn't seem to give us a running balance or a final total.
-
 def read_transactions_from_csv(f: TextIO, standardize_statement_record: Callable) -> list:
     reader = csv.DictReader(f)
-    return sort_records([standardize_statement_record(row, reader.line_num) for row in reader])
+    # The reader.line_num is the source line number, not the spreadsheet row
+    # number due to multi-line records.
+    return sort_records([standardize_statement_record(row, i) for i, row in enumerate(reader, 2)])
 
-# Does the account you entered match the CSV?
-# Is the CSV in the format we expect? (ie. did they download through the right interface?)
-# Logical CSV line numbers
-# CSV reconciliation report
+# NOTE: Statement doesn't seem to give us a running balance or a final total.
+# CSV reconciliation report.
+# Merge helper script.
 
 
 def standardize_amex_record(row: Dict, line: int) -> Dict:
     """Turn an AMEX CSV row into a standard dict format representing a transaction."""
     return {
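
A side note on the read_transactions_from_csv change above: csv's reader.line_num counts physical source lines, so a quoted field containing a newline throws it out of step with the logical record number, which is what enumerate(reader, 2) provides (starting at 2 because row 1 is the header). A minimal sketch, not part of the commit:

    import csv
    import io

    sample = 'Date,Description\n01/02/2022,"COFFEE\nSHOP"\n01/03/2022,GROCER\n'
    reader = csv.DictReader(io.StringIO(sample))
    for i, row in enumerate(reader, 2):
        # The first record spans two physical lines, so line_num is already 3.
        print(i, reader.line_num, row['Date'])
    # 2 3 01/02/2022
    # 3 4 01/03/2022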
@@ -148,6 +149,20 @@ def standardize_amex_record(row: Dict, line: int) -> Dict:
     }
 
 
+def validate_amex_csv(sample: str, account: str) -> None:
+    required_cols = {'Date', 'Amount', 'Description', 'Card Member'}
+    reader = csv.DictReader(io.StringIO(sample))
+    if reader.fieldnames and not required_cols.issubset(reader.fieldnames):
+        sys.exit(f"This CSV doesn't seem to have the columns we're expecting, including: {', '.join(required_cols)}")
+
+
+def validate_fr_csv(sample: str, account: str) -> None:
+    required_cols = {'Date', 'Amount', 'Detail', 'Serial Num'}
+    reader = csv.DictReader(io.StringIO(sample))
+    if reader.fieldnames and not required_cols.issubset(reader.fieldnames):
+        sys.exit(f"This CSV doesn't seem to have the columns we're expecting, including: {', '.join(required_cols)}")
+
+
 def standardize_fr_record(row: Dict, line: int) -> Dict:
     return {
         'date': datetime.datetime.strptime(row['Date'], '%m/%d/%Y').date(),
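
The new validators only ever see a short sample of the file, and csv.DictReader.fieldnames is None when that sample is empty, which is what the "reader.fieldnames and" guard covers. A small sketch of the fieldnames behavior (hypothetical headers, not from the commit):

    import csv
    import io

    print(csv.DictReader(io.StringIO('Date,Description,Card Member,Amount\n')).fieldnames)
    # ['Date', 'Description', 'Card Member', 'Amount']
    print(csv.DictReader(io.StringIO('')).fieldnames)
    # None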
@@ -181,7 +196,7 @@ def format_record(record: dict) -> str:
     return output
 
 
-def format_multirecord(r1s, r2s, note):
+def format_multirecord(r1s: list[dict], r2s: list[dict], note: str) -> list[list]:
     total = sum(x['amount'] for x in r2s)
     assert len(r1s) == 1
     assert len(r2s) > 1
@@ -191,26 +206,29 @@ def format_multirecord(r1s, r2s, note):
         match_output.append([r1s[0]['date'], f'{r1s[0]["date"].isoformat()}:             ↳                                    →  {format_record(r2)}  ✓ Matched{note}'])
     return match_output
 
 
 def sort_records(records: List) -> List:
     return sorted(records, key=lambda x: (x['date'], x['amount']))
 
 
-def first_word_exact_match(a, b):
+def first_word_exact_match(a: str, b: str) -> float:
     if len(a) == 0 or len(b) == 0:
-        return 0
+        return 0.0
     first_a = a.split()[0].strip()
     first_b = b.split()[0].strip()
     if first_a.casefold() == first_b.casefold():
         return min(1.0, 0.2 * len(first_a))
     else:
-        return 0;
+        return 0.0;
 
-def payee_match(a, b):
-    fuzzy_match = fuzz.token_set_ratio(a, b) / 100.00
+
+def payee_match(a: str, b: str) -> float:
+    fuzzy_match = float(fuzz.token_set_ratio(a, b) / 100.00)
     first_word_match = first_word_exact_match(a, b)
     return max(fuzzy_match, first_word_match)
 
-def records_match(r1: Dict, r2: Dict) -> Tuple[bool, str]:
+
+def records_match(r1: Dict, r2: Dict) -> Tuple[float, List[str]]:
     """Do these records represent the same transaction?"""
 
     date_score = date_proximity(r1['date'], r2['date'])
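
On the new float annotations here: in the fuzzywuzzy/thefuzz family of libraries (the fuzz import itself isn't visible in this diff), token_set_ratio returns an int in [0, 100], so payee_match normalizes it to [0.0, 1.0] and takes whichever of the two scores is higher. A rough usage sketch, assuming a thefuzz-style API:

    from thefuzz import fuzz  # assumption: the script's fuzz module has this API

    a = 'AMAZON MKTPL*2AB34'
    b = 'Amazon Marketplace'
    print(fuzz.token_set_ratio(a, b))           # an int in [0, 100]
    print(fuzz.token_set_ratio(a, b) / 100.00)  # a float score in [0.0, 1.0]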
@@ -254,7 +272,7 @@ def records_match(r1: Dict, r2: Dict) -> Tuple[bool, str]:
     return overall_score, overall_message
 
 
-def match_statement_and_books(statement_trans: list, books_trans: list):
+def match_statement_and_books(statement_trans: List[Dict], books_trans: List[Dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
     """
     Runs through all the statement transactions to find a matching transaction
     in the books. If found, the books transaction is marked off so that it can
@@ -266,9 +284,9 @@ def match_statement_and_books(statement_trans: list, books_trans: list):
     remaining_statement_trans = []
 
     for r1 in statement_trans:
-        best_match_score = 0
+        best_match_score = 0.0
         best_match_index = None
-        best_match_note = ''
+        best_match_note = []
         matches_found = 0
         for i, r2 in enumerate(books_trans):
             score, note = records_match(r1, r2)
@@ -280,7 +298,8 @@ def match_statement_and_books(statement_trans: list, books_trans: list):
         if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8:
             matches.append(([r1], [books_trans[best_match_index]], best_match_note))
             # Don't try to make a second match against this books entry.
-            del books_trans[best_match_index]
+            if best_match_index is not None:
+                del books_trans[best_match_index]
         else:
             remaining_statement_trans.append(r1)
     for r2 in books_trans:
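
The unparenthesized condition above leans on Python precedence: and binds tighter than or, so a strong match (> 0.8) is accepted even when several candidates scored. A parenthesized restatement, purely illustrative:

    def accept(score: float, matches_found: int, note: list) -> bool:
        # Equivalent to the condition used in match_statement_and_books:
        return ((score > 0.5 and matches_found == 1
                 and 'check-id mismatch' not in note)
                or score > 0.8)

    print(accept(0.6, 1, []))                     # True: a single decent match
    print(accept(0.6, 2, []))                     # False: ambiguous
    print(accept(0.9, 2, ['check-id mismatch']))  # True: a strong match wins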
@@ -288,7 +307,9 @@ def match_statement_and_books(statement_trans: list, books_trans: list):
     return matches, remaining_statement_trans, remaining_books_trans
 
 
-def format_matches(matches, csv_statement: str, show_reconciled_matches):
+# TODO: Return list of tuples (instead of list of lists).
+
+def format_matches(matches: List, csv_statement: str, show_reconciled_matches: bool) -> List[List]:
     match_output = []
     for r1s, r2s, note in matches:
         note = ', '.join(note)
@@ -306,14 +327,15 @@ def format_matches(matches, csv_statement: str, show_reconciled_matches):
     return match_output
 
 
-def date_proximity(d1, d2):
-    diff = abs((d1 - d2).days)
+def date_proximity(d1: datetime.date, d2: datetime.date) -> float:
+    diff = abs(int((d1 - d2).days))
     if diff > 60:
-        return 0
+        return 0.0
     else:
         return 1.0 - (diff / 60.0)
 
-def metadata_for_match(match, statement_filename, csv_filename):
+
+def metadata_for_match(match: Tuple[List, List, List], statement_filename: str, csv_filename: str) -> List[Tuple[str, int, str]]:
     # Can we really ever have multiple statement entries? Probably not.
     statement_filename = get_repo_relative_path(statement_filename)
     csv_filename = get_repo_relative_path(csv_filename)
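
date_proximity's new annotation makes the scale explicit: identical dates score 1.0, decaying linearly to 0.0 at 60 days apart. Spot-checking the shape, assuming the function as defined above:

    import datetime

    d = datetime.date(2022, 3, 1)
    print(date_proximity(d, d))                                # 1.0
    print(date_proximity(d, d + datetime.timedelta(days=30)))  # 0.5
    print(date_proximity(d, d + datetime.timedelta(days=61)))  # 0.0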
@@ -361,15 +383,18 @@ def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> None:
             f.writelines(contents)
             print(f'Wrote {filename}.')
 
-def get_repo_relative_path(path):
+
+def get_repo_relative_path(path: str) -> str:
     return os.path.relpath(path, start=os.getenv('CONSERVANCY_REPOSITORY'))
 
-def parse_path(path):
+
+def parse_path(path: str) -> str:
     if not os.path.exists(path):
         raise argparse.ArgumentTypeError(f'File {path} does not exist.')
     return path
 
-def parse_repo_relative_path(path):
+
+def parse_repo_relative_path(path: str) -> str:
     if not os.path.exists(path):
         raise argparse.ArgumentTypeError(f'File {path} does not exist.')
     repo = os.getenv('CONSERVANCY_REPOSITORY')
@@ -379,7 +404,8 @@ def parse_repo_relative_path(path):
         raise argparse.ArgumentTypeError(f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.')
     return path
 
-def parse_args(argv):
+
+def parse_args(argv: List[str]) -> argparse.Namespace:
     parser = argparse.ArgumentParser(description='Reconciliation helper')
     parser.add_argument('--beancount-file', required=True, type=parse_path)
     parser.add_argument('--csv-statement', required=True, type=parse_repo_relative_path)
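
The annotations on parse_path and parse_repo_relative_path match how argparse treats type= callables: each receives the raw string and either returns the converted value or raises ArgumentTypeError, which argparse reports as a usage error. The pattern in isolation:

    import argparse
    import os

    def parse_path(path: str) -> str:
        if not os.path.exists(path):
            raise argparse.ArgumentTypeError(f'File {path} does not exist.')
        return path

    parser = argparse.ArgumentParser()
    parser.add_argument('--beancount-file', required=True, type=parse_path)
    # parser.parse_args(['--beancount-file', '/no/such/file']) exits with:
    # error: argument --beancount-file: File /no/such/file does not exist.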
@@ -392,7 +418,8 @@ def parse_args(argv):
     parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books")
     return parser.parse_args(args=argv[1:])
 
-def totals(matches):
+
+def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]:
     total_matched = decimal.Decimal(0)
     total_missing_from_books = decimal.Decimal(0)
     total_missing_from_statement = decimal.Decimal(0)
@@ -406,16 +433,16 @@ def totals(matches):
     return total_matched, total_missing_from_books, total_missing_from_statement
 
 
-def subset_match(statement_trans, books_trans):
+def subset_match(statement_trans: List[dict], books_trans: List[dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
     matches = []
     remaining_books_trans = []
     remaining_statement_trans = []
 
     groups = itertools.groupby(books_trans, key=lambda x: (x['date'], x['payee']))
     for k, group in groups:
-        best_match_score = 0
+        best_match_score = 0.0
         best_match_index = None
-        best_match_note = ''
+        best_match_note = []
         matches_found = 0
 
         group_items = list(group)
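
One property worth noting for the newly annotated subset_match: itertools.groupby only groups adjacent items, so the (date, payee) groups are complete only if books_trans arrives ordered compatibly (sort_records sorts by (date, amount)). A self-contained demonstration:

    import itertools

    books = [
        {'date': '2022-03-01', 'payee': 'Acme'},
        {'date': '2022-03-01', 'payee': 'Acme'},
        {'date': '2022-03-02', 'payee': 'Lumen'},
    ]
    for key, group in itertools.groupby(books, key=lambda x: (x['date'], x['payee'])):
        print(key, len(list(group)))
    # ('2022-03-01', 'Acme') 2
    # ('2022-03-02', 'Lumen') 1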
@@ -430,11 +457,11 @@ def subset_match(statement_trans, books_trans):
                 best_match_index = i
                 best_match_note = note
         if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8:
             if best_match_score <= 0.8:
                 best_match_note.append('only one decent match')
             matches.append(([statement_trans[best_match_index]], group_items, best_match_note))
-            del statement_trans[best_match_index]
+            if best_match_index is not None:
+                del statement_trans[best_match_index]
             for item in group_items:
-                # TODO: Why?
                 books_trans.remove(item)
         else:
             remaining_books_trans.append(r2)
@@ -442,26 +469,33 @@ def subset_match(statement_trans, books_trans):
         remaining_statement_trans.append(r1)
     return matches, remaining_statement_trans, remaining_books_trans
 
-def process_unmatched(statement_trans, books_trans):
-    matches = []
+
+def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> List[Tuple[List, List, List]]:
+    matches: List[Tuple[List, List, List]] = []
     for r1 in statement_trans:
         matches.append(([r1], [], ['no match']))
     for r2 in books_trans:
         matches.append(([], [r2], ['no match']))
     return matches
 
-def main(args):
+
+def main(args: argparse.Namespace) -> None:
     # TODO: Should put in a sanity check to make sure the statement you're feeding
     # in matches the account you've provided.
 
     # TODO: Can we open the files first, then pass the streams on to the rest of the program?
 
     if 'AMEX' in args.account:
+        validate_csv = validate_amex_csv
         standardize_statement_record = standardize_amex_record
     else:
+        validate_csv = validate_fr_csv
         standardize_statement_record = standardize_fr_record
 
     with open(args.csv_statement) as f:
+        sample = f.read(200)
+        validate_csv(sample, args.account)
+        f.seek(0)
         statement_trans = read_transactions_from_csv(f, standardize_statement_record)
 
     begin_date = statement_trans[0]['date']
|  | @ -218,7 +218,7 @@ def test_payee_mismatch_ok_when_only_one_that_amount_and_date(): | |||
|     statement = [S3] | ||||
|     books = [B3_payee_mismatch_1] | ||||
|     assert match_statement_and_books(statement, books) == ( | ||||
|         [([S3], [B3_payee_mismatch_1], ['payee mismatch', 'only one decent match'])], | ||||
|         [([S3], [B3_payee_mismatch_1], ['payee mismatch'])], | ||||
|         [], | ||||
|         [], | ||||
|     ) | ||||
@@ -255,8 +255,8 @@ def test_payee_matches_when_first_word_matches():
 def test_metadata_for_match(monkeypatch):
     monkeypatch.setenv('CONSERVANCY_REPOSITORY', '.')
     assert metadata_for_match(([S1], [B1], []), 'statement.pdf', 'statement.csv') == [
-        ('2022/imports.beancount', 777, '    bank-statement: statement.pdf'),
-        ('2022/imports.beancount', 777, '    bank-statement-csv: statement.csv:222'),
+        ('2022/imports.beancount', 777, '    bank-statement: "statement.pdf"'),
+        ('2022/imports.beancount', 777, '    bank-statement-csv: "statement.csv:222"'),
     ]
 
 def test_no_metadata_if_no_matches():
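
On the updated expectations in test_metadata_for_match: Beancount writes string-valued metadata with quotes, so the emitted lines now quote the statement references. An illustrative (hypothetical) books entry with that metadata applied:

    2022-01-04 * "Example payee" "Example transaction"
        bank-statement: "statement.pdf"
        bank-statement-csv: "statement.csv:222"
        Liabilities:CreditCard:AMEX  -5.00 USD
        Expenses:Coffee               5.00 USD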