"""Enhanced Beancount data structures for Conservancy""" # Copyright © 2020 Brett Smith # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. import collections.abc from beancount.core import account as bc_account from typing import ( Iterable, Iterator, Optional, ) from .beancount_types import ( MetaKey, MetaValue, Posting, Transaction, ) class Account(str): SEP = bc_account.sep def is_under(self, acct_s: str) -> bool: return self.startswith(acct_s) and ( acct_s.endswith(self.SEP) or self == acct_s or self[len(acct_s)] == self.SEP ) class PostingMeta(collections.abc.MutableMapping): def __init__(self, txn: Transaction, index: int, post: Optional[Posting]=None) -> None: if post is None: post = txn.postings[index] self.txn = txn self.index = index self.post = post def __iter__(self) -> Iterator[MetaKey]: keys: Iterable[MetaKey] if self.post.meta is None: keys = self.txn.meta.keys() else: keys = frozenset(self.post.meta.keys()).union(self.txn.meta.keys()) return iter(keys) def __len__(self) -> int: return sum(1 for _ in self) def __getitem__(self, key: MetaKey) -> MetaValue: if self.post.meta: try: return self.post.meta[key] except KeyError: pass return self.txn.meta[key] def __setitem__(self, key: MetaKey, value: MetaValue) -> None: if self.post.meta is None: self.post = self.post._replace(meta={key: value}) self.txn.postings[self.index] = self.post else: self.post.meta[key] = value def __delitem__(self, key: MetaKey) -> None: if self.post.meta is None: raise KeyError(key) else: del self.post.meta[key] def iter_postings(txn: Transaction) -> Iterator[Posting]: for index, source in enumerate(txn.postings): yield source._replace( meta=PostingMeta(txn, index, source), )