conservancy_beancount/conservancy_beancount/books.py

274 lines
9.2 KiB
Python
Raw Normal View History

2020-04-20 21:20:26 +00:00
"""books - Tools for loading the books"""
# Copyright © 2020 Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
2020-04-20 21:20:26 +00:00
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.
2020-04-20 21:20:26 +00:00
import datetime
2020-04-21 14:47:13 +00:00
from pathlib import Path
import beancount.loader as bc_loader
import beancount.parser.options as bc_options
import beancount.parser.printer as bc_printer
from . import cliutil
from . import data
from .reports import rewrite
2020-04-20 21:20:26 +00:00
from typing import (
Any,
2020-04-20 21:20:26 +00:00
Iterable,
Iterator,
2020-04-21 14:47:13 +00:00
Mapping,
2020-04-20 21:20:26 +00:00
NamedTuple,
Optional,
Set,
TextIO,
2020-04-20 21:20:26 +00:00
Union,
)
from .beancount_types import (
Entries,
2020-06-07 13:04:53 +00:00
Error,
Errors,
OptionsMap,
)
2020-04-20 21:20:26 +00:00
2020-04-21 14:47:13 +00:00
# A filesystem path given either as a string or as a pathlib.Path.
PathLike = Union[str, Path]
# A fiscal year given either as an integer or as a date. The FiscalYear
# methods below convert dates to a year by calling FiscalYear.for_date().
Year = Union[int, datetime.date]
2020-04-20 21:20:26 +00:00
class FiscalYear(NamedTuple):
    """The calendar month and day on which each fiscal year starts

    Because this is a named tuple, an instance compares like the plain tuple
    ``(month, day)``. A fiscal year is numbered after the calendar year in
    which it starts.
    """
    month: int = 3
    day: int = 1

    def for_date(self, date: Optional[datetime.date]=None) -> int:
        """Return the fiscal year that contains a date

        If no date is given, today's date is used.
        """
        when = datetime.date.today() if date is None else date
        # A date that falls earlier in the calendar than the FY start still
        # belongs to the fiscal year that started in the previous year.
        fy_has_started = (when.month, when.day) >= self
        return when.year if fy_has_started else when.year - 1

    def first_date(self, year: Year) -> datetime.date:
        """Return the first day of a fiscal year

        The argument can be a year number or a date within the fiscal year.
        """
        fy = self.for_date(year) if isinstance(year, datetime.date) else year
        return datetime.date(fy, self.month, self.day)

    def last_date(self, year: Year) -> datetime.date:
        """Return the last day of a fiscal year

        The argument can be a year number or a date within the fiscal year.
        """
        one_day = datetime.timedelta(days=1)
        return self.next_fy_date(year) - one_day

    def next_fy_date(self, year: Year) -> datetime.date:
        """Return the first day of the fiscal year after the given one

        The argument can be a year number or a date within the fiscal year.
        """
        fy = self.for_date(year) if isinstance(year, datetime.date) else year
        return datetime.date(fy + 1, self.month, self.day)

    def range(self, from_fy: Year, to_fy: Optional[Year]=None) -> Iterable[int]:
        """Return a range of fiscal years

        Each argument is either a year (an integer) or a date. A date given
        as the first argument is converted with for_date(). A date given as
        the second argument is treated as an exclusive end: the range stops
        at the fiscal year that contains the day before it.

        If the second argument is omitted, it defaults to the current fiscal
        year.

        If the first argument is an integer below 1000, it is an offset
        relative to the second argument:

        * zero or a negative offset starts the range that many fiscal years
          before the second argument, and ends at the second argument.
        * a positive offset starts the range at the second argument, and
          ends that many fiscal years after it.

        Note that unlike normal Python ranges, these ranges include the final
        fiscal year.

        Examples:

          range(2015)  # Iterate all fiscal years from 2015 to today, inclusive
          range(-1)  # Iterate the previous fiscal year and current fiscal year
        """
        start = from_fy if isinstance(from_fy, int) else self.for_date(from_fy)
        if to_fy is None:
            stop = self.for_date()
        elif isinstance(to_fy, int):
            stop = to_fy
        else:
            stop = self.for_date(to_fy - datetime.timedelta(days=1))
        if start < 1:
            start += stop
        elif start < 1000:
            start, stop = stop, start + stop
        # Add one because callers expect the final fiscal year to be included.
        return range(start, stop + 1)
2020-04-21 14:47:13 +00:00
class LoadResult(NamedTuple):
    """The entries, errors, and options produced by loading books

    The fields are in the same order as the tuple returned by
    ``beancount.loader.load_file``, so Loader can build an instance directly
    from that return value with ``LoadResult._make``.
    """
    entries: Entries
    errors: Errors
    options_map: OptionsMap

    @classmethod
    def empty(cls, error: Optional[Error]=None) -> 'LoadResult':
        """Return a result with no entries

        Arguments:

        * error: If given, this becomes the only error in the result.
          Otherwise the result has no errors.

        The options map is a copy of Beancount's default options.
        """
        errors: Errors = [] if error is None else [error]
        return cls([], errors, bc_options.OPTIONS_DEFAULTS.copy())

    def iter_postings(
            self,
            rewrites: Iterable[Union[Path, rewrite.RewriteRuleset]]=(),
            search_terms: Iterable[cliutil.SearchTerm]=(),
    ) -> Iterator[data.Posting]:
        """Iterate the postings in these entries, rewritten and filtered

        Arguments:

        * rewrites: Rewrite rulesets to apply to the postings, in order.
          A Path is loaded as a YAML ruleset first.
        * search_terms: Search terms used to filter the postings. These are
          applied in order, after all the rewrites.
        """
        postings = data.Posting.from_entries(self.entries)
        for source in rewrites:
            if isinstance(source, Path):
                ruleset = rewrite.RewriteRuleset.from_yaml(source)
            else:
                ruleset = source
            postings = ruleset.rewrite(postings)
        for term in search_terms:
            postings = term.filter_postings(postings)
        return postings

    def load_account_metadata(self) -> None:
        """Load account metadata from these entries and options"""
        return data.Account.load_from_books(self.entries, self.options_map)

    def print_errors(self, out_file: TextIO) -> bool:
        """Print every error in this result to out_file

        Returns True if at least one error was printed, False otherwise.
        """
        # Track this with an explicit flag rather than probing whether the
        # loop variable was ever bound.
        printed = False
        for error in self.errors:
            bc_printer.print_error(error, file=out_file)
            printed = True
        return printed

    def returncode(self) -> int:
        """Return the exit code that describes this result

        * errors and entries: ExitCode.BeancountErrors
        * errors but no entries: ExitCode.NoConfiguration
        * no errors and no entries: ExitCode.NoDataLoaded
        * entries without errors: ExitCode.OK
        """
        has_entries = bool(self.entries)
        if self.errors:
            if has_entries:
                return cliutil.ExitCode.BeancountErrors
            return cliutil.ExitCode.NoConfiguration
        if has_entries:
            return cliutil.ExitCode.OK
        return cliutil.ExitCode.NoDataLoaded
2020-04-21 14:47:13 +00:00
class Loader:
    """Load Beancount books organized by fiscal year"""

    def __init__(self,
                 books_root: Path,
                 fiscal_year: FiscalYear,
    ) -> None:
        """Set up a books loader

        Arguments:

        * books_root: A Path to a Beancount books checkout.
        * fiscal_year: A FiscalYear object, used to determine what books to
          load for a given date range.
        """
        self.books_root = books_root
        self.fiscal_year = fiscal_year

    def _iter_fy_books(self, fy_range: Iterable[int]) -> Iterator[Path]:
        """Yield the path of each fiscal year's books that exists on disk

        Books are looked up at ``<books_root>/books/<year>.beancount``.
        Fiscal years without a file are skipped silently.
        """
        for fy in fy_range:
            candidate = Path(self.books_root) / 'books' / f'{fy}.beancount'
            if candidate.exists():
                yield candidate

    def _load_paths(self, paths: Iterator[Path]) -> LoadResult:
        """Load each path in sequence and merge them into one LoadResult

        The first path is loaded whole. If there are no paths at all, the
        method starts from an empty result instead. From each later path,
        only entries that come from files not seen so far are kept.
        """
        try:
            first_path = next(paths)
        except StopIteration:
            result = LoadResult.empty()
        else:
            result = LoadResult._make(bc_loader.load_file(first_path))
        seen_files: Set[str] = set(result.options_map['include'])
        for fy_path in paths:
            fy_entries, fy_errors, fy_options = bc_loader.load_file(fy_path)
            # We only want transactions from the new fiscal year.
            # We don't want the opening balance, duplicate definitions, etc.
            # Marking this load's own 'filename' as seen *before* filtering
            # drops the entries written directly in that file, along with
            # entries from any file an earlier load already covered.
            seen_files.add(fy_options['filename'])
            fresh_entries = [
                entry for entry in fy_entries
                if entry.meta.get('filename') not in seen_files
            ]
            result.entries.extend(fresh_entries)
            result.errors.extend(fy_errors)
            # Only now record this load's includes, so the filter above
            # did not throw away their entries.
            seen_files.update(fy_options['include'])
        result.options_map['include'] = list(seen_files)
        return result

    def _path_year(self, path: Path) -> int:
        """Return the year named by a books path, e.g. 2020 for 2020.beancount"""
        return int(path.stem)

    def load_all(self, from_year: Optional[Year]=None) -> LoadResult:
        """Load all of the books from a starting FY

        This method loads all of the books, starting from the fiscal year you
        specify.

        * Pass in a date to start from the FY for that date.
        * Pass in an integer >= 1000 to start from that year.
        * Pass in a smaller integer to start from an FY relative to today
          (e.g., -2 starts two FYs before today).
        * Pass in no argument to load all books from the first available FY.

        This method finds books by globbing the filesystem. It still loads
        each fiscal year in sequence to provide the best cache utilization.
        """
        books_dir = Path(self.books_root, 'books')
        fy_paths = sorted(
            books_dir.glob('[1-9][0-9][0-9][0-9].beancount'),
            key=self._path_year,
        )
        if from_year is not None:
            if not isinstance(from_year, int):
                first_fy = self.fiscal_year.for_date(from_year)
            elif from_year < 1000:
                first_fy = self.fiscal_year.for_date() + from_year
            else:
                first_fy = from_year
            # The paths are sorted by year, so keeping the years at or after
            # first_fy is the same as slicing from the first match onward.
            fy_paths = [
                fy_path for fy_path in fy_paths
                if self._path_year(fy_path) >= first_fy
            ]
        return self._load_paths(iter(fy_paths))

    def load_fy_range(self,
                      from_fy: Year,
                      to_fy: Optional[Year]=None,
    ) -> LoadResult:
        """Load books for a range of fiscal years

        This method generates a range of fiscal years by calling
        FiscalYear.range() with its arguments. It loads all the books within
        that range.
        """
        fiscal_years = self.fiscal_year.range(from_fy, to_fy)
        return self._load_paths(self._iter_fy_books(fiscal_years))

    @classmethod
    def load_none(cls, config_path: Optional[PathLike]=None, lineno: int=0) -> LoadResult:
        """Load no books and generate an error about it

        This is a convenience method for reporting tools that already handle
        general Beancount errors. If a configuration problem prevents them from
        loading the books, they can call this method in place of a regular
        loading method, and then continue on their normal code path.

        The path and line number given in the arguments will be named as the
        source of the error. Without a path, the error names
        'conservancy_beancount.ini'.
        """
        filename = str(config_path or 'conservancy_beancount.ini')
        source = {'filename': filename, 'lineno': lineno}
        error = Error(source, "no books to load in configuration", None)
        return LoadResult.empty(error)

    @classmethod
    def dispatch(cls,
                 loader: Optional['Loader'],
                 from_fy: Optional[Year]=None,
                 to_fy: Optional[Year]=None,
    ) -> LoadResult:
        """Call the loading method that fits the arguments

        * With no loader, return the result of load_none().
        * With no to_fy, return loader.load_all(from_fy).
        * Otherwise, return loader.load_fy_range(), where a missing from_fy
          is passed as 0.
        """
        if loader is None:
            return cls.load_none()
        if to_fy is None:
            return loader.load_all(from_fy)
        return loader.load_fy_range(from_fy or 0, to_fy)