416 lines
16 KiB
Python
416 lines
16 KiB
Python
"""test_reports_query.py - Unit tests for query report"""
|
|
# Copyright © 2021 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import argparse
|
|
import collections
|
|
import copy
|
|
import csv
|
|
import datetime
|
|
import io
|
|
import itertools
|
|
import re
|
|
|
|
import odf.table
|
|
import odf.text
|
|
import pytest
|
|
|
|
from . import testutil
|
|
|
|
from beancount.core import data as bc_data
|
|
from beancount.query import query_compile as bc_query_compile
|
|
from beancount.query import query_execute as bc_query_execute
|
|
from beancount.query import query_parser as bc_query_parser
|
|
from conservancy_beancount.books import FiscalYear
|
|
from conservancy_beancount.reports import query as qmod
|
|
from conservancy_beancount import rtutil
|
|
|
|
from decimal import Decimal
|
|
|
|
UTC = datetime.timezone.utc
|
|
|
|
class MockRewriteRuleset:
|
|
def __init__(self, multiplier=2):
|
|
self.multiplier = multiplier
|
|
|
|
def rewrite(self, posts):
|
|
for post in posts:
|
|
number, currency = post.units
|
|
number *= self.multiplier
|
|
yield post._replace(units=testutil.Amount(number, currency))
|
|
|
|
|
|
class RowContext(bc_query_execute.RowContext):
|
|
def __init__(self, entry, posting=None):
|
|
super().__init__()
|
|
self.entry = entry
|
|
self.posting = posting
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
def qparser():
|
|
return bc_query_parser.Parser()
|
|
|
|
@pytest.fixture(scope='module')
|
|
def rt():
|
|
return rtutil.RT(testutil.RTClient())
|
|
|
|
@pytest.fixture(scope='module')
|
|
def ticket_query():
|
|
return qmod.RTTicket.with_client(testutil.RTClient(), 'testfixture')
|
|
|
|
def const_operands(*args):
|
|
return [bc_query_compile.EvalConstant(v) for v in args]
|
|
|
|
def pipe_main(arglist, config, stdout_type=io.StringIO):
|
|
stdout = stdout_type()
|
|
stderr = io.StringIO()
|
|
returncode = qmod.main(arglist, stdout, stderr, config)
|
|
return returncode, stdout, stderr
|
|
|
|
def test_rt_ticket_unconfigured():
|
|
with pytest.raises(RuntimeError):
|
|
qmod.RTTicket(const_operands('id', 'rt-id'))
|
|
|
|
@pytest.mark.parametrize('field_name', ['foo', 'bar'])
|
|
def test_rt_ticket_bad_field(ticket_query, field_name):
|
|
with pytest.raises(ValueError):
|
|
ticket_query(const_operands(field_name, 'rt-id'))
|
|
|
|
@pytest.mark.parametrize('meta_name', ['foo', 'bar'])
|
|
def test_rt_ticket_bad_metadata(ticket_query, meta_name):
|
|
with pytest.raises(ValueError):
|
|
ticket_query(const_operands('id', meta_name))
|
|
|
|
@pytest.mark.parametrize('field_name,meta_name,expected', [
|
|
('id', 'rt-id', 1),
|
|
('Queue', 'approval', 'general'),
|
|
('Requestors', 'invoice', ['mx1@example.org', 'requestor2@example.org']),
|
|
('Due', 'tax-reporting', datetime.datetime(2017, 1, 14, 12, 1, 0, tzinfo=UTC)),
|
|
])
|
|
def test_rt_ticket_from_txn(ticket_query, field_name, meta_name, expected):
|
|
func = ticket_query(const_operands(field_name, meta_name))
|
|
txn = testutil.Transaction(**{meta_name: 'rt:1'}, postings=[
|
|
('Assets:Cash', 80),
|
|
])
|
|
context = RowContext(txn, txn.postings[0])
|
|
if not isinstance(expected, list):
|
|
expected = [expected]
|
|
assert func(context) == expected
|
|
|
|
@pytest.mark.parametrize('field_name,meta_name,expected', [
|
|
('id', 'rt-id', 2),
|
|
('Queue', 'approval', 'general'),
|
|
('Requestors', 'invoice', ['mx2@example.org', 'requestor2@example.org']),
|
|
('Due', 'tax-reporting', datetime.datetime(2017, 1, 14, 12, 2, 0, tzinfo=UTC)),
|
|
])
|
|
def test_rt_ticket_from_post(ticket_query, field_name, meta_name, expected):
|
|
func = ticket_query(const_operands(field_name, meta_name))
|
|
txn = testutil.Transaction(**{meta_name: 'rt:1'}, postings=[
|
|
('Assets:Cash', 110, {meta_name: 'rt:2/8'}),
|
|
])
|
|
context = RowContext(txn, txn.postings[0])
|
|
if not isinstance(expected, list):
|
|
expected = [expected]
|
|
assert func(context) == expected
|
|
|
|
@pytest.mark.parametrize('field_name,meta_name,expected,on_txn', [
|
|
('id', 'approval', [1, 2], True),
|
|
('Queue', 'check', ['general', 'general'], False),
|
|
('Requestors', 'invoice', [
|
|
'mx1@example.org',
|
|
'mx2@example.org',
|
|
'requestor2@example.org',
|
|
'requestor2@example.org',
|
|
], False),
|
|
])
|
|
def test_rt_ticket_multi_results(ticket_query, field_name, meta_name, expected, on_txn):
|
|
func = ticket_query(const_operands(field_name, meta_name))
|
|
txn = testutil.Transaction(**{'rt-id': 'rt:1'}, postings=[
|
|
('Assets:Cash', 110, {'rt-id': 'rt:2'}),
|
|
])
|
|
post = txn.postings[0]
|
|
meta = txn.meta if on_txn else post.meta
|
|
meta[meta_name] = 'rt:1/2 Docs/12.pdf rt:2/8'
|
|
context = RowContext(txn, post)
|
|
assert sorted(func(context)) == expected
|
|
|
|
@pytest.mark.parametrize('meta_value,on_txn', testutil.combine_values(
|
|
['', 'Docs/34.pdf', 'Docs/100.pdf Docs/120.pdf'],
|
|
[True, False],
|
|
))
|
|
def test_rt_ticket_no_results(ticket_query, meta_value, on_txn):
|
|
func = ticket_query(const_operands('Queue', 'check'))
|
|
txn = testutil.Transaction(**{'rt-id': 'rt:1'}, postings=[
|
|
('Assets:Cash', 110, {'rt-id': 'rt:2'}),
|
|
])
|
|
post = txn.postings[0]
|
|
meta = txn.meta if on_txn else post.meta
|
|
meta['check'] = meta_value
|
|
context = RowContext(txn, post)
|
|
assert func(context) == []
|
|
|
|
def test_rt_ticket_caches_tickets():
|
|
rt_client = testutil.RTClient()
|
|
rt_client.TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()
|
|
ticket_query = qmod.RTTicket.with_client(rt_client, 'cachetestA')
|
|
func = ticket_query(const_operands('id', 'rt-id'))
|
|
txn = testutil.Transaction(postings=[
|
|
('Assets:Cash', 160, {'rt-id': 'rt:3'}),
|
|
])
|
|
context = RowContext(txn, txn.postings[0])
|
|
assert func(context) == [3]
|
|
del rt_client.TICKET_DATA['3']
|
|
assert func(context) == [3]
|
|
|
|
def test_rt_ticket_caches_tickets_not_found():
|
|
rt_client = testutil.RTClient()
|
|
rt_client.TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()
|
|
rt3 = rt_client.TICKET_DATA.pop('3')
|
|
ticket_query = qmod.RTTicket.with_client(rt_client, 'cachetestB')
|
|
func = ticket_query(const_operands('id', 'rt-id'))
|
|
txn = testutil.Transaction(postings=[
|
|
('Assets:Cash', 160, {'rt-id': 'rt:3'}),
|
|
])
|
|
context = RowContext(txn, txn.postings[0])
|
|
assert func(context) == []
|
|
rt_client.TICKET_DATA['3'] = rt3
|
|
assert func(context) == []
|
|
|
|
def test_books_loader_empty():
|
|
result = qmod.BooksLoader(None)()
|
|
assert not result.entries
|
|
assert len(result.errors) == 1
|
|
|
|
def test_books_loader_plain():
|
|
books_path = testutil.test_path(f'books/books/2018.beancount')
|
|
loader = testutil.TestBooksLoader(books_path)
|
|
result = qmod.BooksLoader(loader)()
|
|
assert not result.errors
|
|
assert result.entries
|
|
min_date = datetime.date(2018, 3, 1)
|
|
assert all(ent.date >= min_date for ent in result.entries)
|
|
|
|
def test_books_loader_rewrites():
|
|
rewrites = [MockRewriteRuleset()]
|
|
books_path = testutil.test_path(f'books/books/2018.beancount')
|
|
loader = testutil.TestBooksLoader(books_path)
|
|
result = qmod.BooksLoader(loader, None, None, rewrites)()
|
|
assert not result.errors
|
|
assert result.entries
|
|
numbers = frozenset(
|
|
abs(post.units.number)
|
|
for entry in result.entries
|
|
for post in getattr(entry, 'postings', ())
|
|
)
|
|
assert numbers
|
|
assert all(abs(number) >= 40 for number in numbers)
|
|
|
|
@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
|
|
[['--report-type', 'text'], ['--format=text'], ['-f', 'txt']],
|
|
range(2018, 2021),
|
|
))
|
|
def test_text_query(arglist, fy):
|
|
books_path = testutil.test_path(f'books/books/{fy}.beancount')
|
|
config = testutil.TestConfig(books_path=books_path)
|
|
arglist += ['select', 'date,', 'narration,', 'account,', 'position']
|
|
returncode, stdout, stderr = pipe_main(arglist, config)
|
|
assert returncode == 0
|
|
stdout.seek(0)
|
|
lines = iter(stdout)
|
|
next(lines); next(lines) # Skip header
|
|
for count, line in enumerate(lines, 1):
|
|
assert re.match(rf'^{fy}-\d\d-\d\d\s+{fy} ', line)
|
|
assert count >= 2
|
|
|
|
@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
|
|
[['--format=csv'], ['-f', 'csv'], ['-t', 'csv']],
|
|
range(2018, 2021),
|
|
))
|
|
def test_csv_query(arglist, fy):
|
|
books_path = testutil.test_path(f'books/books/{fy}.beancount')
|
|
config = testutil.TestConfig(books_path=books_path)
|
|
arglist += ['select', 'date,', 'narration,', 'account,', 'position']
|
|
returncode, stdout, stderr = pipe_main(arglist, config)
|
|
assert returncode == 0
|
|
stdout.seek(0)
|
|
for count, row in enumerate(csv.DictReader(stdout), 1):
|
|
assert re.fullmatch(rf'{fy}-\d\d-\d\d', row['date'])
|
|
assert row['narration'].startswith(f'{fy} ')
|
|
assert count >= 2
|
|
|
|
@pytest.mark.parametrize('end_index', range(3))
|
|
def test_rewrite_query(end_index):
|
|
books_path = testutil.test_path(f'books/books/2018.beancount')
|
|
config = testutil.TestConfig(books_path=books_path)
|
|
accounts = ['Assets', 'Income']
|
|
expected = frozenset(accounts[:end_index])
|
|
rewrite_paths = [
|
|
testutil.test_path(f'userconfig/Rewrite{s}.yml')
|
|
for s in expected
|
|
]
|
|
arglist = [f'--rewrite-rules={path}' for path in rewrite_paths]
|
|
arglist.append('--format=txt')
|
|
arglist.append('select any_meta("root") as root')
|
|
returncode, stdout, stderr = pipe_main(arglist, config)
|
|
assert returncode == 0
|
|
stdout.seek(0)
|
|
actual = frozenset(line.rstrip('\n') for line in stdout)
|
|
assert expected.issubset(actual)
|
|
assert frozenset(accounts).difference(expected).isdisjoint(actual)
|
|
|
|
def test_ods_amount_formatting(qparser):
|
|
statement = qparser.parse('SELECT UNITS(position)')
|
|
row_types = [('amount', bc_data.Amount)]
|
|
row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
|
|
ods = qmod.QueryODS()
|
|
ods.write_query(statement, row_types, row_source)
|
|
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
|
assert next(actual)[0].text == 'Amount'
|
|
assert next(actual)[0].text == '$12.00'
|
|
assert next(actual)[0].text == '¥1,480'
|
|
assert next(actual, None) is None
|
|
|
|
def test_ods_datetime_formatting(qparser):
|
|
statement = qparser.parse('SELECT date')
|
|
row_types = [('date', datetime.date)]
|
|
row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)]
|
|
ods = qmod.QueryODS()
|
|
ods.write_query(statement, row_types, row_source)
|
|
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
|
assert next(actual)[0].text == 'Date'
|
|
assert next(actual)[0].text == testutil.PAST_DATE.isoformat()
|
|
assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat()
|
|
assert next(actual, None) is None
|
|
|
|
@pytest.mark.parametrize('meta_key,meta_func', [
|
|
('check', 'ANY_META'),
|
|
('purchase-order', 'META'),
|
|
('rt-id', 'META_DOCS'),
|
|
])
|
|
def test_ods_link_formatting(qparser, rt, meta_key, meta_func):
|
|
meta_func_returns_list = meta_func == 'META_DOCS'
|
|
statement = qparser.parse(f'SELECT {meta_func}({meta_key!r}) AS docs')
|
|
row_types = [('docs', list if meta_func_returns_list else str)]
|
|
row_source = [
|
|
(s.split() if meta_func_returns_list else s,)
|
|
for s in ['rt:1/5', 'rt:3 Checks/9.pdf']
|
|
]
|
|
ods = qmod.QueryODS(rt)
|
|
ods.write_query(statement, row_types, row_source)
|
|
rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
|
assert next(rows).text == 'Docs'
|
|
actual = iter(
|
|
[link.text for link in row.getElementsByType(odf.text.A)]
|
|
for row in rows
|
|
)
|
|
assert next(actual) == ['photo.jpg']
|
|
assert next(actual) == ['rt:3', '9.pdf']
|
|
assert next(actual, None) is None
|
|
|
|
def test_ods_meta_formatting(qparser):
|
|
statement = qparser.parse('SELECT ANY_META("entity") AS entity')
|
|
row_types = [('entity', object)]
|
|
row_source = [(testutil.Amount(14),), (None,), ('foo bar',)]
|
|
ods = qmod.QueryODS()
|
|
ods.write_query(statement, row_types, row_source)
|
|
actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
|
assert next(actual)[0].text == 'Entity'
|
|
assert next(actual)[0].text == '$14.00'
|
|
assert next(actual)[0].text == ''
|
|
assert next(actual)[0].text == 'foo bar'
|
|
assert next(actual, None) is None
|
|
|
|
def test_ods_multicolumn_write(qparser, rt):
|
|
statement = qparser.parse(
|
|
'SELECT MIN(date) AS date, SET(META_DOCS("rt-id")) AS tix, STR_META("entity") AS entity',
|
|
)
|
|
row_types = [('date', datetime.date), ('tix', set), ('entity', str)]
|
|
row_source = [
|
|
(testutil.PAST_DATE, {'rt:1'}, 'AA'),
|
|
(testutil.FY_START_DATE, {'rt:2'}, 'BB'),
|
|
(testutil.FUTURE_DATE, {'rt:3', 'rt:4'}, 'CC'),
|
|
]
|
|
ods = qmod.QueryODS(rt)
|
|
ods.write_query(statement, list(row_types), list(row_source))
|
|
actual = iter(
|
|
cell.text
|
|
for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
|
|
for cell in row
|
|
)
|
|
for expected, _ in row_types:
|
|
assert next(actual) == expected.title()
|
|
assert next(actual) == testutil.PAST_DATE.isoformat()
|
|
assert next(actual) == 'rt:1'
|
|
assert next(actual) == 'AA'
|
|
assert next(actual) == testutil.FY_START_DATE.isoformat()
|
|
assert next(actual) == 'rt:2'
|
|
assert next(actual) == 'BB'
|
|
assert next(actual) == testutil.FUTURE_DATE.isoformat()
|
|
assert frozenset(next(actual).split('\0')) == row_source[-1][1]
|
|
assert next(actual) == 'CC'
|
|
assert next(actual, None) is None
|
|
|
|
def test_ods_is_empty(qparser):
|
|
statement = qparser.parse('SELECT * WHERE date < 1900-01-01')
|
|
ods = qmod.QueryODS()
|
|
assert ods.is_empty()
|
|
ods.write_query(statement, [], [])
|
|
assert not ods.is_empty()
|
|
|
|
@pytest.mark.parametrize('fy,account,amt_prefix', [
|
|
(2018, 'Assets', '($'),
|
|
(2019, 'Income', '$'),
|
|
])
|
|
def test_ods_output(fy, account, amt_prefix):
|
|
books_path = testutil.test_path(f'books/books/{fy}.beancount')
|
|
config = testutil.TestConfig(books_path=books_path)
|
|
arglist = [
|
|
'-O', '-',
|
|
'-f', 'ods',
|
|
f'SELECT date, narration, UNITS(position) WHERE account ~ "^{account}:"',
|
|
]
|
|
returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
|
|
assert returncode == 0
|
|
with stdout:
|
|
stdout.seek(0)
|
|
ods_doc = odf.opendocument.load(stdout)
|
|
rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
|
next(rows) # Skip header row
|
|
amt_pattern = rf'^{re.escape(amt_prefix)}\d'
|
|
for count, row in enumerate(rows, 1):
|
|
date, narration, amount = row.childNodes
|
|
assert re.fullmatch(rf'{fy}-\d{{2}}-\d{{2}}', date.text)
|
|
assert narration.text.startswith(f'{fy} ')
|
|
assert re.match(amt_pattern, amount.text)
|
|
assert count
|
|
|
|
def test_ods_aggregate_output():
|
|
books_path = testutil.test_path(f'books/books/2020.beancount')
|
|
config = testutil.TestConfig(books_path=books_path)
|
|
arglist = [
|
|
'-O', '-',
|
|
'-f', 'ods',
|
|
'SELECT account, SET(narration), SUM(UNITS(position))',
|
|
'WHERE date >= 2020-04-01 AND date <= 2020-04-02',
|
|
'GROUP BY account ORDER BY account ASC',
|
|
]
|
|
returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
|
|
assert returncode == 0
|
|
with stdout:
|
|
stdout.seek(0)
|
|
ods_doc = odf.opendocument.load(stdout)
|
|
rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
|
|
next(rows) # Skip header row
|
|
actual = {}
|
|
for row in rows:
|
|
acct, descs, balance = row.childNodes
|
|
actual[acct.text] = (frozenset(descs.text.split('\0')), balance.text)
|
|
in_desc = {'2020 donation'}
|
|
ex_desc = {'2020 bank maintenance fee'}
|
|
assert actual['Income:Donations'] == (in_desc, '$20.20')
|
|
assert actual['Expenses:BankingFees'] == (ex_desc, '$1.00')
|
|
assert actual['Assets:Checking'] == (in_desc | ex_desc, '($21.20)')
|