books: Add LoadResult NamedTuple.

This refactors out some common functionality from our CLI tools.
This commit is contained in:
Brett Smith 2021-02-19 11:34:35 -05:00
parent 0a34ed6798
commit f3c3ebcf59
3 changed files with 182 additions and 10 deletions

View file

@ -11,6 +11,11 @@ from pathlib import Path
import beancount.loader as bc_loader import beancount.loader as bc_loader
import beancount.parser.options as bc_options import beancount.parser.options as bc_options
import beancount.parser.printer as bc_printer
from . import cliutil
from . import data
from .reports import rewrite
from typing import ( from typing import (
Any, Any,
@ -19,12 +24,14 @@ from typing import (
Mapping, Mapping,
NamedTuple, NamedTuple,
Optional, Optional,
TextIO,
Union, Union,
) )
from .beancount_types import ( from .beancount_types import (
Entries,
Error, Error,
Errors, Errors,
LoadResult, OptionsMap,
) )
PathLike = Union[str, Path] PathLike = Union[str, Path]
@ -91,6 +98,57 @@ class FiscalYear(NamedTuple):
return range(from_fy, to_fy + 1) return range(from_fy, to_fy + 1)
class LoadResult(NamedTuple):
entries: Entries
errors: Errors
options_map: OptionsMap
@classmethod
def empty(cls, error: Optional[Error]=None) -> 'LoadResult':
errors: Errors = []
if error is not None:
errors.append(error)
return cls([], errors, bc_options.OPTIONS_DEFAULTS.copy())
def iter_postings(
self,
rewrites: Iterable[Union[Path, rewrite.RewriteRuleset]]=(),
search_terms: Iterable[cliutil.SearchTerm]=(),
) -> Iterator[data.Posting]:
postings = data.Posting.from_entries(self.entries)
for ruleset in rewrites:
if isinstance(ruleset, Path):
ruleset = rewrite.RewriteRuleset.from_yaml(ruleset)
postings = ruleset.rewrite(postings)
for search_term in search_terms:
postings = search_term.filter_postings(postings)
return postings
def load_account_metadata(self) -> None:
return data.Account.load_from_books(self.entries, self.options_map)
def print_errors(self, out_file: TextIO) -> bool:
for error in self.errors:
bc_printer.print_error(error, file=out_file)
try:
error
except NameError:
return False
else:
return True
def returncode(self) -> int:
if self.errors:
if self.entries:
return cliutil.ExitCode.BeancountErrors
else:
return cliutil.ExitCode.NoConfiguration
elif not self.entries:
return cliutil.ExitCode.NoDataLoaded
else:
return cliutil.ExitCode.OK
class Loader: class Loader:
"""Load Beancount books organized by fiscal year""" """Load Beancount books organized by fiscal year"""
@ -116,20 +174,20 @@ class Loader:
def _load_paths(self, paths: Iterator[Path]) -> LoadResult: def _load_paths(self, paths: Iterator[Path]) -> LoadResult:
try: try:
entries, errors, options_map = bc_loader.load_file(next(paths)) result = LoadResult._make(bc_loader.load_file(next(paths)))
except StopIteration: except StopIteration:
entries, errors, options_map = [], [], {} result = LoadResult.empty()
for load_path in paths: for load_path in paths:
new_entries, new_errors, new_options = bc_loader.load_file(load_path) new_entries, new_errors, new_options = bc_loader.load_file(load_path)
# We only want transactions from the new fiscal year. # We only want transactions from the new fiscal year.
# We don't want the opening balance, duplicate definitions, etc. # We don't want the opening balance, duplicate definitions, etc.
fy_filename = str(load_path.parent.parent / load_path.name) fy_filename = str(load_path.parent.parent / load_path.name)
entries.extend( result.entries.extend(
entry for entry in new_entries entry for entry in new_entries
if entry.meta.get('filename') == fy_filename if entry.meta.get('filename') == fy_filename
) )
errors.extend(new_errors) result.errors.extend(new_errors)
return entries, errors, options_map return result
def _path_year(self, path: Path) -> int: def _path_year(self, path: Path) -> int:
return int(path.stem) return int(path.stem)
@ -195,5 +253,4 @@ class Loader:
'filename': str(config_path or 'conservancy_beancount.ini'), 'filename': str(config_path or 'conservancy_beancount.ini'),
'lineno': lineno, 'lineno': lineno,
} }
errors: Errors = [Error(source, "no books to load in configuration", None)] return LoadResult.empty(Error(source, "no books to load in configuration", None))
return [], errors, bc_options.OPTIONS_DEFAULTS.copy()

View file

@ -183,6 +183,8 @@ class ExitCode(enum.IntEnum):
NoConfig = NoConfiguration NoConfig = NoConfiguration
NoDataFiltered = os.EX_DATAERR NoDataFiltered = os.EX_DATAERR
NoDataLoaded = os.EX_NOINPUT NoDataLoaded = os.EX_NOINPUT
OK = os.EX_OK
Ok = OK
RewriteRulesError = os.EX_DATAERR RewriteRulesError = os.EX_DATAERR
# Our own exit codes, working down from that range # Our own exit codes, working down from that range

View file

@ -6,6 +6,9 @@
# LICENSE.txt in the repository. # LICENSE.txt in the repository.
import collections import collections
import io
import itertools
import os
import re import re
from datetime import date from datetime import date
@ -16,12 +19,36 @@ import pytest
from . import testutil from . import testutil
from beancount.core import data as bc_data from beancount.core import data as bc_data
from conservancy_beancount import books from conservancy_beancount import books, data
FY_START_MONTH = 3 FY_START_MONTH = 3
books_path = testutil.test_path('books') books_path = testutil.test_path('books')
class MockError(Exception):
def __init__(self, message, lineno=0):
self.message = message
self.entry = None
self.source = {'filename': 'test_books_loader.py', 'lineno': lineno}
class MockSearchTerm:
def __init__(self, pred):
self.pred = pred
def filter_postings(self, postings):
return (post for post in postings if self.pred(post))
rewrite = filter_postings
SEARCH_TERMS = [
MockSearchTerm(lambda post: post.account.startswith('Expenses:')),
MockSearchTerm(lambda post: post.units.number >= 10),
]
clean_account_meta = pytest.fixture()(testutil.clean_account_meta)
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def conservancy_loader(): def conservancy_loader():
return books.Loader(books_path, books.FiscalYear(FY_START_MONTH)) return books.Loader(books_path, books.FiscalYear(FY_START_MONTH))
@ -42,6 +69,92 @@ def txn_dates(entries):
def txn_years(entries): def txn_years(entries):
return frozenset(date.year for date in txn_dates(entries)) return frozenset(date.year for date in txn_dates(entries))
def test_load_result_returncode_ok():
options_map = {'filename': 'test_load_result_returncode_ok'}
result = books.LoadResult([testutil.Transaction()], [], options_map)
assert result.returncode() == 0
def test_load_result_beancount_errors():
error = MockError("empty transaction", lineno=65)
options_map = dict(error.source)
result = books.LoadResult([testutil.Transaction()], [error], options_map)
assert 10 <= result.returncode() < 64
def test_load_result_config_error():
error = MockError("no books available")
result = books.LoadResult.empty(error)
assert result.returncode() == os.EX_CONFIG
def test_load_result_no_entries():
result = books.LoadResult.empty()
assert result.returncode() == os.EX_NOINPUT
@pytest.mark.parametrize('arg_index,end_index', itertools.product(
range(2),
range(len(SEARCH_TERMS)),
))
def test_load_result_iter_postings_one_filter_set(arg_index, end_index):
txn = testutil.Transaction(postings=[
('Expenses:Other', 20),
('Expenses:BankingFees', 2),
('Assets:Checking', -22),
])
result = books.LoadResult.empty()
result.entries.append(txn)
args = (SEARCH_TERMS[:end_index], ())
if arg_index:
args = (args[1], args[0])
actual = list(result.iter_postings(*args))
expected = txn.postings[:-end_index or None]
assert len(actual) == len(expected)
for act_post, exp_post in zip(actual, expected):
assert act_post.account == exp_post.account
assert act_post.units == exp_post.units
def test_load_result_iter_postings_both_filter_sets():
txn = testutil.Transaction(postings=[
('Expenses:Other', 20),
('Expenses:BankingFees', 2),
('Assets:Checking', -22),
])
result = books.LoadResult.empty()
result.entries.append(txn)
actual = list(result.iter_postings(SEARCH_TERMS[:1], SEARCH_TERMS[1:]))
assert len(actual) == 1
assert actual[0].account == txn.postings[0].account
assert actual[0].units == txn.postings[0].units
def test_load_result_account_metadata(clean_account_meta):
accounts = ['Assets:Checking', 'Assets:Savings']
result = books.LoadResult.empty()
result.options_map['name_liabilities'] = 'Problems'
result.entries.extend(
bc_data.Open({}, date(2017, 3, day), name, None, None)
for day, name in enumerate(accounts, 1)
)
result.load_account_metadata()
for day, name in enumerate(accounts, 1):
assert data.Account(name).meta.open_date == date(2017, 3, day)
@pytest.mark.parametrize('count', range(3))
def test_print_errors(count):
error_lines = [75 + n for n in range(count)]
result = books.LoadResult.empty()
result.errors.extend(
MockError("printed error", lineno=lineno)
for lineno in error_lines
)
with io.StringIO() as out_file:
actual = result.print_errors(out_file)
matches = list(re.finditer(
r'^test_books_loader\.py:(\d+):\s+printed error',
out_file.getvalue(),
re.MULTILINE,
))
assert actual is bool(error_lines)
assert len(error_lines) == len(matches)
assert all(lineno == int(match.group(1)) for lineno, match in zip(error_lines, matches))
@pytest.mark.parametrize('from_fy,to_fy,expect_years', [ @pytest.mark.parametrize('from_fy,to_fy,expect_years', [
(2019, 2019, range(2019, 2020)), (2019, 2019, range(2019, 2020)),
(0, 2019, range(2019, 2020)), (0, 2019, range(2019, 2020)),
@ -70,7 +183,7 @@ def test_load_fy_range_empty(conservancy_loader):
entries, errors, options_map = conservancy_loader.load_fy_range(2020, 2019) entries, errors, options_map = conservancy_loader.load_fy_range(2020, 2019)
assert not errors assert not errors
assert not entries assert not entries
assert not options_map assert options_map.get('filename') is None
@pytest.mark.parametrize('from_year', [None, *range(2018, 2021)]) @pytest.mark.parametrize('from_year', [None, *range(2018, 2021)])
def test_load_all(conservancy_loader, from_year): def test_load_all(conservancy_loader, from_year):