conservancy_beancount/tests/test_reports_query.py

246 lines
8.7 KiB
Python
Raw Normal View History

"""test_reports_query.py - Unit tests for query report"""
# Copyright © 2021 Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.
import argparse
import collections
import copy
import csv
import datetime
import io
import itertools
import re
2021-03-06 20:45:11 +00:00
import odf.table
import odf.text
import pytest
from . import testutil
2021-03-06 20:45:11 +00:00
from beancount.core import data as bc_data
from conservancy_beancount.books import FiscalYear
from conservancy_beancount.reports import query as qmod
2021-03-06 20:45:11 +00:00
from conservancy_beancount import rtutil
from decimal import Decimal
class MockRewriteRuleset:
    """Minimal rewrite ruleset stand-in that scales posting amounts.

    Each posting passed through rewrite() comes back with the number of its
    units multiplied by ``multiplier``; the currency is left alone.
    """

    def __init__(self, multiplier=2):
        self.multiplier = multiplier

    def rewrite(self, posts):
        """Yield a copy of every posting in posts with scaled units."""
        for posting in posts:
            quantity, commodity = posting.units
            scaled = testutil.Amount(quantity * self.multiplier, commodity)
            yield posting._replace(units=scaled)
2021-03-06 20:45:11 +00:00
@pytest.fixture(scope='module')
def rt():
    """Module-scoped fixture: an rtutil.RT wrapper around the test RT client."""
    client = testutil.RTClient()
    return rtutil.RT(client)
def pipe_main(arglist, config, stdout_type=io.StringIO):
    """Run qmod.main() against in-memory streams.

    stdout_type is called with no arguments to create the stdout stream
    (pass io.BytesIO for binary output); stderr is always an io.StringIO.

    Returns a (returncode, stdout, stderr) tuple.
    """
    out_stream = stdout_type()
    err_stream = io.StringIO()
    exit_code = qmod.main(arglist, out_stream, err_stream, config)
    return (exit_code, out_stream, err_stream)
def test_books_loader_empty():
    """A BooksLoader built from None returns no entries and exactly one error."""
    load_books = qmod.BooksLoader(None)
    loaded = load_books()
    assert not loaded.entries
    assert len(loaded.errors) == 1
def test_books_loader_plain():
    """Loading the 2018 books gives entries, no errors, nothing before 2018-03-01."""
    path_2018 = testutil.test_path('books/books/2018.beancount')
    load_books = qmod.BooksLoader(testutil.TestBooksLoader(path_2018))
    loaded = load_books()
    assert not loaded.errors
    assert loaded.entries
    earliest_allowed = datetime.date(2018, 3, 1)
    assert not any(entry.date < earliest_allowed for entry in loaded.entries)
def test_books_loader_rewrites():
    """Rewrite rulesets handed to BooksLoader are applied to the loaded postings."""
    path_2018 = testutil.test_path('books/books/2018.beancount')
    inner_loader = testutil.TestBooksLoader(path_2018)
    rulesets = [MockRewriteRuleset()]
    loaded = qmod.BooksLoader(inner_loader, None, None, rulesets)()
    assert not loaded.errors
    assert loaded.entries
    # Entries without a postings attribute contribute nothing.
    magnitudes = frozenset(
        abs(posting.units.number)
        for entry in loaded.entries
        for posting in getattr(entry, 'postings', ())
    )
    assert magnitudes
    # The values are already absolute, so compare them directly.
    assert not any(magnitude < 40 for magnitude in magnitudes)
@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
    [['--report-type', 'text'], ['--format=text'], ['-f', 'txt']],
    range(2018, 2021),
))
def test_text_query(arglist, fy):
    """Text output has at least two data rows, each dated and narrated in fy.

    arglist is one spelling of the "text output" option; fy picks which
    year's test books are loaded.
    """
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    # Build a new list instead of extending in place: pytest passes the
    # parametrized object itself (no copy), so `+=` would leak the query
    # words into any other case that shares the same list.
    arglist = [*arglist, 'select', 'date,', 'narration,', 'account,', 'position']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    lines = iter(stdout)
    # Skip the two header lines.
    next(lines)
    next(lines)
    # Pre-bind count so empty output fails the assertion below rather than
    # raising NameError.
    count = 0
    for count, line in enumerate(lines, 1):
        assert re.match(rf'^{fy}-\d\d-\d\d\s+{fy} ', line)
    assert count >= 2
@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
    [['--format=csv'], ['-f', 'csv'], ['-t', 'csv']],
    range(2018, 2021),
))
def test_csv_query(arglist, fy):
    """CSV output has at least two rows, each dated and narrated in fy.

    arglist is one spelling of the "CSV output" option; fy picks which
    year's test books are loaded.
    """
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    # Build a new list instead of extending in place: pytest passes the
    # parametrized object itself (no copy), so `+=` would leak the query
    # words into any other case that shares the same list.
    arglist = [*arglist, 'select', 'date,', 'narration,', 'account,', 'position']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    # Pre-bind count so empty output fails the assertion below rather than
    # raising NameError.
    count = 0
    for count, row in enumerate(csv.DictReader(stdout), 1):
        assert re.fullmatch(rf'{fy}-\d\d-\d\d', row['date'])
        assert row['narration'].startswith(f'{fy} ')
    assert count >= 2
@pytest.mark.parametrize('end_index', range(3))
def test_rewrite_query(end_index):
    """Query `root` metadata with zero, one, or two rewrite rule files loaded.

    The account names whose Rewrite<Name>.yml file was passed must appear as
    output lines; the remaining account names must not.
    """
    books_path = testutil.test_path('books/books/2018.beancount')
    config = testutil.TestConfig(books_path=books_path)
    accounts = ['Assets', 'Income']
    expected = frozenset(accounts[:end_index])
    arglist = []
    for name in expected:
        rules_path = testutil.test_path(f'userconfig/Rewrite{name}.yml')
        arglist.append(f'--rewrite-rules={rules_path}')
    arglist += ['--format=txt', 'select any_meta("root") as root']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    actual = frozenset(line.rstrip('\n') for line in stdout)
    assert expected <= actual
    unexpected = frozenset(accounts) - expected
    assert unexpected.isdisjoint(actual)
2021-03-06 20:45:11 +00:00
def test_ods_amount_formatting():
    """Amount columns are written as formatted currency text."""
    row_types = [('amount', bc_data.Amount)]
    row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
    ods = qmod.QueryODS()
    ods.write_query(row_types, row_source)
    sheet = ods.document.spreadsheet.firstChild
    rows = testutil.ODSCell.from_sheet(sheet)
    # Header row first, then one row per source tuple, in order.
    for want in ('Amount', '$12.00', '¥1,480'):
        assert next(rows)[0].text == want
    assert next(rows, None) is None
def test_ods_datetime_formatting():
    """Date columns are written as ISO 8601 date text."""
    row_types = [('date', datetime.date)]
    row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)]
    ods = qmod.QueryODS()
    ods.write_query(row_types, row_source)
    sheet = ods.document.spreadsheet.firstChild
    rows = testutil.ODSCell.from_sheet(sheet)
    assert next(rows)[0].text == 'Date'
    for source_date in (testutil.PAST_DATE, testutil.FUTURE_DATE):
        assert next(rows)[0].text == source_date.isoformat()
    assert next(rows, None) is None
@pytest.mark.parametrize('meta_key,header_text', [
    ('check', 'Check'),
    ('purchase-order', 'Purchase Order'),
    ('rt-id', 'Ticket'),
])
def test_ods_link_formatting(rt, meta_key, header_text):
    """Link-type metadata columns get a title header and hyperlinked cells."""
    # The column name is the metadata key with hyphens turned into underscores.
    column_name = meta_key.replace('-', '_')
    row_types = [(column_name, object)]
    row_source = [('rt:1/5',), ('rt:3 Checks/9.pdf',)]
    ods = qmod.QueryODS(rt)
    ods.write_query(row_types, row_source)
    sheet = ods.document.spreadsheet.firstChild
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
    assert next(rows).text == header_text
    # One list of link texts per remaining (data) row.
    link_texts = (
        [anchor.text for anchor in row.getElementsByType(odf.text.A)]
        for row in rows
    )
    for want in (['photo.jpg'], ['rt:3', '9.pdf']):
        assert next(link_texts) == want
    assert next(link_texts, None) is None
def test_ods_meta_formatting():
    """Untyped (object) columns format each value according to what it holds."""
    row_types = [('metadata', object)]
    row_source = [(testutil.Amount(14),), (None,), ('foo bar',)]
    ods = qmod.QueryODS()
    ods.write_query(row_types, row_source)
    sheet = ods.document.spreadsheet.firstChild
    rows = testutil.ODSCell.from_sheet(sheet)
    # Header, then an Amount, a None (empty cell), and a plain string.
    for want in ('Metadata', '$14.00', '', 'foo bar'):
        assert next(rows)[0].text == want
    assert next(rows, None) is None
def test_ods_multicolumn_write(rt):
    """A three-column query writes its header and rows cell by cell, in order."""
    row_types = [('date', datetime.date), ('rt-id', object), ('desc', str)]
    row_source = [
        (testutil.PAST_DATE, 'rt:1', 'aaa'),
        (testutil.FY_START_DATE, 'rt:2', 'bbb'),
        (testutil.FUTURE_DATE, 'rt:3', 'ccc'),
    ]
    ods = qmod.QueryODS(rt)
    ods.write_query(row_types, row_source)
    sheet = ods.document.spreadsheet.firstChild
    # Flatten the sheet into a single stream of cell texts, row by row.
    cell_texts = (
        cell.text
        for row in testutil.ODSCell.from_sheet(sheet)
        for cell in row
    )
    expected_texts = ['Date', 'Ticket', 'Desc']
    for row_date, ticket, desc in row_source:
        expected_texts += [row_date.isoformat(), ticket, desc]
    for want in expected_texts:
        assert next(cell_texts) == want
    assert next(cell_texts, None) is None
def test_ods_is_empty():
    """QueryODS starts empty and stops being empty after any write_query()."""
    report = qmod.QueryODS()
    assert report.is_empty()
    # Even a query with no columns and no rows counts as content.
    report.write_query([], [])
    assert not report.is_empty()
@pytest.mark.parametrize('fy,account,amt_prefix', [
    (2018, 'Assets', '($'),
    (2019, 'Income', '$'),
])
def test_ods_output(fy, account, amt_prefix):
    """ODS written to stdout loads back with the expected date/narration/amount.

    fy picks which year's test books are loaded, account is the top-level
    account the query filters on, and amt_prefix is the text every amount
    cell must start with (followed by a digit).
    """
    # Import the loader explicitly: this module only imports odf.table and
    # odf.text, so `odf.opendocument` is not guaranteed to be an attribute
    # of the odf package unless some other module happened to import it.
    from odf.opendocument import load as load_ods_document

    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    arglist = [
        '-O', '-',
        '-f', 'ods',
        f'SELECT date, narration, UNITS(position) WHERE account ~ "^{account}:"',
    ]
    # ODS is a binary format, so capture stdout as bytes.
    returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
    assert returncode == 0
    stdout.seek(0)
    ods_doc = load_ods_document(stdout)
    rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
    next(rows)  # Skip header row
    amt_pattern = rf'^{re.escape(amt_prefix)}\d'
    # Pre-bind count so a sheet with no data rows fails the final assertion
    # rather than raising NameError.
    count = 0
    for count, row in enumerate(rows, 1):
        date, narration, amount = row.childNodes
        assert re.fullmatch(rf'{fy}-\d{{2}}-\d{{2}}', date.text)
        assert narration.text.startswith(f'{fy} ')
        assert re.match(amt_pattern, amount.text)
    assert count