1030 lines
38 KiB
Python
1030 lines
38 KiB
Python
"""core.py - Common data classes for reporting functionality"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import abc
|
|
import collections
|
|
import datetime
|
|
import itertools
|
|
import operator
|
|
import re
|
|
import urllib.parse as urlparse
|
|
|
|
import babel.core # type:ignore[import]
|
|
import babel.numbers # type:ignore[import]
|
|
|
|
import odf.config # type:ignore[import]
|
|
import odf.element # type:ignore[import]
|
|
import odf.number # type:ignore[import]
|
|
import odf.opendocument # type:ignore[import]
|
|
import odf.style # type:ignore[import]
|
|
import odf.table # type:ignore[import]
|
|
import odf.text # type:ignore[import]
|
|
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
|
|
from beancount.core import amount as bc_amount
|
|
|
|
from .. import data
|
|
from .. import filters
|
|
from .. import rtutil
|
|
|
|
from typing import (
|
|
cast,
|
|
overload,
|
|
Any,
|
|
BinaryIO,
|
|
Callable,
|
|
Dict,
|
|
Generic,
|
|
Iterable,
|
|
Iterator,
|
|
List,
|
|
Mapping,
|
|
MutableMapping,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
Tuple,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
from ..beancount_types import (
|
|
MetaKey,
|
|
MetaValue,
|
|
)
|
|
|
|
# Re-export of the numeric type alias declared in the data module.
DecimalCompat = data.DecimalCompat
# Self-type for Balance methods that return an instance of the caller's class.
BalanceType = TypeVar('BalanceType', bound='Balance')
# Any callable that builds an ODF element (used as ``child_type`` by the
# BaseODS.ensure_child and BaseODS.replace_child helpers).
ElementType = Callable[..., odf.element.Element]
# A link is either a bare href, or an (href, display text) pair where the
# text may be None.
LinkType = Union[str, Tuple[str, Optional[str]]]
# Self-type for RelatedPostings methods and alternate constructors.
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
# Row type for spreadsheet writers; must be a sequence.
RT = TypeVar('RT', bound=Sequence)
# Section identifier type for spreadsheet writers.
ST = TypeVar('ST')
# General-purpose type variable.
T = TypeVar('T')
|
|
|
|
class Balance(Mapping[str, data.Amount]):
    """A collection of amounts mapped by currency

    Each key is a Beancount currency string, and each value represents the
    balance in that currency.

    Every balance carries a ``tolerance``. Amounts whose absolute value is
    smaller than the tolerance are treated as zero by the comparison and
    formatting methods. Methods that build a new balance from this one
    (``abs()``, unary ``-``, ``+``, ``copy``, ``clean_copy``) return an
    instance of the same class with the same tolerance.
    """
    __slots__ = ('_currency_map', 'tolerance')
    # Tolerance used when the constructor is not given one.
    TOLERANCE = Decimal('0.01')

    def __init__(self,
                 source: Iterable[data.Amount]=(),
                 tolerance: Optional[Decimal]=None,
    ) -> None:
        """Sum ``source`` amounts by currency

        ``tolerance`` defaults to the class attribute ``TOLERANCE``.
        """
        if tolerance is None:
            tolerance = self.TOLERANCE
        self.tolerance = tolerance
        self._currency_map: Dict[str, data.Amount] = {}
        for amount in source:
            self._add_amount(self._currency_map, amount)

    def _add_amount(self,
                    currency_map: MutableMapping[str, data.Amount],
                    amount: data.Amount,
    ) -> None:
        """Add ``amount`` to the running total for its currency in ``currency_map``"""
        code = amount.currency
        try:
            current_number = currency_map[code].number
        except KeyError:
            current_number = Decimal(0)
        currency_map[code] = data.Amount(current_number + amount.number, code)

    def _add_other(self,
                   currency_map: MutableMapping[str, data.Amount],
                   other: Union[data.Amount, 'Balance'],
    ) -> None:
        """Add a single amount, or every amount of another balance, to ``currency_map``"""
        if isinstance(other, Balance):
            for amount in other.values():
                self._add_amount(currency_map, amount)
        else:
            self._add_amount(currency_map, other)

    def __repr__(self) -> str:
        values = [repr(amt) for amt in self.values()]
        return f"{type(self).__name__}({values!r})"

    def __str__(self) -> str:
        return self.format()

    def __abs__(self: BalanceType) -> BalanceType:
        # Pass the tolerance along so the result does not silently fall back
        # to the class default.
        return type(self)(
            (bc_amount.abs(amt) for amt in self.values()),
            self.tolerance,
        )

    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
        retval_map = self._currency_map.copy()
        self._add_other(retval_map, other)
        return type(self)(retval_map.values(), self.tolerance)

    def __eq__(self, other: Any) -> bool:
        """Compare balances, ignoring amounts that are zero within tolerance

        Each side is cleaned with its own tolerance first. The amounts that
        remain must match exactly, currency by currency.
        """
        if isinstance(other, Balance):
            clean_self = self.clean_copy()
            clean_other = other.clean_copy()
            return len(clean_self) == len(clean_other) and all(
                clean_self[key] == clean_other.get(key) for key in clean_self
            )
        else:
            return super().__eq__(other)

    def __neg__(self: BalanceType) -> BalanceType:
        return type(self)((-amt for amt in self.values()), self.tolerance)

    def __pos__(self: BalanceType) -> BalanceType:
        return self

    def __getitem__(self, key: str) -> data.Amount:
        return self._currency_map[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._currency_map)

    def __len__(self) -> int:
        return len(self._currency_map)

    def _all_amounts(self,
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
                     operand: DecimalCompat,
    ) -> bool:
        """Return true if ``op_func(number, operand)`` holds for every amount

        An empty balance returns true.
        """
        return all(op_func(amt.number, operand) for amt in self.values())

    def copy(self: BalanceType) -> BalanceType:
        """Return a new balance with the same amounts and tolerance"""
        return type(self)(self.values(), self.tolerance)

    def clean_copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
        """Return a copy without the amounts that are zero within tolerance

        ``tolerance`` is the threshold used for filtering; it defaults to
        this balance's own tolerance. The returned balance keeps this
        balance's tolerance either way.
        """
        if tolerance is None:
            tolerance = self.tolerance
        return type(self)(
            (amount for amount in self.values()
             if abs(amount.number) >= tolerance),
            self.tolerance,
        )

    @staticmethod
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
        """Return true if the absolute value of ``dec`` is less than ``tolerance``"""
        dec = cast(Decimal, dec)
        return abs(dec) < tolerance

    def eq_zero(self) -> bool:
        """Returns true if all amounts in the balance == 0, within tolerance."""
        return self._all_amounts(self.within_tolerance, self.tolerance)

    is_zero = eq_zero

    def ge_zero(self) -> bool:
        """Returns true if all amounts in the balance >= 0, within tolerance."""
        # With a nonzero tolerance the bound is exclusive, matching
        # within_tolerance; with zero tolerance this is a plain >= 0 test.
        op_func = operator.gt if self.tolerance else operator.ge
        return self._all_amounts(op_func, -self.tolerance)

    def le_zero(self) -> bool:
        """Returns true if all amounts in the balance <= 0, within tolerance."""
        op_func = operator.lt if self.tolerance else operator.le
        return self._all_amounts(op_func, self.tolerance)

    def format(self,
               fmt: Optional[str]='#,#00.00 ¤¤',
               sep: str=', ',
               empty: str="Zero balance",
               tolerance: Optional[Decimal]=None,
    ) -> str:
        """Formats the balance as a string with the given parameters

        If the balance is zero (within tolerance), returns ``empty``.
        Otherwise, returns a string with each amount in the balance formatted
        as ``fmt``, separated by ``sep``. Amounts are listed from the largest
        absolute value to the smallest.

        If you set ``fmt`` to None, amounts will be formatted according to the
        user's locale. The default format is Beancount's input format.
        """
        amounts = list(self.clean_copy(tolerance).values())
        if not amounts:
            return empty
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
        return sep.join(
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
            for amt in amounts
        )
|
|
|
|
|
|
class MutableBalance(Balance):
    """A Balance that can also be updated in place with ``+=``"""
    __slots__ = ()

    def __iadd__(
            self: BalanceType,
            other: Union[data.Amount, Balance],
    ) -> BalanceType:
        """Add an amount, or all the amounts of another balance, to this one"""
        # Unlike Balance.__add__, this writes straight into our own map
        # instead of building a new instance.
        self._add_other(self._currency_map, other)
        return self
|
|
|
|
|
|
class RelatedPostings(Sequence[data.Posting]):
    """Collect and query related postings

    This class provides common functionality for collecting related postings
    and running queries on them: iterating over them, tallying their balance,
    etc.

    This class doesn't know anything about how the postings are related. That's
    entirely up to the caller.

    A common pattern is to group postings based on some key. See the
    group_by_meta classmethod for an example.
    """
    __slots__ = ('_postings',)

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 *,
                 _can_own: bool=False,
    ) -> None:
        """Collect the postings from ``source``

        ``_can_own`` is private: when it is true and ``source`` is a list,
        the instance stores that very list instead of copying it.
        """
        self._postings: List[data.Posting]
        if _can_own and isinstance(source, list):
            self._postings = source
        else:
            self._postings = list(source)

    @classmethod
    def _group_by(cls: Type[RelatedType],
                  postings: Iterable[data.Posting],
                  key: Callable[[data.Posting], T],
    ) -> Iterator[Tuple[T, RelatedType]]:
        """Yield ``(key value, instance)`` pairs grouping ``postings`` by ``key``

        All of ``postings`` is consumed before the first pair is yielded.
        """
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
            mapping[key(post)].append(post)
        for value, posts in mapping.items():
            # The lists were built here and are not referenced anywhere
            # else, so each new instance can take its list without copying.
            yield value, cls(posts, _can_own=True)

    @classmethod
    def group_by_meta(cls: Type[RelatedType],
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns an iterator of
        2-tuples. The first item of each tuple is a value of
        post.meta.get(key, default). The second is a RelatedPostings instance
        that contains all the postings that had that same metadata value.
        Pass the result to dict() if you want a mapping.
        """
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.get(key, default)
        return cls._group_by(postings, key_func)

    @classmethod
    def group_by_first_meta_link(
            cls: Type[RelatedType],
            postings: Iterable[data.Posting],
            key: MetaKey,
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
        """Relate postings by the first link in metadata

        This method takes an iterable of postings and returns an iterator of
        2-tuples. The first item of each tuple is a value of
        post.meta.first_link(key, None). The second is a RelatedPostings
        instance that contains all the postings that had that same first
        metadata link.
        """
        def key_func(post: data.Posting) -> Optional[str]:
            return post.meta.first_link(key, None)
        return cls._group_by(postings, key_func)

    def __repr__(self) -> str:
        return f'<{type(self).__name__} {self._postings!r}>'

    @overload
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...

    @overload
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...

    def __getitem__(self: RelatedType,
                    index: Union[int, slice],
    ) -> Union[data.Posting, RelatedType]:
        """Return one posting for an int index, or a new instance for a slice"""
        if isinstance(index, slice):
            return type(self)(self._postings[index], _can_own=True)
        else:
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
        """Yield every link under ``key`` from every posting, duplicates included"""
        for post in self:
            try:
                yield from post.meta.get_links(key)
            except TypeError:
                # Best effort: skip postings where get_links() raises
                # TypeError for this key.
                pass

    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
        """Iterate the links under ``key`` across all postings, filtered through filters.iter_unique"""
        return filters.iter_unique(self._all_meta_links(key))

    @overload
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...

    @overload
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...

    def first_meta_links(self,
                         key: MetaKey,
                         default: Optional[str]='',
    ) -> Iterator[Optional[str]]:
        """Iterate each posting's first link under ``key``, filtered through filters.iter_unique

        ``default`` is passed to post.meta.first_link(). With the default
        ``default`` of an empty string, falsy results are left out of the
        iteration.
        """
        retval = filters.iter_unique(
            post.meta.first_link(key, default) for post in self
        )
        if default == '':
            retval = (s for s in retval if s)
        return retval

    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
        """Yield each posting with the running balance through that posting

        The same balance object is updated and yielded every time. Copy it
        if you need to keep an intermediate value.
        """
        balance = MutableBalance()
        for post in self:
            balance += post.units
            yield post, balance

    def balance(self) -> Balance:
        """Return the total balance of all postings' units

        Returns an empty Balance when there are no postings.
        """
        retval: Balance = Balance()
        # Run the iterator to the end; the loop variable keeps the last
        # running balance, or stays the empty default with no postings.
        for _, retval in self.iter_with_balance():
            pass
        return retval

    def balance_at_cost(self) -> Balance:
        """Return the total balance, converting postings that have a cost

        A posting without a cost contributes its units. A posting with a cost
        contributes units number times cost number, in the cost's currency.
        """
        balance = MutableBalance()
        for post in self:
            if post.cost is None:
                balance += post.units
            else:
                number = post.units.number * post.cost.number
                balance += data.Amount(number, post.cost.currency)
        return balance

    def meta_values(self,
                    key: MetaKey,
                    default: Optional[MetaValue]=None,
    ) -> Set[Optional[MetaValue]]:
        """Return the set of post.meta.get(key, default) over all postings"""
        return {post.meta.get(key, default) for post in self}
|
|
|
|
|
|
class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to arbitrary output. It calls hooks when it starts writing the
    spreadsheet, starts a new "section" of rows, ends a section, and ends the
    spreadsheet.

    RT is the type of the input data rows. ST is the type of the section
    identifier that you create from each row. If you don't want to use the
    section logic at all, set ST to None and define section_key to return None.
    """

    @abc.abstractmethod
    def section_key(self, row: RT) -> ST:
        """Return the section a row belongs to

        Given a data row, this method should return some identifier for the
        "section" the row belongs to. The write method uses this to
        determine when to call start_section and end_section.

        If your spreadsheet doesn't need sections, define this to return None.
        """
        ...

    @abc.abstractmethod
    def write_row(self, row: RT) -> None:
        """Write a data row to the output spreadsheet

        This method is called once for each data row in the input.
        """
        ...

    # The next four methods are all called by the write method when the name
    # says. You may override them to output headers or sums, record
    # state, etc. The default implementations are all noops.

    def start_spreadsheet(self) -> None:
        pass

    def start_section(self, key: ST) -> None:
        pass

    def end_section(self, key: ST) -> None:
        pass

    def end_spreadsheet(self) -> None:
        pass

    def write(self, rows: Iterable[RT]) -> None:
        """Write all the rows, calling the spreadsheet and section hooks

        A section change is detected by comparing each row's section_key to
        the previous one. end_section is never called with a None key.
        """
        prev_section: Optional[ST] = None
        # Start with no section so the check after the loop also works when
        # ``rows`` is empty.
        section: Optional[ST] = None
        self.start_spreadsheet()
        for row in rows:
            section = self.section_key(row)
            if section != prev_section:
                if prev_section is not None:
                    self.end_section(prev_section)
                self.start_section(section)
                prev_section = section
            self.write_row(row)
        if section is not None:
            self.end_section(section)
        self.end_spreadsheet()
|
|
|
|
|
|
class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write OpenDocument spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to an OpenDocument spreadsheet. It provides helper methods for
    building sheets, rows, and cells.

    See also the BaseSpreadsheet base class for additional documentation about
    methods you must and can define, the definition of RT and ST, etc.
    """
    def __init__(self, rt_wrapper: Optional[rtutil.RT]=None) -> None:
        """Create an empty spreadsheet document with one sheet named "Report"

        ``rt_wrapper``, if given, is used by meta_links_cell to turn links
        it can parse into RT URLs.
        """
        self.rt_wrapper = rt_wrapper
        self.locale = babel.core.Locale.default('LC_MONETARY')
        self.currency_fmt_key = 'accounting'
        # Source of numeric suffixes for generated element names.
        self._name_counter = itertools.count(1)
        self._currency_style_cache: MutableMapping[str, odf.style.Style] = {}
        self.document = odf.opendocument.OpenDocumentSpreadsheet()
        self.init_settings()
        self.init_styles()
        self.sheet = self.use_sheet("Report")

    ### Low-level document tree manipulation
    # The *intent* is that you only need to use these if you're adding new
    # methods to manipulate document settings or styles.

    def copy_element(self, elem: odf.element.Element) -> odf.element.Element:
        """Return a new element with the same qname and attributes as ``elem``

        Child nodes are not copied. If the copy has a ``name`` attribute, the
        next value of the name counter is appended to it.
        """
        qattrs = dict(self.iter_qattributes(elem))
        retval = odf.element.Element(qname=elem.qname, qattributes=qattrs)
        try:
            orig_name = retval.getAttribute('name')
        except ValueError:
            orig_name = None
        if orig_name is not None:
            retval.setAttribute('name', f'{orig_name}{next(self._name_counter)}')
        return retval

    def ensure_child(self,
                     parent: odf.element.Element,
                     child_type: ElementType,
                     **kwargs: Any,
    ) -> odf.element.Element:
        """Return the child of ``parent`` matching ``child_type(**kwargs)``

        If find_child finds no matching child, the newly built element is
        added to ``parent`` and returned instead.
        """
        new_child = child_type(**kwargs)
        found_child = self.find_child(parent, new_child)
        if found_child is None:
            parent.addElement(new_child)
            return parent.lastChild
        else:
            return found_child

    def ensure_config_map_entry(self,
                                root: odf.element.Element,
                                map_name: str,
                                entry_name: str,
    ) -> odf.element.Element:
        """Return a ``ConfigItemMapEntry`` under ``root``

        This method ensures there's a ``ConfigItemMapNamed`` named ``map_name``
        under ``root``, and a ``ConfigItemMapEntry`` named ``entry_name`` under
        that. Return the ``ConfigItemMapEntry`` element.
        """
        config_map = self.ensure_child(root, odf.config.ConfigItemMapNamed, name=map_name)
        return self.ensure_child(config_map, odf.config.ConfigItemMapEntry, name=entry_name)

    def find_child(self,
                   parent: odf.element.Element,
                   child: odf.element.Element,
    ) -> Optional[odf.element.Element]:
        """Return the first child of ``parent`` that looks like ``child``

        A match has the same qname as ``child`` and the same value for every
        attribute set on ``child``. Returns None if nothing matches, or if
        ``child`` has no attributes to match on.
        """
        attrs = dict(self.iter_attributes(child))
        if not attrs:
            return None
        for elem in parent.childNodes:
            if (elem.qname == child.qname
                and all(elem.getAttribute(k) == v for k, v in attrs.items())):
                return elem
        return None

    def iter_attributes(self, elem: odf.element.Element) -> Iterator[Tuple[str, str]]:
        """Yield ``(name, value)`` for each attribute of ``elem``

        Names are the local attribute names, lowercased with hyphens removed.
        This is the form other methods here pass to getAttribute and
        setAttribute.
        """
        for (_, key), value in self.iter_qattributes(elem):
            yield key.lower().replace('-', ''), value

    def iter_qattributes(self, elem: odf.element.Element) -> Iterator[Tuple[Tuple[str, str], str]]:
        """Yield the items of ``elem.attributes``; nothing if it is empty or None"""
        if elem.attributes:
            yield from elem.attributes.items()

    def replace_child(self,
                      parent: odf.element.Element,
                      child_type: ElementType,
                      **kwargs: Any,
    ) -> odf.element.Element:
        """Put a fresh ``child_type(**kwargs)`` under ``parent`` and return it

        If find_child finds a matching child, the new element is inserted
        before it and the old one is removed.
        """
        new_child = child_type(**kwargs)
        found_child = self.find_child(parent, new_child)
        parent.insertBefore(new_child, found_child)
        if found_child is not None:
            parent.removeChild(found_child)
        return new_child

    def set_config(self,
                   root: odf.element.Element,
                   name: str,
                   value: Union[bool, int, str],
                   config_type: Optional[str]=None,
    ) -> None:
        """Ensure ``root`` has a ``ConfigItem`` with the given name, type, and value

        If ``config_type`` is None, it is inferred for bool ('boolean') and
        str ('string') values. For any other value type you must pass
        ``config_type``, or this method raises ValueError.
        """
        value_s = str(value)
        default_type: Optional[str] = None
        # bool must be tested before anything numeric: it's a subclass of int.
        if isinstance(value, bool):
            value_s = str(value).lower()
            default_type = 'boolean'
        elif isinstance(value, str):
            default_type = 'string'
        if config_type is None:
            if default_type is None:
                raise ValueError(
                    f"need config_type for {type(value).__name__} value",
                )
            config_type = default_type
        item = self.replace_child(
            root, odf.config.ConfigItem, name=name, type=config_type,
        )
        item.addText(value_s)

    ### Styles

    def _build_currency_style(
            self,
            root: odf.element.Element,
            locale: babel.core.Locale,
            code: str,
            fmt_index: int,
            properties: Optional[odf.style.TextProperties]=None,
            *,
            fmt_key: Optional[str]=None,
            volatile: bool=False,
            minintegerdigits: int=1,
    ) -> odf.element.Element:
        """Build one ``CurrencyStyle`` under ``root`` from a locale pattern

        The locale's currency format named ``fmt_key`` (default
        ``self.currency_fmt_key``) is split on ``;`` and ``fmt_index``
        selects which subpattern to translate. If there is no subpattern at
        that index, the first one is used. ``properties``, if given, is added
        to the style as-is.
        """
        if fmt_key is None:
            fmt_key = self.currency_fmt_key
        pattern = locale.currency_formats[fmt_key]
        fmts = pattern.pattern.split(';')
        try:
            fmt = fmts[fmt_index]
        except IndexError:
            fmt = fmts[0]
            grouping = pattern.grouping[0]
        else:
            grouping = pattern.grouping[fmt_index]
        # Format a zero to see how many decimal places Babel uses for this
        # currency. Split on the locale's own decimal symbol: it is not '.'
        # everywhere, and a currency with no decimals has no symbol at all.
        zero_s = babel.numbers.format_currency(0, code, '##0.0', locale)
        decimal_symbol = babel.numbers.get_decimal_symbol(locale)
        _, found_symbol, fraction = zero_s.rpartition(decimal_symbol)
        decimalplaces = len(fraction) if found_symbol else 0
        style = self.replace_child(
            root,
            odf.number.CurrencyStyle,
            name=f'{code}{next(self._name_counter)}',
        )
        style.setAttribute('volatile', 'true' if volatile else 'false')
        if properties is not None:
            style.addElement(properties)
        # Split the pattern into currency placeholders, number placeholders,
        # quoted literals, and whatever text lies between them.
        for part in re.split(r"(¤+|[#0,.]+|'[^']+')", fmt):
            if not part:
                pass
            elif not part.strip('#0,.'):
                style.addElement(odf.number.Number(
                    decimalplaces=str(decimalplaces),
                    grouping='true' if grouping else 'false',
                    minintegerdigits=str(minintegerdigits),
                ))
            elif part == '¤':
                style.addElement(odf.number.CurrencySymbol(
                    country=locale.territory,
                    language=locale.language,
                    text=babel.numbers.get_currency_symbol(code, locale),
                ))
            elif part == '¤¤':
                style.addElement(odf.number.Text(text=code))
            else:
                style.addElement(odf.number.Text(text=part.strip("'")))
        return style

    def currency_style(
            self,
            code: str,
            locale: Optional[babel.core.Locale]=None,
            negative_properties: Optional[odf.style.TextProperties]=None,
            positive_properties: Optional[odf.style.TextProperties]=None,
            root: Optional[odf.element.Element]=None,
    ) -> odf.style.Style:
        """Create and return a spreadsheet style to format currency data

        Given a currency code and a locale, this method will create all the
        styles necessary to format the currency according to the locale's
        rules, including rendering of decimal points and negative values.
        ``locale`` defaults to ``self.locale``, and ``root`` (where the
        number styles are added) defaults to ``self.document.styles``.

        You may optionally pass in TextProperties to use for negative and
        positive amounts, respectively. If you don't, negative values will
        automatically be rendered in red (text color #ff0000).

        Results are cached. If you repeatedly call this method with the same
        arguments, you'll keep getting the same style returned, which will
        only be added to the document once.
        """
        if locale is None:
            locale = self.locale
        if negative_properties is None:
            negative_properties = odf.style.TextProperties(color='#ff0000')
        if root is None:
            root = self.document.styles
        cache_parts = [str(id(root)), code, str(locale)]
        for key, value in self.iter_attributes(negative_properties):
            cache_parts.append(f'{key}={value}')
        if positive_properties is not None:
            # The empty part separates negative from positive properties.
            cache_parts.append('')
            for key, value in self.iter_attributes(positive_properties):
                cache_parts.append(f'{key}={value}')
        cache_key = '\0'.join(cache_parts)
        try:
            style = self._currency_style_cache[cache_key]
        except KeyError:
            pos_style = self._build_currency_style(
                root, locale, code, 0, positive_properties, volatile=True,
            )
            curr_style = self._build_currency_style(
                root, locale, code, 1, negative_properties,
            )
            # The second style is the one cells refer to. It hands values
            # that are zero or more over to the first style.
            curr_style.addElement(odf.style.Map(
                condition='value()>=0', applystylename=pos_style,
            ))
            style = self.ensure_child(
                self.document.styles,
                odf.style.Style,
                name=f'{curr_style.getAttribute("name")}Cell',
                family='table-cell',
                datastylename=curr_style,
            )
            self._currency_style_cache[cache_key] = style
        return style

    def _merge_style_iter_names(
            self,
            styles: Sequence[Union[str, odf.style.Style, None]],
    ) -> Iterator[str]:
        """Yield the names of the styles to merge

        None arguments are skipped. A name that starts with ``Merge_`` is
        expanded to the ``_``-separated names that follow the prefix.
        """
        for source in styles:
            if source is None:
                continue
            elif not isinstance(source, str):
                source = source.getAttribute('name')
            if source.startswith('Merge_'):
                # NOTE(review): this split assumes the merged style names
                # contain no underscores themselves.
                orig_names = iter(source.split('_'))
                next(orig_names)
                yield from orig_names
            else:
                yield source

    def _merge_styles(self,
                      new_style: odf.style.Style,
                      sources: Iterable[odf.style.Style],
    ) -> None:
        """Copy the attributes and children of ``sources`` into ``new_style``

        The ``name`` and ``displayname`` attributes are never copied. Raises
        ValueError if a source has an attribute that ``new_style`` already
        has with a different value.
        """
        for elem in sources:
            for key, new_value in self.iter_attributes(elem):
                old_value = new_style.getAttribute(key)
                if (key == 'name'
                    or key == 'displayname'
                    or old_value == new_value):
                    pass
                elif old_value is None:
                    new_style.setAttribute(key, new_value)
                else:
                    raise ValueError(f"cannot merge styles with conflicting {key}")
            for child in elem.childNodes:
                new_style.addElement(self.copy_element(child))

    def merge_styles(self,
                     *styles: Union[str, odf.style.Style, None],
    ) -> Optional[odf.style.Style]:
        """Create a new style from multiple existing styles

        Given any number of existing styles, create a new style that combines
        all of those styles' attributes and properties, add it to the document
        styles, and return it.

        Styles can be specified by name, or by passing in their Style element.
        For convenience, you can also pass in None as an argument; None will
        simply be skipped.

        Results are cached. If you repeatedly call this method with the same
        arguments, you'll keep getting the same style returned, which will
        only be added to the document once.

        If you pass in zero real style arguments, returns None.
        If you pass in one style argument, returns that style unchanged.
        If you pass in a style that doesn't already exist in the document,
        or if you pass in styles that can't be merged (because they have
        conflicting attributes), raises ValueError.
        """
        name_map: Dict[str, odf.style.Style] = {}
        for name in self._merge_style_iter_names(styles):
            source = odf.style.Style(name=name)
            found = self.find_child(self.document.styles, source)
            if found is None:
                raise ValueError(f"no style named {name!r}")
            name_map[name] = found
        if not name_map:
            retval = None
        elif len(name_map) == 1:
            _, retval = name_map.popitem()
        else:
            # Sorting makes the merged name independent of argument order.
            new_name = f'Merge_{"_".join(sorted(name_map))}'
            retval = self.ensure_child(
                self.document.styles, odf.style.Style, name=new_name,
            )
            # A style that already has children was merged by an earlier call.
            if retval.firstChild is None:
                self._merge_styles(retval, name_map.values())
        return retval

    ### Sheets

    def lock_first_row(self, sheet: Optional[odf.table.Table]=None) -> None:
        """Lock the first row of cells under the given sheet

        This method sets all the appropriate settings to "lock" the first row
        of cells in a sheet, so it stays in view even as the viewer scrolls
        through rows. If a sheet is not given, works on ``self.sheet``.
        """
        if sheet is None:
            sheet = self.sheet
        config_map = self.ensure_config_map_entry(
            self.view, 'Tables', sheet.getAttribute('name'),
        )
        self.set_config(config_map, 'PositionBottom', 1, 'int')
        self.set_config(config_map, 'VerticalSplitMode', 2, 'short')
        self.set_config(config_map, 'VerticalSplitPosition', 1, 'short')

    def use_sheet(self, name: str) -> odf.table.Table:
        """Switch the active sheet ``self.sheet`` to the one with the given name

        If there is no sheet with the given name, create it and append it to
        the spreadsheet first.

        If the current active sheet is empty when this method is called, it
        will be removed from the spreadsheet.
        """
        try:
            empty_sheet = not self.sheet.hasChildNodes()
        except AttributeError:
            # self.sheet isn't set yet on the first call from __init__.
            empty_sheet = False
        if empty_sheet:
            self.document.spreadsheet.removeChild(self.sheet)
        self.sheet = self.ensure_child(
            self.document.spreadsheet, odf.table.Table, name=name,
        )
        return self.sheet

    ### Initialization hooks

    def init_settings(self) -> None:
        """Hook called to initialize settings

        This method is called by __init__ to populate
        ``self.document.settings``. This implementation creates the barest
        skeleton structure necessary to support other methods, in particular
        ``lock_first_row``.
        """
        view_settings = self.ensure_child(
            self.document.settings, odf.config.ConfigItemSet, name='ooo:view-settings',
        )
        views = self.ensure_child(
            view_settings, odf.config.ConfigItemMapIndexed, name='Views',
        )
        self.view = self.ensure_child(views, odf.config.ConfigItemMapEntry)
        self.set_config(self.view, 'ViewId', 'view1')

    def init_styles(self) -> None:
        """Hook called to initialize styles

        This method is called by __init__ to populate
        ``self.document.styles``. This implementation creates basic building
        block cell styles often used in financial reports. It also adds
        column width styles to ``self.document.automaticstyles``. Each style
        is saved as a ``style_*`` attribute on the instance.
        """
        styles = self.document.styles
        self.style_bold = self.ensure_child(
            styles, odf.style.Style, name='Bold', family='table-cell',
        )
        self.ensure_child(
            self.style_bold, odf.style.TextProperties, fontweight='bold',
        )
        self.style_dividerline = self.ensure_child(
            styles, odf.style.Style, name='DividerLine', family='table-cell',
        )
        self.ensure_child(
            self.style_dividerline,
            odf.style.TableCellProperties,
            borderbottom='1pt solid #0000ff',
        )

        date_style = self.replace_child(styles, odf.number.DateStyle, name='ISODate')
        date_style.addElement(odf.number.Year(style='long'))
        date_style.addElement(odf.number.Text(text='-'))
        date_style.addElement(odf.number.Month(style='long'))
        date_style.addElement(odf.number.Text(text='-'))
        date_style.addElement(odf.number.Day(style='long'))
        self.style_date = self.ensure_child(
            styles,
            odf.style.Style,
            name=f'{date_style.getAttribute("name")}Cell',
            family='table-cell',
            datastylename=date_style,
        )

        # The attributes declared here are assigned with setattr in the loop
        # below. The bare annotations tell the type checker they exist.
        self.style_starttext: odf.style.Style
        self.style_centertext: odf.style.Style
        self.style_endtext: odf.style.Style
        for textalign in ['start', 'center', 'end']:
            aligned_style = self.replace_child(
                styles, odf.style.Style, name=f'{textalign.title()}Text',
            )
            aligned_style.setAttribute('family', 'table-cell')
            aligned_style.addElement(odf.style.ParagraphProperties(textalign=textalign))
            setattr(self, f'style_{textalign}text', aligned_style)

        self.style_col1: odf.style.Style
        self.style_col1_25: odf.style.Style
        self.style_col1_5: odf.style.Style
        self.style_col1_75: odf.style.Style
        self.style_col2: odf.style.Style
        for width in ['1', '1.25', '1.5', '1.75', '2']:
            width_name = width.replace('.', '_')
            column_style = self.replace_child(
                self.document.automaticstyles, odf.style.Style, name=f'col_{width_name}',
            )
            column_style.setAttribute('family', 'table-column')
            column_style.addElement(odf.style.TableColumnProperties(columnwidth=f'{width}in'))
            setattr(self, f'style_col{width_name}', column_style)

    ### Rows and cells

    def add_row(self, *cells: odf.table.TableCell, **attrs: Any) -> odf.table.TableRow:
        """Append a row with the given cells to the active sheet and return it

        Keyword arguments become attributes of the ``TableRow``.
        """
        row = odf.table.TableRow(**attrs)
        for cell in cells:
            row.addElement(cell)
        self.sheet.addElement(row)
        return row

    def balance_cell(self, balance: Balance, **attrs: Any) -> odf.table.TableCell:
        """Return a cell that shows ``balance``

        A zero balance becomes a float cell with value 0. A balance with one
        currency becomes a currency cell, with that currency's style merged
        into any ``stylename`` you pass. Any other balance becomes a
        multiline text cell with one formatted amount per line, merged with
        the end-aligned text style.
        """
        if balance.is_zero():
            return self.float_cell(0, **attrs)
        elif len(balance) == 1:
            amount = next(iter(balance.values()))
            attrs['stylename'] = self.merge_styles(
                attrs.get('stylename'), self.currency_style(amount.currency),
            )
            return self.currency_cell(amount, **attrs)
        else:
            lines = [babel.numbers.format_currency(
                number, currency, locale=self.locale, format_type=self.currency_fmt_key,
            ) for number, currency in balance.values()]
            attrs['stylename'] = self.merge_styles(
                attrs.get('stylename'), self.style_endtext,
            )
            return self.multiline_cell(lines, **attrs)

    def currency_cell(self, amount: data.Amount, **attrs: Any) -> odf.table.TableCell:
        """Return a currency-typed cell for ``amount``

        The cell's text is the amount formatted for ``self.locale``. This
        method does not set a style; pass ``stylename`` if you want one.
        """
        number, currency = amount
        cell = odf.table.TableCell(valuetype='currency', value=number, **attrs)
        cell.addElement(odf.text.P(text=babel.numbers.format_currency(
            number, currency, locale=self.locale, format_type=self.currency_fmt_key,
        )))
        return cell

    def date_cell(self, date: datetime.date, **attrs: Any) -> odf.table.TableCell:
        """Return a date-typed cell for ``date``

        The ISO date style is used unless you pass ``stylename``.
        """
        attrs.setdefault('stylename', self.style_date)
        cell = odf.table.TableCell(valuetype='date', datevalue=date, **attrs)
        cell.addElement(odf.text.P(text=date.isoformat()))
        return cell

    def float_cell(self, value: Union[int, float, Decimal], **attrs: Any) -> odf.table.TableCell:
        """Return a float-typed cell for ``value``, with ``str(value)`` as its text"""
        cell = odf.table.TableCell(valuetype='float', value=value, **attrs)
        cell.addElement(odf.text.P(text=str(value)))
        return cell

    def _meta_link_pairs(self, links: Iterable[Optional[str]]) -> Iterator[Tuple[str, str]]:
        """Yield ``(href, text)`` for each link that isn't None

        When ``self.rt_wrapper`` can parse a link and build a URL for it, the
        href is that URL. Otherwise the link is treated as a file path.
        """
        for href in links:
            if href is None:
                continue
            elif self.rt_wrapper is not None:
                rt_ids = self.rt_wrapper.parse(href)
                rt_href = rt_ids and self.rt_wrapper.url(*rt_ids)
            else:
                rt_ids = None
                rt_href = None
            if rt_ids is None or rt_href is None:
                # '..' pops the ODS filename off the link path. In other words,
                # make the link relative to the directory the ODS is in.
                href_path = Path('..', href)
                href = str(href_path)
                text = href_path.name
            else:
                rt_path = urlparse.urlparse(rt_href).path
                if rt_path.endswith('/Ticket/Display.html'):
                    text = rtutil.RT.unparse(*rt_ids)
                else:
                    # Any other URL is labeled with the last component of
                    # its path.
                    text = urlparse.unquote(Path(rt_path).name)
                href = rt_href
            yield (href, text)

    def meta_links_cell(self, links: Iterable[Optional[str]], **attrs: Any) -> odf.table.TableCell:
        """Return a multilink cell for metadata links; None links are skipped"""
        return self.multilink_cell(self._meta_link_pairs(links), **attrs)

    def multiline_cell(self, lines: Iterable[Any], **attrs: Any) -> odf.table.TableCell:
        """Return a string cell with one paragraph of ``str(line)`` per line"""
        cell = odf.table.TableCell(valuetype='string', **attrs)
        for line in lines:
            cell.addElement(odf.text.P(text=str(line)))
        return cell

    def multilink_cell(self, links: Iterable[LinkType], **attrs: Any) -> odf.table.TableCell:
        """Return a string cell with one hyperlink paragraph per link

        Each link is either an href string or an ``(href, text)`` tuple. When
        there is no text, or the text is falsy, the href is displayed.
        """
        cell = odf.table.TableCell(valuetype='string', **attrs)
        for link in links:
            if isinstance(link, tuple):
                href, text = link
            else:
                href = link
                text = None
            cell.addElement(odf.text.P())
            cell.lastChild.addElement(odf.text.A(
                type='simple', href=href, text=text or href,
            ))
        return cell

    def string_cell(self, text: str, **attrs: Any) -> odf.table.TableCell:
        """Return a string cell with ``text`` in a single paragraph"""
        cell = odf.table.TableCell(valuetype='string', **attrs)
        cell.addElement(odf.text.P(text=text))
        return cell

    def write_row(self, row: RT) -> None:
        """Write a single row of input data to the spreadsheet

        This default implementation adds a single row to the spreadsheet,
        with one cell per element of the row. The type of each element
        determines what kind of cell is created: int, float, and Decimal
        elements become float cells, and everything else is passed to
        string_cell.

        This implementation will help get you started, but you'll probably
        want to override it to specify styles.
        """
        out_row = odf.table.TableRow()
        for cell_source in row:
            if isinstance(cell_source, (int, float, Decimal)):
                cell = self.float_cell(cell_source)
            else:
                cell = self.string_cell(cell_source)
            out_row.addElement(cell)
        self.sheet.addElement(out_row)

    def save_file(self, out_file: BinaryIO) -> None:
        """Write the document to an open binary file"""
        self.document.write(out_file)

    def save_path(self, path: Path, mode: str='w') -> None:
        """Write the document to ``path``

        ``mode`` is the file mode without the binary flag, which this method
        adds for you.
        """
        with path.open(f'{mode}b') as out_file:
            out_file = cast(BinaryIO, out_file)
            self.save_file(out_file)
|
|
|
|
|
|
def normalize_amount_func(account_name: str) -> Callable[[T], T]:
    """Get a function to normalize amounts for reporting

    Given an account name, return a function that can be used on "amounts"
    under that account (including numbers, Amount objects, and Balance objects)
    to normalize them for reporting. Right now that means flipping the sign
    for accounts where "normal" postings are negative.

    Raises ValueError if the account name doesn't start with one of the
    recognized top-level accounts.
    """
    keeps_sign = account_name.startswith(('Assets:', 'Expenses:'))
    if keeps_sign:
        # operator.pos would be the obvious choice, but Beancount's Amount
        # class doesn't implement __pos__, so hand back an identity function.
        return lambda value: value
    flips_sign = account_name.startswith(('Equity:', 'Income:', 'Liabilities:'))
    if flips_sign:
        return operator.neg
    raise ValueError(f"unrecognized account name {account_name!r}")
|