331 lines
12 KiB
Python
331 lines
12 KiB
Python
"""books - Tools for loading the books
|
|
|
|
This module provides common functionality for loading books split by fiscal
|
|
year and doing common operations on the results.
|
|
"""
|
|
# Copyright © 2020 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import datetime
|
|
|
|
from pathlib import Path
|
|
|
|
import beancount.loader as bc_loader
|
|
import beancount.parser.options as bc_options
|
|
import beancount.parser.printer as bc_printer
|
|
|
|
from . import cliutil
|
|
from . import data
|
|
from .reports import rewrite
|
|
|
|
from typing import (
|
|
Any,
|
|
Iterable,
|
|
Iterator,
|
|
Mapping,
|
|
NamedTuple,
|
|
Optional,
|
|
Set,
|
|
TextIO,
|
|
Union,
|
|
)
|
|
from .beancount_types import (
|
|
Entries,
|
|
Error,
|
|
Errors,
|
|
OptionsMap,
|
|
)
|
|
|
|
PathLike = Union[str, Path]
|
|
Year = Union[int, datetime.date]
|
|
|
|
class FiscalYear(NamedTuple):
|
|
"""Convert to and from fiscal years and calendar dates
|
|
|
|
Given a month and date that a fiscal year starts, this class provides
|
|
methods to calculate the fiscal year of a given calendar date; to return
|
|
important calendar dates associated with the fiscal year; and iterate
|
|
fiscal years.
|
|
|
|
Most methods can accept either an int, representing a fiscal year;
|
|
or a date. When you pass a date, the method will calculate that date's
|
|
corresponding fiscal year, and use it as the argument.
|
|
"""
|
|
month: int = 3
|
|
day: int = 1
|
|
|
|
def for_date(self, date: Optional[datetime.date]=None) -> int:
|
|
"""Return the fiscal year of a given calendar date
|
|
|
|
The default date is today's date.
|
|
"""
|
|
if date is None:
|
|
date = datetime.date.today()
|
|
if (date.month, date.day) < self:
|
|
return date.year - 1
|
|
else:
|
|
return date.year
|
|
|
|
def first_date(self, year: Year) -> datetime.date:
|
|
"""Return the first calendar date of a fiscal year"""
|
|
if isinstance(year, datetime.date):
|
|
year = self.for_date(year)
|
|
return datetime.date(year, self.month, self.day)
|
|
|
|
def last_date(self, year: Year) -> datetime.date:
|
|
"""Return the last calendar date of a fiscal year"""
|
|
return self.next_fy_date(year) - datetime.timedelta(days=1)
|
|
|
|
def next_fy_date(self, year: Year) -> datetime.date:
|
|
"""Return the last calendar date of a fiscal year"""
|
|
if isinstance(year, datetime.date):
|
|
year = self.for_date(year)
|
|
return datetime.date(year + 1, self.month, self.day)
|
|
|
|
def range(self, from_fy: Year, to_fy: Optional[Year]=None) -> Iterable[int]:
|
|
"""Return a range of fiscal years
|
|
|
|
Both arguments can be either a year (represented as an integer) or a
|
|
date. Dates will be converted into a year by calling for_date() on
|
|
them.
|
|
|
|
If the first argument is negative or below 1000, it will be treated as
|
|
an offset. You'll get a range of fiscal years between the second
|
|
argument offset by this amount.
|
|
|
|
If the second argument is omitted, it defaults to the current fiscal
|
|
year.
|
|
|
|
Note that unlike normal Python ranges, these ranges include the final
|
|
fiscal year.
|
|
|
|
Examples:
|
|
|
|
range(2015) # Iterate all fiscal years from 2015 to today, inclusive
|
|
|
|
range(-1) # Iterate the previous fiscal year and current fiscal year
|
|
"""
|
|
if not isinstance(from_fy, int):
|
|
from_fy = self.for_date(from_fy)
|
|
if to_fy is None:
|
|
to_fy = self.for_date()
|
|
elif not isinstance(to_fy, int):
|
|
to_fy = self.for_date(to_fy - datetime.timedelta(days=1))
|
|
if from_fy < 1:
|
|
from_fy += to_fy
|
|
elif from_fy < 1000:
|
|
from_fy, to_fy = to_fy, from_fy + to_fy
|
|
return range(from_fy, to_fy + 1)
|
|
|
|
|
|
class LoadResult(NamedTuple):
|
|
"""Common functionality for loaded books
|
|
|
|
This class is type-compatible with the return value of the loader
|
|
functions in ``beancount.loader``. This provides named access to the
|
|
results, as well as common functionality methods.
|
|
"""
|
|
entries: Entries
|
|
errors: Errors
|
|
options_map: OptionsMap
|
|
|
|
@classmethod
|
|
def empty(cls, error: Optional[Error]=None) -> 'LoadResult':
|
|
"""Create a return result that represents nothing loaded
|
|
|
|
If an error is provided, it will be the sole error reported.
|
|
|
|
This method is useful to create a LoadResult when one can't be
|
|
created normally; e.g., because a books path is not properly configured.
|
|
"""
|
|
errors: Errors = []
|
|
if error is not None:
|
|
errors.append(error)
|
|
return cls([], errors, bc_options.OPTIONS_DEFAULTS.copy())
|
|
|
|
def iter_postings(
|
|
self,
|
|
rewrites: Iterable[Union[Path, rewrite.RewriteRuleset]]=(),
|
|
search_terms: Iterable[cliutil.SearchTerm]=(),
|
|
) -> Iterator[data.Posting]:
|
|
"""Iterate all the postings in this LoadResult
|
|
|
|
If ``rewrites`` are provided, postings will be passed through them all.
|
|
See the ``reports.rewrite`` pydoc for details.
|
|
|
|
If ``search_terms`` are provided, postings will be filtered through
|
|
them all. See the ``cliutil.SearchTerm`` pydoc for details.
|
|
"""
|
|
postings = data.Posting.from_entries(self.entries)
|
|
for ruleset in rewrites:
|
|
if isinstance(ruleset, Path):
|
|
ruleset = rewrite.RewriteRuleset.from_yaml(ruleset)
|
|
postings = ruleset.rewrite(postings)
|
|
for search_term in search_terms:
|
|
postings = search_term.filter_postings(postings)
|
|
return postings
|
|
|
|
def load_account_metadata(self) -> None:
|
|
"""Load account metadata from this LoadResult"""
|
|
return data.Account.load_from_books(self.entries, self.options_map)
|
|
|
|
def print_errors(self, out_file: TextIO) -> bool:
|
|
"""Report errors from this LoadResult to ``out_file``
|
|
|
|
Returns True if errors were reported, False otherwise.
|
|
"""
|
|
for error in self.errors:
|
|
bc_printer.print_error(error, file=out_file)
|
|
try:
|
|
error
|
|
except NameError:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def returncode(self) -> int:
|
|
"""Return an appropriate Unix exit code for this LoadResult
|
|
|
|
If this LoadResult has errors, or no entries, return an exit code that
|
|
best represents that. Otherwise, return the standard OK exit code 0.
|
|
"""
|
|
if self.errors:
|
|
if self.entries:
|
|
return cliutil.ExitCode.BeancountErrors
|
|
else:
|
|
return cliutil.ExitCode.NoConfiguration
|
|
elif not self.entries:
|
|
return cliutil.ExitCode.NoDataLoaded
|
|
else:
|
|
return cliutil.ExitCode.OK
|
|
|
|
|
|
class Loader:
|
|
"""Load Beancount books organized by fiscal year"""
|
|
|
|
def __init__(self,
|
|
books_root: Path,
|
|
fiscal_year: FiscalYear,
|
|
) -> None:
|
|
"""Set up a books loader
|
|
|
|
Arguments:
|
|
* books_root: A Path to a Beancount books checkout.
|
|
* fiscal_year: A FiscalYear object, used to determine what books to
|
|
load for a given date range.
|
|
"""
|
|
self.books_root = books_root
|
|
self.fiscal_year = fiscal_year
|
|
|
|
def _iter_fy_books(self, fy_range: Iterable[int]) -> Iterator[Path]:
|
|
for year in fy_range:
|
|
path = Path(self.books_root, 'books', f'{year}.beancount')
|
|
if path.exists():
|
|
yield path
|
|
|
|
def _load_paths(self, paths: Iterator[Path]) -> LoadResult:
|
|
try:
|
|
result = LoadResult._make(bc_loader.load_file(next(paths)))
|
|
except StopIteration:
|
|
result = LoadResult.empty()
|
|
seen_files: Set[str] = set(result.options_map['include'])
|
|
for load_path in paths:
|
|
new_entries, new_errors, new_options = bc_loader.load_file(load_path)
|
|
# We only want transactions from the new fiscal year.
|
|
# We don't want the opening balance, duplicate definitions, etc.
|
|
seen_files.add(new_options['filename'])
|
|
result.entries.extend(
|
|
entry for entry in new_entries
|
|
if entry.meta.get('filename') not in seen_files
|
|
)
|
|
result.errors.extend(new_errors)
|
|
seen_files.update(new_options['include'])
|
|
result.options_map['include'] = list(seen_files)
|
|
return result
|
|
|
|
def _path_year(self, path: Path) -> int:
|
|
return int(path.stem)
|
|
|
|
def load_all(self, from_year: Optional[Year]=None) -> LoadResult:
|
|
"""Load all of the books from a starting FY
|
|
|
|
This method loads all of the books, starting from the fiscal year you
|
|
specify.
|
|
|
|
* Pass in a date to start from the FY for that date.
|
|
* Pass in an integer >= 1000 to start from that year.
|
|
* Pass in a smaller integer to start from an FY relative to today
|
|
(e.g., -2 starts two FYs before today).
|
|
* Pass is no argument to load all books from the first available FY.
|
|
|
|
This method finds books by globbing the filesystem. It still loads
|
|
each fiscal year in sequence to provide the best cache utilization.
|
|
"""
|
|
path = Path(self.books_root, 'books')
|
|
fy_paths = list(path.glob('[1-9][0-9][0-9][0-9].beancount'))
|
|
fy_paths.sort(key=self._path_year)
|
|
if from_year is not None:
|
|
if not isinstance(from_year, int):
|
|
from_year = self.fiscal_year.for_date(from_year)
|
|
elif from_year < 1000:
|
|
from_year = self.fiscal_year.for_date() + from_year
|
|
for index, path in enumerate(fy_paths):
|
|
if self._path_year(path) >= from_year:
|
|
fy_paths = fy_paths[index:]
|
|
break
|
|
else:
|
|
fy_paths = []
|
|
return self._load_paths(iter(fy_paths))
|
|
|
|
def load_fy_range(self,
|
|
from_fy: Year,
|
|
to_fy: Optional[Year]=None,
|
|
) -> LoadResult:
|
|
"""Load books for a range of fiscal years
|
|
|
|
This method generates a range of fiscal years by calling
|
|
FiscalYear.range() with its arguments. It loads all the books within
|
|
that range.
|
|
"""
|
|
fy_range = self.fiscal_year.range(from_fy, to_fy)
|
|
fy_paths = self._iter_fy_books(fy_range)
|
|
return self._load_paths(fy_paths)
|
|
|
|
@classmethod
|
|
def load_none(cls, config_path: Optional[PathLike]=None, lineno: int=0) -> LoadResult:
|
|
"""Load no books and generate an error about it
|
|
|
|
This is a convenience method for reporting tools that already handle
|
|
general Beancount errors. If a configuration problem prevents them from
|
|
loading the books, they can call this method in place of a regular
|
|
loading method, and then continue on their normal code path.
|
|
|
|
The path and line number given in the arguments will be named as the
|
|
source of the error.
|
|
"""
|
|
source = {
|
|
'filename': str(config_path or 'conservancy_beancount.ini'),
|
|
'lineno': lineno,
|
|
}
|
|
return LoadResult.empty(Error(source, "no books to load in configuration", None))
|
|
|
|
@classmethod
|
|
def dispatch(cls,
|
|
loader: Optional['Loader'],
|
|
from_fy: Optional[Year]=None,
|
|
to_fy: Optional[Year]=None,
|
|
) -> LoadResult:
|
|
"""High-level, "do-what-I-mean"-ish books loader
|
|
|
|
Most tools can call this with a books loader from configuration, plus
|
|
one or two fiscal year arguments, to get the LoadResult they want.
|
|
"""
|
|
if loader is None:
|
|
return cls.load_none()
|
|
elif to_fy is None:
|
|
return loader.load_all(from_fy)
|
|
else:
|
|
return loader.load_fy_range(from_fy or 0, to_fy)
|