From f3c3ebcf59e2fa517004db37e0a113d80c241346 Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Fri, 19 Feb 2021 11:34:35 -0500 Subject: [PATCH] books: Add LoadResult NamedTuple. This refactors out some common functionality from our CLI tools. --- conservancy_beancount/books.py | 73 ++++++++++++++++--- conservancy_beancount/cliutil.py | 2 + tests/test_books_loader.py | 117 ++++++++++++++++++++++++++++++- 3 files changed, 182 insertions(+), 10 deletions(-) diff --git a/conservancy_beancount/books.py b/conservancy_beancount/books.py index 608bfca..09a8799 100644 --- a/conservancy_beancount/books.py +++ b/conservancy_beancount/books.py @@ -11,6 +11,11 @@ from pathlib import Path import beancount.loader as bc_loader import beancount.parser.options as bc_options +import beancount.parser.printer as bc_printer + +from . import cliutil +from . import data +from .reports import rewrite from typing import ( Any, @@ -19,12 +24,14 @@ from typing import ( Mapping, NamedTuple, Optional, + TextIO, Union, ) from .beancount_types import ( + Entries, Error, Errors, - LoadResult, + OptionsMap, ) PathLike = Union[str, Path] @@ -91,6 +98,57 @@ class FiscalYear(NamedTuple): return range(from_fy, to_fy + 1) +class LoadResult(NamedTuple): + entries: Entries + errors: Errors + options_map: OptionsMap + + @classmethod + def empty(cls, error: Optional[Error]=None) -> 'LoadResult': + errors: Errors = [] + if error is not None: + errors.append(error) + return cls([], errors, bc_options.OPTIONS_DEFAULTS.copy()) + + def iter_postings( + self, + rewrites: Iterable[Union[Path, rewrite.RewriteRuleset]]=(), + search_terms: Iterable[cliutil.SearchTerm]=(), + ) -> Iterator[data.Posting]: + postings = data.Posting.from_entries(self.entries) + for ruleset in rewrites: + if isinstance(ruleset, Path): + ruleset = rewrite.RewriteRuleset.from_yaml(ruleset) + postings = ruleset.rewrite(postings) + for search_term in search_terms: + postings = search_term.filter_postings(postings) + return postings + + def load_account_metadata(self) -> None: + return data.Account.load_from_books(self.entries, self.options_map) + + def print_errors(self, out_file: TextIO) -> bool: + for error in self.errors: + bc_printer.print_error(error, file=out_file) + try: + error + except NameError: + return False + else: + return True + + def returncode(self) -> int: + if self.errors: + if self.entries: + return cliutil.ExitCode.BeancountErrors + else: + return cliutil.ExitCode.NoConfiguration + elif not self.entries: + return cliutil.ExitCode.NoDataLoaded + else: + return cliutil.ExitCode.OK + + class Loader: """Load Beancount books organized by fiscal year""" @@ -116,20 +174,20 @@ class Loader: def _load_paths(self, paths: Iterator[Path]) -> LoadResult: try: - entries, errors, options_map = bc_loader.load_file(next(paths)) + result = LoadResult._make(bc_loader.load_file(next(paths))) except StopIteration: - entries, errors, options_map = [], [], {} + result = LoadResult.empty() for load_path in paths: new_entries, new_errors, new_options = bc_loader.load_file(load_path) # We only want transactions from the new fiscal year. # We don't want the opening balance, duplicate definitions, etc. fy_filename = str(load_path.parent.parent / load_path.name) - entries.extend( + result.entries.extend( entry for entry in new_entries if entry.meta.get('filename') == fy_filename ) - errors.extend(new_errors) - return entries, errors, options_map + result.errors.extend(new_errors) + return result def _path_year(self, path: Path) -> int: return int(path.stem) @@ -195,5 +253,4 @@ class Loader: 'filename': str(config_path or 'conservancy_beancount.ini'), 'lineno': lineno, } - errors: Errors = [Error(source, "no books to load in configuration", None)] - return [], errors, bc_options.OPTIONS_DEFAULTS.copy() + return LoadResult.empty(Error(source, "no books to load in configuration", None)) diff --git a/conservancy_beancount/cliutil.py b/conservancy_beancount/cliutil.py index ec44ab0..7ea8147 100644 --- a/conservancy_beancount/cliutil.py +++ b/conservancy_beancount/cliutil.py @@ -183,6 +183,8 @@ class ExitCode(enum.IntEnum): NoConfig = NoConfiguration NoDataFiltered = os.EX_DATAERR NoDataLoaded = os.EX_NOINPUT + OK = os.EX_OK + Ok = OK RewriteRulesError = os.EX_DATAERR # Our own exit codes, working down from that range diff --git a/tests/test_books_loader.py b/tests/test_books_loader.py index 38e0d9a..9efed3c 100644 --- a/tests/test_books_loader.py +++ b/tests/test_books_loader.py @@ -6,6 +6,9 @@ # LICENSE.txt in the repository. import collections +import io +import itertools +import os import re from datetime import date @@ -16,12 +19,36 @@ import pytest from . import testutil from beancount.core import data as bc_data -from conservancy_beancount import books +from conservancy_beancount import books, data FY_START_MONTH = 3 books_path = testutil.test_path('books') +class MockError(Exception): + def __init__(self, message, lineno=0): + self.message = message + self.entry = None + self.source = {'filename': 'test_books_loader.py', 'lineno': lineno} + + +class MockSearchTerm: + def __init__(self, pred): + self.pred = pred + + def filter_postings(self, postings): + return (post for post in postings if self.pred(post)) + + rewrite = filter_postings + + +SEARCH_TERMS = [ + MockSearchTerm(lambda post: post.account.startswith('Expenses:')), + MockSearchTerm(lambda post: post.units.number >= 10), +] + +clean_account_meta = pytest.fixture()(testutil.clean_account_meta) + @pytest.fixture(scope='module') def conservancy_loader(): return books.Loader(books_path, books.FiscalYear(FY_START_MONTH)) @@ -42,6 +69,92 @@ def txn_dates(entries): def txn_years(entries): return frozenset(date.year for date in txn_dates(entries)) +def test_load_result_returncode_ok(): + options_map = {'filename': 'test_load_result_returncode_ok'} + result = books.LoadResult([testutil.Transaction()], [], options_map) + assert result.returncode() == 0 + +def test_load_result_beancount_errors(): + error = MockError("empty transaction", lineno=65) + options_map = dict(error.source) + result = books.LoadResult([testutil.Transaction()], [error], options_map) + assert 10 <= result.returncode() < 64 + +def test_load_result_config_error(): + error = MockError("no books available") + result = books.LoadResult.empty(error) + assert result.returncode() == os.EX_CONFIG + +def test_load_result_no_entries(): + result = books.LoadResult.empty() + assert result.returncode() == os.EX_NOINPUT + +@pytest.mark.parametrize('arg_index,end_index', itertools.product( + range(2), + range(len(SEARCH_TERMS)), +)) +def test_load_result_iter_postings_one_filter_set(arg_index, end_index): + txn = testutil.Transaction(postings=[ + ('Expenses:Other', 20), + ('Expenses:BankingFees', 2), + ('Assets:Checking', -22), + ]) + result = books.LoadResult.empty() + result.entries.append(txn) + args = (SEARCH_TERMS[:end_index], ()) + if arg_index: + args = (args[1], args[0]) + actual = list(result.iter_postings(*args)) + expected = txn.postings[:-end_index or None] + assert len(actual) == len(expected) + for act_post, exp_post in zip(actual, expected): + assert act_post.account == exp_post.account + assert act_post.units == exp_post.units + +def test_load_result_iter_postings_both_filter_sets(): + txn = testutil.Transaction(postings=[ + ('Expenses:Other', 20), + ('Expenses:BankingFees', 2), + ('Assets:Checking', -22), + ]) + result = books.LoadResult.empty() + result.entries.append(txn) + actual = list(result.iter_postings(SEARCH_TERMS[:1], SEARCH_TERMS[1:])) + assert len(actual) == 1 + assert actual[0].account == txn.postings[0].account + assert actual[0].units == txn.postings[0].units + +def test_load_result_account_metadata(clean_account_meta): + accounts = ['Assets:Checking', 'Assets:Savings'] + result = books.LoadResult.empty() + result.options_map['name_liabilities'] = 'Problems' + result.entries.extend( + bc_data.Open({}, date(2017, 3, day), name, None, None) + for day, name in enumerate(accounts, 1) + ) + result.load_account_metadata() + for day, name in enumerate(accounts, 1): + assert data.Account(name).meta.open_date == date(2017, 3, day) + +@pytest.mark.parametrize('count', range(3)) +def test_print_errors(count): + error_lines = [75 + n for n in range(count)] + result = books.LoadResult.empty() + result.errors.extend( + MockError("printed error", lineno=lineno) + for lineno in error_lines + ) + with io.StringIO() as out_file: + actual = result.print_errors(out_file) + matches = list(re.finditer( + r'^test_books_loader\.py:(\d+):\s+printed error', + out_file.getvalue(), + re.MULTILINE, + )) + assert actual is bool(error_lines) + assert len(error_lines) == len(matches) + assert all(lineno == int(match.group(1)) for lineno, match in zip(error_lines, matches)) + @pytest.mark.parametrize('from_fy,to_fy,expect_years', [ (2019, 2019, range(2019, 2020)), (0, 2019, range(2019, 2020)), @@ -70,7 +183,7 @@ def test_load_fy_range_empty(conservancy_loader): entries, errors, options_map = conservancy_loader.load_fy_range(2020, 2019) assert not errors assert not entries - assert not options_map + assert options_map.get('filename') is None @pytest.mark.parametrize('from_year', [None, *range(2018, 2021)]) def test_load_all(conservancy_loader, from_year):