reconciler: Apply black -S -l 90

Ben Sturmfels 2023-02-11 16:04:00 +11:00
parent 8b08997fda
commit baa299c4c5
Signed by: bsturmfels
GPG key ID: 023C05E2C9C068F0


@@ -118,6 +118,7 @@ from .. import config as configmod
 if not sys.warnoptions:
     import warnings
     # Disable annoying warning from thefuzz prompting for a C extension. The
     # current pure-Python implementation isn't a bottleneck for us.
     warnings.filterwarnings('ignore', category=UserWarning, module='thefuzz.fuzz')
@@ -200,7 +201,9 @@ def validate_amex_csv(sample: str) -> None:
     required_cols = {'Date', 'Amount', 'Description', 'Card Member'}
     reader = csv.DictReader(io.StringIO(sample))
     if reader.fieldnames and not required_cols.issubset(reader.fieldnames):
-        sys.exit(f"This AMEX CSV doesn't seem to have the columns we're expecting, including: {', '.join(required_cols)}. Please use an unmodified statement direct from the institution.")
+        sys.exit(
+            f"This AMEX CSV doesn't seem to have the columns we're expecting, including: {', '.join(required_cols)}. Please use an unmodified statement direct from the institution."
+        )
 def standardize_amex_record(row: Dict, line: int) -> Dict:
@@ -221,7 +224,9 @@ def read_amex_csv(f: TextIO) -> list:
     reader = csv.DictReader(f)
     # The reader.line_num is the source line number, not the spreadsheet row
     # number due to multi-line records.
-    return sort_records([standardize_amex_record(row, i) for i, row in enumerate(reader, 2)])
+    return sort_records(
+        [standardize_amex_record(row, i) for i, row in enumerate(reader, 2)]
+    )
 def validate_fr_csv(sample: str) -> None:
@@ -236,7 +241,9 @@ def validate_fr_csv(sample: str) -> None:
         pass
     amount_found = '$' in row[4] and '$' in row[5]
     if len(row) != 6 or not date or not amount_found:
-        sys.exit("This First Republic CSV doesn't seem to have the 6 columns we're expecting, including a date in column 2 and an amount in columns 5 and 6. Please use an unmodified statement direct from the institution.")
+        sys.exit(
+            "This First Republic CSV doesn't seem to have the 6 columns we're expecting, including a date in column 2 and an amount in columns 5 and 6. Please use an unmodified statement direct from the institution."
+        )
 def standardize_fr_record(line, row):
@@ -255,7 +262,8 @@ def read_fr_csv(f: TextIO) -> list:
     # The reader.line_num is the source line number, not the spreadsheet row
     # number due to multi-line records.
     return sort_records(
-        standardize_fr_record(i, row) for i, row in enumerate(reader, 1)
+        standardize_fr_record(i, row)
+        for i, row in enumerate(reader, 1)
         if len(row) == 6 and row[2] not in {'LAST STATEMENT', 'THIS STATEMENT'}
     )
@@ -265,7 +273,9 @@ def standardize_beancount_record(row) -> Dict:  # type: ignore[no-untyped-def]
     return {
         'date': row.date,
        'amount': row.number_cost_position,
-        'payee': remove_payee_junk(f'{row.payee or ""} {row.entity or ""} {row.narration or ""}'),
+        'payee': remove_payee_junk(
+            f'{row.payee or ""} {row.entity or ""} {row.narration or ""}'
+        ),
         'check_id': str(row.check_id or ''),
         'filename': row.filename,
         'line': row.line,
@@ -289,15 +299,27 @@ def format_multirecord(r1s: List[dict], r2s: List[dict], note: str) -> List[list
     assert len(r1s) == 1
     assert len(r2s) > 1
     match_output = []
-    match_output.append([r1s[0]['date'], f'{format_record(r1s[0])}{format_record(r2s[0])} ✓ Matched{note}'])
+    match_output.append(
+        [
+            r1s[0]['date'],
+            f'{format_record(r1s[0])}{format_record(r2s[0])} ✓ Matched{note}',
+        ]
+    )
     for r2 in r2s[1:]:
-        match_output.append([r1s[0]['date'], f'{r1s[0]["date"].isoformat()}: ↳ → {format_record(r2)} ✓ Matched{note}'])
+        match_output.append(
+            [
+                r1s[0]['date'],
+                f'{r1s[0]["date"].isoformat()}: ↳ → {format_record(r2)} ✓ Matched{note}',
+            ]
+        )
     return match_output
 def _start_of_month(time, offset_months=0):
     if offset_months > 0:
-        return _start_of_month(time.replace(day=28) + datetime.timedelta(days=4), offset_months - 1)
+        return _start_of_month(
+            time.replace(day=28) + datetime.timedelta(days=4), offset_months - 1
+        )
     else:
         return time.replace(day=1)
@@ -306,7 +328,8 @@ def round_to_month(begin_date, end_date):
     """Round a beginning and end date to beginning and end of months respectively."""
     return (
         _start_of_month(begin_date),
-        _start_of_month(end_date, offset_months=1) - datetime.timedelta(days=1))
+        _start_of_month(end_date, offset_months=1) - datetime.timedelta(days=1),
+    )
 def sort_records(records: List) -> List:
@@ -377,11 +400,15 @@ def records_match(r1: Dict, r2: Dict) -> Tuple[float, List[str]]:
         payee_message = 'payee mismatch'
     overall_score = (date_score + amount_score + check_score + payee_score) / 4
-    overall_message = [m for m in [date_message, amount_message, check_message, payee_message] if m]
+    overall_message = [
+        m for m in [date_message, amount_message, check_message, payee_message] if m
+    ]
     return overall_score, overall_message
-def match_statement_and_books(statement_trans: List[Dict], books_trans: List[Dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
+def match_statement_and_books(
+    statement_trans: List[Dict], books_trans: List[Dict]
+) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
     """Match transactions between the statement and books.
     If matched, the books transaction is marked off so that it can
@@ -413,7 +440,12 @@ def match_statement_and_books(statement_trans: List[Dict], books_trans: List[Dic
                 best_match_score = score
                 best_match_index = i
                 best_match_note = note
-        if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8:
+        if (
+            best_match_score > 0.5
+            and matches_found == 1
+            and 'check-id mismatch' not in best_match_note
+            or best_match_score > 0.8
+        ):
             matches.append(([r1], [books_trans[best_match_index]], best_match_note))
             # Don't try to make a second match against this books entry.
             if best_match_index is not None:
@@ -425,7 +457,9 @@ def match_statement_and_books(statement_trans: List[Dict], books_trans: List[Dic
     return matches, remaining_statement_trans, remaining_books_trans
-def subset_match(statement_trans: List[dict], books_trans: List[dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
+def subset_match(
+    statement_trans: List[dict], books_trans: List[dict]
+) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
     """Match single statement transactions with multiple books transactions.
     Works similarly to match_statement_and_books in that it returns a
@@ -455,8 +489,15 @@ def subset_match(statement_trans: List[dict], books_trans: List[dict]) -> Tuple[
                 best_match_score = score
                 best_match_index = i
                 best_match_note = note
-        if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8:
-            matches.append(([statement_trans[best_match_index]], group_items, best_match_note))
+        if (
+            best_match_score > 0.5
+            and matches_found == 1
+            and 'check-id mismatch' not in best_match_note
+            or best_match_score > 0.8
+        ):
+            matches.append(
+                ([statement_trans[best_match_index]], group_items, best_match_note)
+            )
             if best_match_index is not None:
                 del statement_trans[best_match_index]
         else:
@@ -468,7 +509,10 @@ def subset_match(statement_trans: List[dict], books_trans: List[dict]) -> Tuple[
 # TODO: Return list of tuples (instead of list of lists).
-def format_matches(matches: List, csv_statement: str, show_reconciled_matches: bool) -> List[List]:
+def format_matches(
+    matches: List, csv_statement: str, show_reconciled_matches: bool
+) -> List[List]:
     """Produce a list of body output lines from the given matches.
@@ -484,16 +528,35 @@ def format_matches(matches: List, csv_statement: str, show_reconciled_matches: b
         if r1s and r2s:
             if show_reconciled_matches or not all(x['bank_statement'] for x in r2s):
                 if len(r2s) == 1:
-                    entry = [r1s[0]['date'], f'{format_record(r1s[0])}{format_record(r2s[0])} ✓ Matched{note}']
+                    entry = [
+                        r1s[0]['date'],
+                        f'{format_record(r1s[0])}{format_record(r2s[0])} ✓ Matched{note}',
+                    ]
                     if 'payee mismatch' in note:
                         entry[1] = Fore.YELLOW + Style.BRIGHT + entry[1] + Style.RESET_ALL
                     match_output.append(entry)
                 else:
                     match_output.extend(format_multirecord(r1s, r2s, note))
         elif r1s:
-            match_output.append([r1s[0]['date'], Fore.RED + Style.BRIGHT + f'{format_record(r1s[0])}{" ":^59} ✗ NOT IN BOOKS ({os.path.basename(csv_statement)}:{r1s[0]["line"]})' + Style.RESET_ALL])
+            match_output.append(
+                [
+                    r1s[0]['date'],
+                    Fore.RED
+                    + Style.BRIGHT
+                    + f'{format_record(r1s[0])}{" ":^59} ✗ NOT IN BOOKS ({os.path.basename(csv_statement)}:{r1s[0]["line"]})'
+                    + Style.RESET_ALL,
+                ]
+            )
         else:
-            match_output.append([r2s[0]['date'], Fore.RED + Style.BRIGHT + f'{" ":^59}{format_record(r2s[0])} ✗ NOT ON STATEMENT ({os.path.basename(r2s[0]["filename"])}:{r2s[0]["line"]})' + Style.RESET_ALL])
+            match_output.append(
+                [
+                    r2s[0]['date'],
+                    Fore.RED
+                    + Style.BRIGHT
+                    + f'{" ":^59}{format_record(r2s[0])} ✗ NOT ON STATEMENT ({os.path.basename(r2s[0]["filename"])}:{r2s[0]["line"]})'
+                    + Style.RESET_ALL,
+                ]
+            )
     return match_output
@@ -507,7 +570,9 @@ def date_proximity(d1: datetime.date, d2: datetime.date) -> float:
         return 1.0 - (diff / ZERO_CUTOFF)
-def metadata_for_match(match: Tuple[List, List, List], statement_filename: str, csv_filename: str) -> List[Tuple[str, int, str]]:
+def metadata_for_match(
+    match: Tuple[List, List, List], statement_filename: str, csv_filename: str
+) -> List[Tuple[str, int, str]]:
     """Returns the bank-statement metadata that should be applied for a match."""
     # TODO: Our data structure would allow multiple statement entries
     # for a match, but would this ever make sense? Probably not.
@@ -518,8 +583,20 @@ def metadata_for_match(match: Tuple[List, List, List], statement_filename: str,
     for books_entry in books_entries:
         for statement_entry in statement_entries:
             if not books_entry['bank_statement']:
-                metadata.append((books_entry['filename'], books_entry['line'], f' bank-statement: "{statement_filename}"'))
-                metadata.append((books_entry['filename'], books_entry['line'], f' bank-statement-csv: "{csv_filename}:{statement_entry["line"]}"'))
+                metadata.append(
+                    (
+                        books_entry['filename'],
+                        books_entry['line'],
+                        f' bank-statement: "{statement_filename}"',
+                    )
+                )
+                metadata.append(
+                    (
+                        books_entry['filename'],
+                        books_entry['line'],
+                        f' bank-statement-csv: "{csv_filename}:{statement_entry["line"]}"',
+                    )
+                )
     return metadata
@@ -550,7 +627,9 @@ def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> No
             with open(filename, 'r') as f:
                 file_contents[filename] = f.readlines()
         # Insert is inefficient, but fast enough for now in practise.
-        file_contents[filename].insert(line + file_offsets[filename], metadata.rstrip() + '\n')
+        file_contents[filename].insert(
+            line + file_offsets[filename], metadata.rstrip() + '\n'
+        )
         file_offsets[filename] += 1
     # Writes each updated file back to disk.
     for filename, contents in file_contents.items():
@@ -589,7 +668,9 @@ def parse_repo_relative_path(path: str) -> str:
     if not repo:
         raise argparse.ArgumentTypeError('$CONSERVANCY_REPOSITORY is not set.')
     if not path.startswith(repo):
-        raise argparse.ArgumentTypeError(f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.')
+        raise argparse.ArgumentTypeError(
+            f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.'
+        )
     return path
@@ -606,16 +687,28 @@ def parse_arguments(argv: List[str]) -> argparse.Namespace:
     parser.add_argument('--beancount-file', required=True, type=parse_path)
     parser.add_argument('--csv-statement', required=True, type=parse_repo_relative_path)
     parser.add_argument('--bank-statement', required=True, type=parse_repo_relative_path)
-    parser.add_argument('--account', required=True, help='eg. Liabilities:CreditCard:AMEX')
+    parser.add_argument(
+        '--account', required=True, help='eg. Liabilities:CreditCard:AMEX'
+    )
     # parser.add_argument('--report-group-regex')
     parser.add_argument('--show-reconciled-matches', action='store_true')
-    parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books")  # parser.add_argument('--statement-balance', type=parse_decimal_with_separator, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
-    parser.add_argument('--full-months', action='store_true', help='Match payments over the full month, rather that just between the beginning and end dates of the CSV statement')
+    parser.add_argument(
+        '--non-interactive',
+        action='store_true',
+        help="Don't prompt to write to the books",
+    )  # parser.add_argument('--statement-balance', type=parse_decimal_with_separator, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
+    parser.add_argument(
+        '--full-months',
+        action='store_true',
+        help='Match payments over the full month, rather that just between the beginning and end dates of the CSV statement',
+    )
     args = parser.parse_args(args=argv)
     return args
-def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]:
+def totals(
+    matches: List[Tuple[List, List, List]]
+) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]:
     """Calculate the totals of transactions matched/not-matched."""
     total_matched = decimal.Decimal(0)
     total_missing_from_books = decimal.Decimal(0)
@@ -630,7 +723,9 @@ def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, dec
     return total_matched, total_missing_from_books, total_missing_from_statement
-def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> List[Tuple[List, List, List]]:
+def process_unmatched(
+    statement_trans: List[dict], books_trans: List[dict]
+) -> List[Tuple[List, List, List]]:
     """Format the remaining unmatched transactions to be added to one single list of matches."""
     matches: List[Tuple[List, List, List]] = []
     for r1 in statement_trans:
@@ -640,29 +735,41 @@ def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> L
     return matches
-def format_output(matches, begin_date, end_date, csv_statement, show_reconciled_matches) -> str:
+def format_output(
+    matches, begin_date, end_date, csv_statement, show_reconciled_matches
+) -> str:
     with io.StringIO() as out:
         match_output = format_matches(matches, csv_statement, show_reconciled_matches)
         _, total_missing_from_books, total_missing_from_statement = totals(matches)
         print('-' * 155, file=out)
         statement_heading = f'Statement transactions {begin_date} to {end_date}'
-        print(f'{statement_heading:<52} {"Books transactions":<58} Notes', file=out)
+        print(
+            f'{statement_heading:<52} {"Books transactions":<58} Notes',
+            file=out,
+        )
         print('-' * 155, file=out)
         for _, output in sorted(match_output, key=lambda x: x[0]):
             print(output, file=out)
         print('-' * 155, file=out)
-        print(f'Sub-total not on statement: {total_missing_from_statement:12,.2f}', file=out)
+        print(
+            f'Sub-total not on statement: {total_missing_from_statement:12,.2f}',
+            file=out,
+        )
         print(f'Sub-total not in books: {total_missing_from_books:12,.2f}', file=out)
-        print(f'Total: {total_missing_from_statement + total_missing_from_books:12,.2f}', file=out)
+        print(
+            f'Total: {total_missing_from_statement + total_missing_from_books:12,.2f}',
+            file=out,
+        )
         print('-' * 155, file=out)
         return out.getvalue()
-def main(arglist: Optional[Sequence[str]] = None,
-         stdout: TextIO = sys.stdout,
-         stderr: TextIO = sys.stderr,
-         config: Optional[configmod.Config] = None,
-         ) -> int:
+def main(
+    arglist: Optional[Sequence[str]] = None,
+    stdout: TextIO = sys.stdout,
+    stderr: TextIO = sys.stderr,
+    config: Optional[configmod.Config] = None,
+) -> int:
     args = parse_arguments(arglist)
     cliutil.set_loglevel(logger, args.loglevel)
     if config is None:
@@ -727,9 +834,14 @@ def main(arglist: Optional[Sequence[str]] = None,
     # Apply two passes of matching, one for standard matches and one
     # for subset matches.
-    matches, remaining_statement_trans, remaining_books_trans = match_statement_and_books(statement_trans, books_trans)
+    (
+        matches,
+        remaining_statement_trans,
+        remaining_books_trans,
+    ) = match_statement_and_books(statement_trans, books_trans)
     subset_matches, remaining_statement_trans, remaining_books_trans = subset_match(
-        remaining_statement_trans, remaining_books_trans)
+        remaining_statement_trans, remaining_books_trans
+    )
     matches.extend(subset_matches)
     # Add the remaining unmatched to make one big list of matches, successful or not.
@@ -737,12 +849,22 @@ def main(arglist: Optional[Sequence[str]] = None,
     matches.extend(unmatched)
     # Print out results of our matching.
-    print(format_output(matches, begin_date, end_date, args.csv_statement, args.show_reconciled_matches))
+    print(
+        format_output(
+            matches,
+            begin_date,
+            end_date,
+            args.csv_statement,
+            args.show_reconciled_matches,
+        )
+    )
     # Write statement metadata back to the books.
     metadata_to_apply = []
     for match in matches:
-        metadata_to_apply.extend(metadata_for_match(match, args.bank_statement, args.csv_statement))
+        metadata_to_apply.extend(
+            metadata_for_match(match, args.bank_statement, args.csv_statement)
+        )
     if metadata_to_apply and not args.non_interactive:
         print('Mark matched transactions as reconciled in the books? (y/N) ', end='')
         if input().lower() == 'y':
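
For reference, the reformatting above corresponds to the Black invocation named in the commit title, run over this module (the exact file path below is an assumption, not taken from the commit):

    black -S -l 90 conservancy_beancount/reconcile/statement_reconciler.py

Here -S (--skip-string-normalization) leaves the existing quote style untouched, and -l 90 raises the maximum line length to 90 characters.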