query: Improve formatting of ODS output.
* Provide dedicated formatting for more Beancount types. * Improve code to determine when we're looking up link metadata and should format output as links.
This commit is contained in:
parent
8af45e5f8a
commit
6fa1278966
3 changed files with 193 additions and 75 deletions
|
@ -1368,7 +1368,7 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
|
||||||
)
|
)
|
||||||
return self.multiline_cell(lines, **attrs)
|
return self.multiline_cell(lines, **attrs)
|
||||||
|
|
||||||
def currency_cell(self, amount: data.Amount, **attrs: Any) -> odf.table.TableCell:
|
def currency_cell(self, amount: bc_amount._Amount, **attrs: Any) -> odf.table.TableCell:
|
||||||
if 'stylename' not in attrs:
|
if 'stylename' not in attrs:
|
||||||
attrs['stylename'] = self.currency_style(amount.currency)
|
attrs['stylename'] = self.currency_style(amount.currency)
|
||||||
number, currency = amount
|
number, currency = amount
|
||||||
|
|
|
@ -33,6 +33,7 @@ from typing import (
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
from ..beancount_types import (
|
from ..beancount_types import (
|
||||||
|
MetaKey,
|
||||||
MetaValue,
|
MetaValue,
|
||||||
Posting,
|
Posting,
|
||||||
Transaction,
|
Transaction,
|
||||||
|
@ -41,6 +42,8 @@ from ..beancount_types import (
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from beancount.core.amount import _Amount as BeancountAmount
|
from beancount.core.amount import _Amount as BeancountAmount
|
||||||
|
from beancount.core.inventory import Inventory
|
||||||
|
from beancount.core.position import _Position as Position
|
||||||
|
|
||||||
import beancount.query.numberify as bc_query_numberify
|
import beancount.query.numberify as bc_query_numberify
|
||||||
import beancount.query.query_compile as bc_query_compile
|
import beancount.query.query_compile as bc_query_compile
|
||||||
|
@ -59,10 +62,6 @@ from .. import config as configmod
|
||||||
from .. import data
|
from .. import data
|
||||||
from .. import rtutil
|
from .. import rtutil
|
||||||
|
|
||||||
BUILTIN_FIELDS: AbstractSet[str] = frozenset(itertools.chain(
|
|
||||||
bc_query_env.TargetsEnvironment.columns, # type:ignore[has-type]
|
|
||||||
bc_query_env.TargetsEnvironment.functions, # type:ignore[has-type]
|
|
||||||
))
|
|
||||||
PROGNAME = 'query-report'
|
PROGNAME = 'query-report'
|
||||||
logger = logging.getLogger('conservancy_beancount.reports.query')
|
logger = logging.getLogger('conservancy_beancount.reports.query')
|
||||||
|
|
||||||
|
@ -80,6 +79,12 @@ EnvironmentFunctions = Dict[
|
||||||
RowTypes = Sequence[Tuple[str, Type]]
|
RowTypes = Sequence[Tuple[str, Type]]
|
||||||
Rows = Sequence[NamedTuple]
|
Rows = Sequence[NamedTuple]
|
||||||
Store = List[Any]
|
Store = List[Any]
|
||||||
|
QueryExpression = Union[
|
||||||
|
bc_query_parser.Column,
|
||||||
|
bc_query_parser.Constant,
|
||||||
|
bc_query_parser.Function,
|
||||||
|
bc_query_parser.UnaryOp,
|
||||||
|
]
|
||||||
QueryStatement = Union[
|
QueryStatement = Union[
|
||||||
bc_query_parser.Balances,
|
bc_query_parser.Balances,
|
||||||
bc_query_parser.Journal,
|
bc_query_parser.Journal,
|
||||||
|
@ -124,14 +129,45 @@ class BooksLoader:
|
||||||
|
|
||||||
|
|
||||||
class QueryODS(core.BaseODS[NamedTuple, None]):
|
class QueryODS(core.BaseODS[NamedTuple, None]):
|
||||||
|
META_FNAMES = frozenset([
|
||||||
|
'any_meta',
|
||||||
|
'entry_meta',
|
||||||
|
'meta',
|
||||||
|
'meta_docs',
|
||||||
|
'str_meta',
|
||||||
|
])
|
||||||
|
|
||||||
def is_empty(self) -> bool:
|
def is_empty(self) -> bool:
|
||||||
return not self.sheet.childNodes
|
return not self.sheet.childNodes
|
||||||
|
|
||||||
def section_key(self, row: NamedTuple) -> None:
|
def section_key(self, row: NamedTuple) -> None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _generic_cell(self, value: Any) -> odf.table.TableCell:
|
||||||
|
if isinstance(value, Iterable) and not isinstance(value, (str, tuple)):
|
||||||
|
return self.multiline_cell(value)
|
||||||
|
else:
|
||||||
|
return self.string_cell('' if value is None else str(value))
|
||||||
|
|
||||||
|
def _inventory_cell(self, value: Inventory) -> odf.table.TableCell:
|
||||||
|
return self.balance_cell(core.Balance(pos.units for pos in value))
|
||||||
|
|
||||||
|
def _link_string_cell(self, value: str) -> odf.table.TableCell:
|
||||||
|
return self.meta_links_cell(value.split())
|
||||||
|
|
||||||
|
def _metadata_cell(self, value: MetaValue) -> odf.table.TableCell:
|
||||||
|
return self._cell_type(type(value))(value)
|
||||||
|
|
||||||
|
def _position_cell(self, value: Position) -> odf.table.TableCell:
|
||||||
|
return self.currency_cell(value.units)
|
||||||
|
|
||||||
def _cell_type(self, row_type: Type) -> CellFunc:
|
def _cell_type(self, row_type: Type) -> CellFunc:
|
||||||
if issubclass(row_type, BeancountAmount):
|
"""Return a function to create a cell, for non-metadata row types."""
|
||||||
|
if issubclass(row_type, Inventory):
|
||||||
|
return self._inventory_cell
|
||||||
|
elif issubclass(row_type, Position):
|
||||||
|
return self._position_cell
|
||||||
|
elif issubclass(row_type, BeancountAmount):
|
||||||
return self.currency_cell
|
return self.currency_cell
|
||||||
elif issubclass(row_type, (int, float, Decimal)):
|
elif issubclass(row_type, (int, float, Decimal)):
|
||||||
return self.float_cell
|
return self.float_cell
|
||||||
|
@ -142,49 +178,85 @@ class QueryODS(core.BaseODS[NamedTuple, None]):
|
||||||
else:
|
else:
|
||||||
return self._generic_cell
|
return self._generic_cell
|
||||||
|
|
||||||
def _generic_cell(self, value: Any) -> odf.table.TableCell:
|
def _link_cell_type(self, row_type: Type) -> CellFunc:
|
||||||
return self.string_cell('' if value is None else str(value))
|
"""Return a function to create a cell from metadata with documentation links."""
|
||||||
|
if issubclass(row_type, str):
|
||||||
def _link_cell(self, value: MetaValue) -> odf.table.TableCell:
|
return self._link_string_cell
|
||||||
if isinstance(value, str):
|
elif issubclass(row_type, tuple):
|
||||||
return self.meta_links_cell(value.split())
|
return self._generic_cell
|
||||||
|
elif issubclass(row_type, Iterable):
|
||||||
|
return self.meta_links_cell
|
||||||
else:
|
else:
|
||||||
return self._generic_cell(value)
|
return self._generic_cell
|
||||||
|
|
||||||
def _metadata_cell(self, value: MetaValue) -> odf.table.TableCell:
|
def _meta_target(self, target: QueryExpression) -> Optional[MetaKey]:
|
||||||
return self._cell_type(type(value))(value)
|
"""Return the metadata key looked up by this target, if any
|
||||||
|
|
||||||
def _cell_types(self, row_types: RowTypes) -> Iterator[CellFunc]:
|
This function takes a parsed target (i.e., what we're SELECTing) and
|
||||||
for name, row_type in row_types:
|
recurses it to see whether it's looking up any metadata. If so, it
|
||||||
if row_type is object:
|
returns the key of that metadata. Otherwise it returns None.
|
||||||
if name.replace('_', '-') in data.LINK_METADATA:
|
"""
|
||||||
yield self._link_cell
|
if isinstance(target, bc_query_parser.UnaryOp):
|
||||||
else:
|
return self._meta_target(target.operand)
|
||||||
yield self._metadata_cell
|
elif not isinstance(target, bc_query_parser.Function):
|
||||||
else:
|
return None
|
||||||
|
try:
|
||||||
|
operand = target.operands[0]
|
||||||
|
except IndexError:
|
||||||
|
return None
|
||||||
|
if (target.fname in self.META_FNAMES
|
||||||
|
and isinstance(operand, bc_query_parser.Constant)):
|
||||||
|
return operand.value # type:ignore[no-any-return]
|
||||||
|
else:
|
||||||
|
for operand in target.operands:
|
||||||
|
retval = self._meta_target(operand)
|
||||||
|
if retval is not None:
|
||||||
|
break
|
||||||
|
return retval
|
||||||
|
|
||||||
|
def _cell_types(self, statement: QueryStatement, row_types: RowTypes) -> Iterator[CellFunc]:
|
||||||
|
"""Return functions to create table cells from result rows
|
||||||
|
|
||||||
|
Given a parsed query and the types of return rows, yields a function
|
||||||
|
to create a cell for each column in the row, in order. The returned
|
||||||
|
functions vary in order to provide the best available formatting for
|
||||||
|
different data types.
|
||||||
|
"""
|
||||||
|
if (isinstance(statement, bc_query_parser.Select)
|
||||||
|
and isinstance(statement.targets, Sequence)):
|
||||||
|
targets = [t.expression for t in statement.targets]
|
||||||
|
else:
|
||||||
|
# Synthesize something that makes clear we're not loading metadata.
|
||||||
|
targets = [bc_query_parser.Column(name) for name, _ in row_types]
|
||||||
|
for target, (_, row_type) in zip(targets, row_types):
|
||||||
|
meta_key = self._meta_target(target)
|
||||||
|
if meta_key is None:
|
||||||
yield self._cell_type(row_type)
|
yield self._cell_type(row_type)
|
||||||
|
elif meta_key in data.LINK_METADATA:
|
||||||
|
yield self._link_cell_type(row_type)
|
||||||
|
else:
|
||||||
|
yield self._metadata_cell
|
||||||
|
|
||||||
def write_query(self, row_types: RowTypes, rows: Rows) -> None:
|
def write_query(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
|
||||||
if self.is_empty():
|
if self.is_empty():
|
||||||
self.sheet.setAttribute('name', "Query 1")
|
self.sheet.setAttribute('name', "Query 1")
|
||||||
else:
|
else:
|
||||||
self.use_sheet(f"Query {len(self.document.spreadsheet.childNodes) + 1}")
|
self.use_sheet(f"Query {len(self.document.spreadsheet.childNodes) + 1}")
|
||||||
for name, row_type in row_types:
|
for name, row_type in row_types:
|
||||||
if row_type is object or issubclass(row_type, str):
|
if issubclass(row_type, datetime.date):
|
||||||
col_width = 2.0
|
col_width = 1.0
|
||||||
elif issubclass(row_type, BeancountAmount):
|
elif issubclass(row_type, (BeancountAmount, Inventory, Position)):
|
||||||
col_width = 1.5
|
col_width = 1.5
|
||||||
else:
|
else:
|
||||||
col_width = 1.0
|
col_width = 2.0
|
||||||
col_style = self.column_style(col_width)
|
col_style = self.column_style(col_width)
|
||||||
self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
|
self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
|
||||||
self.add_row(*(
|
self.add_row(*(
|
||||||
self.string_cell(data.Metadata.human_name(name.replace('_', '-')),
|
self.string_cell(data.Metadata.human_name(name), stylename=self.style_bold)
|
||||||
stylename=self.style_bold)
|
|
||||||
for name, _ in row_types
|
for name, _ in row_types
|
||||||
))
|
))
|
||||||
self.lock_first_row()
|
self.lock_first_row()
|
||||||
cell_funcs = list(self._cell_types(row_types))
|
cell_funcs = list(self._cell_types(statement, row_types))
|
||||||
for row in rows:
|
for row in rows:
|
||||||
self.add_row(*(
|
self.add_row(*(
|
||||||
cell_func(value)
|
cell_func(value)
|
||||||
|
@ -238,7 +310,7 @@ class AggregateSet(bc_query_compile.EvalAggregator):
|
||||||
|
|
||||||
def update(self, store: Store, context: Context) -> None:
|
def update(self, store: Store, context: Context) -> None:
|
||||||
value, = self.eval_args(context)
|
value, = self.eval_args(context)
|
||||||
if isinstance(value, Sequence) and not isinstance(value, str):
|
if isinstance(value, Sequence) and not isinstance(value, (str, tuple)):
|
||||||
store[self.handle].update(value)
|
store[self.handle].update(value)
|
||||||
else:
|
else:
|
||||||
store[self.handle].add(value)
|
store[self.handle].add(value)
|
||||||
|
@ -304,9 +376,9 @@ class BQLShell(bc_query_shell.BQLShell):
|
||||||
print("(empty)", file=self.outfile)
|
print("(empty)", file=self.outfile)
|
||||||
else:
|
else:
|
||||||
logger.debug("rendering query as %s", output_format)
|
logger.debug("rendering query as %s", output_format)
|
||||||
render_func(row_types, rows)
|
render_func(statement, row_types, rows)
|
||||||
|
|
||||||
def _render_csv(self, row_types: RowTypes, rows: Rows) -> None:
|
def _render_csv(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
|
||||||
bc_query_render.render_csv(
|
bc_query_render.render_csv(
|
||||||
row_types,
|
row_types,
|
||||||
rows,
|
rows,
|
||||||
|
@ -315,11 +387,15 @@ class BQLShell(bc_query_shell.BQLShell):
|
||||||
self.vars['expand'],
|
self.vars['expand'],
|
||||||
)
|
)
|
||||||
|
|
||||||
def _render_ods(self, row_types: RowTypes, rows: Rows) -> None:
|
def _render_ods(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
|
||||||
self.ods.write_query(row_types, rows)
|
self.ods.write_query(statement, row_types, rows)
|
||||||
logger.info("results saved in sheet %s", self.ods.sheet.getAttribute('name'))
|
logger.info(
|
||||||
|
"%s rows of results saved in sheet %s",
|
||||||
|
len(rows),
|
||||||
|
self.ods.sheet.getAttribute('name'),
|
||||||
|
)
|
||||||
|
|
||||||
def _render_text(self, row_types: RowTypes, rows: Rows) -> None:
|
def _render_text(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
|
||||||
with contextlib.ExitStack() as stack:
|
with contextlib.ExitStack() as stack:
|
||||||
if self.is_interactive:
|
if self.is_interactive:
|
||||||
output = stack.enter_context(self.get_pager())
|
output = stack.enter_context(self.get_pager())
|
||||||
|
@ -394,9 +470,7 @@ ODS reports.
|
||||||
help="""Query to run non-interactively. If none is provided, and
|
help="""Query to run non-interactively. If none is provided, and
|
||||||
standard input is not a terminal, reads the query from stdin instead.
|
standard input is not a terminal, reads the query from stdin instead.
|
||||||
""")
|
""")
|
||||||
|
return parser.parse_args(arglist)
|
||||||
args = parser.parse_args(arglist)
|
|
||||||
return args
|
|
||||||
|
|
||||||
def main(arglist: Optional[Sequence[str]]=None,
|
def main(arglist: Optional[Sequence[str]]=None,
|
||||||
stdout: TextIO=sys.stdout,
|
stdout: TextIO=sys.stdout,
|
||||||
|
|
|
@ -21,6 +21,7 @@ import pytest
|
||||||
from . import testutil
|
from . import testutil
|
||||||
|
|
||||||
from beancount.core import data as bc_data
|
from beancount.core import data as bc_data
|
||||||
|
from beancount.query import query_parser as bc_query_parser
|
||||||
from conservancy_beancount.books import FiscalYear
|
from conservancy_beancount.books import FiscalYear
|
||||||
from conservancy_beancount.reports import query as qmod
|
from conservancy_beancount.reports import query as qmod
|
||||||
from conservancy_beancount import rtutil
|
from conservancy_beancount import rtutil
|
||||||
|
@ -38,6 +39,10 @@ class MockRewriteRuleset:
|
||||||
yield post._replace(units=testutil.Amount(number, currency))
|
yield post._replace(units=testutil.Amount(number, currency))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='module')
|
||||||
|
def qparser():
|
||||||
|
return bc_query_parser.Parser()
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
def rt():
|
def rt():
|
||||||
return rtutil.RT(testutil.RTClient())
|
return rtutil.RT(testutil.RTClient())
|
||||||
|
@ -130,40 +135,47 @@ def test_rewrite_query(end_index):
|
||||||
assert expected.issubset(actual)
|
assert expected.issubset(actual)
|
||||||
assert frozenset(accounts).difference(expected).isdisjoint(actual)
|
assert frozenset(accounts).difference(expected).isdisjoint(actual)
|
||||||
|
|
||||||
def test_ods_amount_formatting():
|
def test_ods_amount_formatting(qparser):
|
||||||
|
statement = qparser.parse('SELECT UNITS(position)')
|
||||||
row_types = [('amount', bc_data.Amount)]
|
row_types = [('amount', bc_data.Amount)]
|
||||||
row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
|
row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
|
||||||
ods = qmod.QueryODS()
|
ods = qmod.QueryODS()
|
||||||
ods.write_query(row_types, row_source)
|
ods.write_query(statement, row_types, row_source)
|
||||||
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
||||||
assert next(actual)[0].text == 'Amount'
|
assert next(actual)[0].text == 'Amount'
|
||||||
assert next(actual)[0].text == '$12.00'
|
assert next(actual)[0].text == '$12.00'
|
||||||
assert next(actual)[0].text == '¥1,480'
|
assert next(actual)[0].text == '¥1,480'
|
||||||
assert next(actual, None) is None
|
assert next(actual, None) is None
|
||||||
|
|
||||||
def test_ods_datetime_formatting():
|
def test_ods_datetime_formatting(qparser):
|
||||||
|
statement = qparser.parse('SELECT date')
|
||||||
row_types = [('date', datetime.date)]
|
row_types = [('date', datetime.date)]
|
||||||
row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)]
|
row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)]
|
||||||
ods = qmod.QueryODS()
|
ods = qmod.QueryODS()
|
||||||
ods.write_query(row_types, row_source)
|
ods.write_query(statement, row_types, row_source)
|
||||||
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
||||||
assert next(actual)[0].text == 'Date'
|
assert next(actual)[0].text == 'Date'
|
||||||
assert next(actual)[0].text == testutil.PAST_DATE.isoformat()
|
assert next(actual)[0].text == testutil.PAST_DATE.isoformat()
|
||||||
assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat()
|
assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat()
|
||||||
assert next(actual, None) is None
|
assert next(actual, None) is None
|
||||||
|
|
||||||
@pytest.mark.parametrize('meta_key,header_text', [
|
@pytest.mark.parametrize('meta_key,meta_func', [
|
||||||
('check', 'Check'),
|
('check', 'ANY_META'),
|
||||||
('purchase-order', 'Purchase Order'),
|
('purchase-order', 'META'),
|
||||||
('rt-id', 'Ticket'),
|
('rt-id', 'META_DOCS'),
|
||||||
])
|
])
|
||||||
def test_ods_link_formatting(rt, meta_key, header_text):
|
def test_ods_link_formatting(qparser, rt, meta_key, meta_func):
|
||||||
row_types = [(meta_key.replace('-', '_'), object)]
|
meta_func_returns_list = meta_func == 'META_DOCS'
|
||||||
row_source = [('rt:1/5',), ('rt:3 Checks/9.pdf',)]
|
statement = qparser.parse(f'SELECT {meta_func}({meta_key!r}) AS docs')
|
||||||
|
row_types = [('docs', list if meta_func_returns_list else str)]
|
||||||
|
row_source = [
|
||||||
|
(s.split() if meta_func_returns_list else s,)
|
||||||
|
for s in ['rt:1/5', 'rt:3 Checks/9.pdf']
|
||||||
|
]
|
||||||
ods = qmod.QueryODS(rt)
|
ods = qmod.QueryODS(rt)
|
||||||
ods.write_query(row_types, row_source)
|
ods.write_query(statement, row_types, row_source)
|
||||||
rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
||||||
assert next(rows).text == header_text
|
assert next(rows).text == 'Docs'
|
||||||
actual = iter(
|
actual = iter(
|
||||||
[link.text for link in row.getElementsByType(odf.text.A)]
|
[link.text for link in row.getElementsByType(odf.text.A)]
|
||||||
for row in rows
|
for row in rows
|
||||||
|
@ -172,50 +184,54 @@ def test_ods_link_formatting(rt, meta_key, header_text):
|
||||||
assert next(actual) == ['rt:3', '9.pdf']
|
assert next(actual) == ['rt:3', '9.pdf']
|
||||||
assert next(actual, None) is None
|
assert next(actual, None) is None
|
||||||
|
|
||||||
def test_ods_meta_formatting():
|
def test_ods_meta_formatting(qparser):
|
||||||
row_types = [('metadata', object)]
|
statement = qparser.parse('SELECT ANY_META("entity") AS entity')
|
||||||
|
row_types = [('entity', object)]
|
||||||
row_source = [(testutil.Amount(14),), (None,), ('foo bar',)]
|
row_source = [(testutil.Amount(14),), (None,), ('foo bar',)]
|
||||||
ods = qmod.QueryODS()
|
ods = qmod.QueryODS()
|
||||||
ods.write_query(row_types, row_source)
|
ods.write_query(statement, row_types, row_source)
|
||||||
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
||||||
assert next(actual)[0].text == 'Metadata'
|
assert next(actual)[0].text == 'Entity'
|
||||||
assert next(actual)[0].text == '$14.00'
|
assert next(actual)[0].text == '$14.00'
|
||||||
assert next(actual)[0].text == ''
|
assert next(actual)[0].text == ''
|
||||||
assert next(actual)[0].text == 'foo bar'
|
assert next(actual)[0].text == 'foo bar'
|
||||||
assert next(actual, None) is None
|
assert next(actual, None) is None
|
||||||
|
|
||||||
def test_ods_multicolumn_write(rt):
|
def test_ods_multicolumn_write(qparser, rt):
|
||||||
row_types = [('date', datetime.date), ('rt-id', object), ('desc', str)]
|
statement = qparser.parse(
|
||||||
|
'SELECT MIN(date) AS date, SET(META_DOCS("rt-id")) AS tix, STR_META("entity") AS entity',
|
||||||
|
)
|
||||||
|
row_types = [('date', datetime.date), ('tix', set), ('entity', str)]
|
||||||
row_source = [
|
row_source = [
|
||||||
(testutil.PAST_DATE, 'rt:1', 'aaa'),
|
(testutil.PAST_DATE, {'rt:1'}, 'AA'),
|
||||||
(testutil.FY_START_DATE, 'rt:2', 'bbb'),
|
(testutil.FY_START_DATE, {'rt:2'}, 'BB'),
|
||||||
(testutil.FUTURE_DATE, 'rt:3', 'ccc'),
|
(testutil.FUTURE_DATE, {'rt:3', 'rt:4'}, 'CC'),
|
||||||
]
|
]
|
||||||
ods = qmod.QueryODS(rt)
|
ods = qmod.QueryODS(rt)
|
||||||
ods.write_query(row_types, row_source)
|
ods.write_query(statement, list(row_types), list(row_source))
|
||||||
actual = iter(
|
actual = iter(
|
||||||
cell.text
|
cell.text
|
||||||
for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
||||||
for cell in row
|
for cell in row
|
||||||
)
|
)
|
||||||
assert next(actual) == 'Date'
|
for expected, _ in row_types:
|
||||||
assert next(actual) == 'Ticket'
|
assert next(actual) == expected.title()
|
||||||
assert next(actual) == 'Desc'
|
|
||||||
assert next(actual) == testutil.PAST_DATE.isoformat()
|
assert next(actual) == testutil.PAST_DATE.isoformat()
|
||||||
assert next(actual) == 'rt:1'
|
assert next(actual) == 'rt:1'
|
||||||
assert next(actual) == 'aaa'
|
assert next(actual) == 'AA'
|
||||||
assert next(actual) == testutil.FY_START_DATE.isoformat()
|
assert next(actual) == testutil.FY_START_DATE.isoformat()
|
||||||
assert next(actual) == 'rt:2'
|
assert next(actual) == 'rt:2'
|
||||||
assert next(actual) == 'bbb'
|
assert next(actual) == 'BB'
|
||||||
assert next(actual) == testutil.FUTURE_DATE.isoformat()
|
assert next(actual) == testutil.FUTURE_DATE.isoformat()
|
||||||
assert next(actual) == 'rt:3'
|
assert frozenset(next(actual).split('\0')) == row_source[-1][1]
|
||||||
assert next(actual) == 'ccc'
|
assert next(actual) == 'CC'
|
||||||
assert next(actual, None) is None
|
assert next(actual, None) is None
|
||||||
|
|
||||||
def test_ods_is_empty():
|
def test_ods_is_empty(qparser):
|
||||||
|
statement = qparser.parse('SELECT * WHERE date < 1900-01-01')
|
||||||
ods = qmod.QueryODS()
|
ods = qmod.QueryODS()
|
||||||
assert ods.is_empty()
|
assert ods.is_empty()
|
||||||
ods.write_query([], [])
|
ods.write_query(statement, [], [])
|
||||||
assert not ods.is_empty()
|
assert not ods.is_empty()
|
||||||
|
|
||||||
@pytest.mark.parametrize('fy,account,amt_prefix', [
|
@pytest.mark.parametrize('fy,account,amt_prefix', [
|
||||||
|
@ -232,8 +248,9 @@ def test_ods_output(fy, account, amt_prefix):
|
||||||
]
|
]
|
||||||
returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
|
returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
|
||||||
assert returncode == 0
|
assert returncode == 0
|
||||||
stdout.seek(0)
|
with stdout:
|
||||||
ods_doc = odf.opendocument.load(stdout)
|
stdout.seek(0)
|
||||||
|
ods_doc = odf.opendocument.load(stdout)
|
||||||
rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
||||||
next(rows) # Skip header row
|
next(rows) # Skip header row
|
||||||
amt_pattern = rf'^{re.escape(amt_prefix)}\d'
|
amt_pattern = rf'^{re.escape(amt_prefix)}\d'
|
||||||
|
@ -243,3 +260,30 @@ def test_ods_output(fy, account, amt_prefix):
|
||||||
assert narration.text.startswith(f'{fy} ')
|
assert narration.text.startswith(f'{fy} ')
|
||||||
assert re.match(amt_pattern, amount.text)
|
assert re.match(amt_pattern, amount.text)
|
||||||
assert count
|
assert count
|
||||||
|
|
||||||
|
def test_ods_aggregate_output():
|
||||||
|
books_path = testutil.test_path(f'books/books/2020.beancount')
|
||||||
|
config = testutil.TestConfig(books_path=books_path)
|
||||||
|
arglist = [
|
||||||
|
'-O', '-',
|
||||||
|
'-f', 'ods',
|
||||||
|
'SELECT account, SET(narration), SUM(UNITS(position))',
|
||||||
|
'WHERE date >= 2020-04-01 AND date <= 2020-04-02',
|
||||||
|
'GROUP BY account ORDER BY account ASC',
|
||||||
|
]
|
||||||
|
returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
|
||||||
|
assert returncode == 0
|
||||||
|
with stdout:
|
||||||
|
stdout.seek(0)
|
||||||
|
ods_doc = odf.opendocument.load(stdout)
|
||||||
|
rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
||||||
|
next(rows) # Skip header row
|
||||||
|
actual = {}
|
||||||
|
for row in rows:
|
||||||
|
acct, descs, balance = row.childNodes
|
||||||
|
actual[acct.text] = (frozenset(descs.text.split('\0')), balance.text)
|
||||||
|
in_desc = {'2020 donation'}
|
||||||
|
ex_desc = {'2020 bank maintenance fee'}
|
||||||
|
assert actual['Income:Donations'] == (in_desc, '$20.20')
|
||||||
|
assert actual['Expenses:BankingFees'] == (ex_desc, '$1.00')
|
||||||
|
assert actual['Assets:Checking'] == (in_desc | ex_desc, '($21.20)')
|
||||||
|
|
Loading…
Reference in a new issue