"""test_reports_query.py - Unit tests for query report""" # Copyright © 2021 Brett Smith # License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0 # # Full copyright and licensing details can be found at toplevel file # LICENSE.txt in the repository. import argparse import collections import copy import csv import datetime import io import itertools import re import odf.table import odf.text import pytest from . import testutil from beancount.core import data as bc_data from conservancy_beancount.books import FiscalYear from conservancy_beancount.reports import query as qmod from conservancy_beancount import rtutil from decimal import Decimal class MockRewriteRuleset: def __init__(self, multiplier=2): self.multiplier = multiplier def rewrite(self, posts): for post in posts: number, currency = post.units number *= self.multiplier yield post._replace(units=testutil.Amount(number, currency)) @pytest.fixture(scope='module') def fy(): return FiscalYear(3, 1) @pytest.fixture(scope='module') def rt(): return rtutil.RT(testutil.RTClient()) def pipe_main(arglist, config, stdout_type=io.StringIO): stdout = stdout_type() stderr = io.StringIO() returncode = qmod.main(arglist, stdout, stderr, config) return returncode, stdout, stderr def query_args(query=None, start_date=None, stop_date=None, join=None, select=None): if isinstance(join, str): join = qmod.JoinOperator[join] if select is None: select = [] elif isinstance(select, str): select = select.split(',') return argparse.Namespace(**locals()) def test_books_loader_empty(): result = qmod.BooksLoader(None)() assert not result.entries assert len(result.errors) == 1 def test_books_loader_plain(): books_path = testutil.test_path(f'books/books/2018.beancount') loader = testutil.TestBooksLoader(books_path) result = qmod.BooksLoader(loader)() assert not result.errors assert result.entries min_date = datetime.date(2018, 3, 1) assert all(ent.date >= min_date for ent in result.entries) def test_books_loader_rewrites(): rewrites = [MockRewriteRuleset()] books_path = testutil.test_path(f'books/books/2018.beancount') loader = testutil.TestBooksLoader(books_path) result = qmod.BooksLoader(loader, None, None, rewrites)() assert not result.errors assert result.entries numbers = frozenset( abs(post.units.number) for entry in result.entries for post in getattr(entry, 'postings', ()) ) assert numbers assert all(abs(number) >= 40 for number in numbers) @pytest.mark.parametrize('file_s', [None, '', ' \n \n\n']) def test_build_query_empty(fy, file_s): args = query_args() if file_s is None: query = qmod.build_query(args, fy) else: with io.StringIO(file_s) as qfile: query = qmod.build_query(args, fy, qfile) assert query is None @pytest.mark.parametrize('query_str', [ 'SELECT * WHERE date >= 2018-03-01', 'select *', 'JOURNAL "Income:Donations"', 'journal', 'BALANCES FROM year=2018', 'balances', ]) def test_build_query_in_arglist(fy, query_str): args = query_args(query_str.split(), testutil.PAST_DATE, testutil.FUTURE_DATE) assert qmod.build_query(args, fy) == query_str @pytest.mark.parametrize('argname,argval', [ ('join', qmod.JoinOperator.AND), ('join', qmod.JoinOperator.OR), ('select', 'date,flag'), ('select', 'position,rt-id'), ]) def test_build_query_cant_mix_switches_with_full_query(fy, argname, argval): args = query_args(['journal'], **{argname: argval}) with pytest.raises(ValueError): qmod.build_query(args, fy) @pytest.mark.parametrize('count,join_op', enumerate(qmod.JoinOperator, 1)) def test_build_query_where_arglist_conditions(fy, count, join_op): conds = ['account ~ "^Income:"', 'year >= 2018'][:count] args = query_args(conds, join=join_op.name) query = qmod.build_query(args, fy) assert query.startswith('SELECT ') cond_index = query.index(' WHERE ') + 7 assert query[cond_index:] == '({})'.format(join_op.join(conds)) @pytest.mark.parametrize('select', [ ['flag'], ['check'], ['flag', 'month'], ['cost_label', 'cost_metakey'], ['approval', 'receipt'], ]) def test_build_query_select_fields(fy, select): args = query_args(['year>2018'], select=list(select)) query = qmod.build_query(args, fy) assert query.startswith('SELECT ') start_index = 7 for field in select: if field != 'flag' and field != 'month' and field != 'cost_label': field = f'ANY_META("{field}") AS {field.replace("-", "_")}' match = re.search(rf',\s*{re.escape(field)}\b', query) assert match, f"field {field!r} not found in query: {query!r}" assert match.start() >= start_index start_index = match.end() assert query[start_index:start_index + 7] == ' WHERE ' @pytest.mark.parametrize('argname,date_arg', itertools.product( ['start_date', 'stop_date'], [testutil.FY_START_DATE, testutil.FY_START_DATE.year], )) def test_build_query_one_date_arg(fy, argname, date_arg): query_kwargs = { argname: date_arg, 'query': ['flag = "*"', 'flag = "!"'], 'join': 'OR', } args = query_args(**query_kwargs) query = qmod.build_query(args, fy) assert query.startswith('SELECT ') cond_index = query.index(' WHERE ') + 7 if argname == 'start_date': expect_op = '>=' year_to_date = fy.first_date else: expect_op = '<' year_to_date = fy.next_fy_date if not isinstance(date_arg, datetime.date): date_arg = year_to_date(date_arg) assert query[cond_index:] == '({}) AND date {} {}'.format( ' OR '.join(query_kwargs['query']), expect_op, date_arg.isoformat(), ) @pytest.mark.parametrize('start_date,stop_date', itertools.product( [testutil.PAST_DATE, testutil.PAST_DATE.year], [testutil.FUTURE_DATE, testutil.FUTURE_DATE.year], )) def test_build_query_two_date_args(fy, start_date, stop_date): args = query_args(['account ~ "^Equity:"'], start_date, stop_date, 'AND') query = qmod.build_query(args, fy) assert query.startswith('SELECT ') cond_index = query.index(' WHERE ') + 7 if isinstance(start_date, int): start_date = fy.first_date(start_date) if isinstance(stop_date, int): stop_date = fy.next_fy_date(stop_date) assert query[cond_index:] == '({}) AND date >= {} AND date < {}'.format( args.query[0], start_date.isoformat(), stop_date.isoformat(), ) def test_build_query_plain_from_file(fy): with io.StringIO("SELECT *\n WHERE account ~ '^Assets:';\n") as qfile: query = qmod.build_query(query_args(), fy, qfile) assert re.fullmatch(r"SELECT \*\s+WHERE account ~ '\^Assets:';\s*", query) def test_build_query_from_file_where_clauses(fy): conds = ["account ~ '^Income:'", "account ~ '^Expenses:'"] args = query_args(None, testutil.PAST_DATE, testutil.FUTURE_DATE, 'OR') with io.StringIO(''.join(f'{s}\n' for s in conds)) as qfile: query = qmod.build_query(args, fy, qfile) assert query.startswith('SELECT ') cond_index = query.index(' WHERE ') + 7 assert query[cond_index:] == '({}) AND date >= {} AND date < {}'.format( ' OR '.join(conds), testutil.PAST_DATE.isoformat(), testutil.FUTURE_DATE.isoformat(), ) @pytest.mark.parametrize('arglist,fy', testutil.combine_values( [['--report-type', 'text'], ['--format=text'], ['-f', 'txt']], range(2018, 2021), )) def test_text_query(arglist, fy): books_path = testutil.test_path(f'books/books/{fy}.beancount') config = testutil.TestConfig(books_path=books_path) arglist += ['select', 'date,', 'narration,', 'account,', 'position'] returncode, stdout, stderr = pipe_main(arglist, config) assert returncode == 0 stdout.seek(0) lines = iter(stdout) next(lines); next(lines) # Skip header for count, line in enumerate(lines, 1): assert re.match(rf'^{fy}-\d\d-\d\d\s+{fy} ', line) assert count >= 2 @pytest.mark.parametrize('arglist,fy', testutil.combine_values( [['--format=csv'], ['-f', 'csv'], ['-t', 'csv']], range(2018, 2021), )) def test_csv_query(arglist, fy): books_path = testutil.test_path(f'books/books/{fy}.beancount') config = testutil.TestConfig(books_path=books_path) arglist += ['select', 'date,', 'narration,', 'account,', 'position'] returncode, stdout, stderr = pipe_main(arglist, config) assert returncode == 0 stdout.seek(0) for count, row in enumerate(csv.DictReader(stdout), 1): assert re.fullmatch(rf'{fy}-\d\d-\d\d', row['date']) assert row['narration'].startswith(f'{fy} ') assert count >= 2 @pytest.mark.parametrize('end_index', range(3)) def test_rewrite_query(end_index): books_path = testutil.test_path(f'books/books/2018.beancount') config = testutil.TestConfig(books_path=books_path) accounts = ['Assets', 'Income'] expected = frozenset(accounts[:end_index]) rewrite_paths = [ testutil.test_path(f'userconfig/Rewrite{s}.yml') for s in expected ] arglist = [f'--rewrite-rules={path}' for path in rewrite_paths] arglist.append('--format=txt') arglist.append('select any_meta("root") as root') returncode, stdout, stderr = pipe_main(arglist, config) assert returncode == 0 stdout.seek(0) actual = frozenset(line.rstrip('\n') for line in stdout) assert expected.issubset(actual) assert frozenset(accounts).difference(expected).isdisjoint(actual) def test_ods_amount_formatting(): row_types = [('amount', bc_data.Amount)] row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)] ods = qmod.QueryODS() ods.write_query(row_types, row_source) actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) assert next(actual)[0].text == 'Amount' assert next(actual)[0].text == '$12.00' assert next(actual)[0].text == '¥1,480' assert next(actual, None) is None def test_ods_datetime_formatting(): row_types = [('date', datetime.date)] row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)] ods = qmod.QueryODS() ods.write_query(row_types, row_source) actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) assert next(actual)[0].text == 'Date' assert next(actual)[0].text == testutil.PAST_DATE.isoformat() assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat() assert next(actual, None) is None @pytest.mark.parametrize('meta_key,header_text', [ ('check', 'Check'), ('purchase-order', 'Purchase Order'), ('rt-id', 'Ticket'), ]) def test_ods_link_formatting(rt, meta_key, header_text): row_types = [(meta_key.replace('-', '_'), object)] row_source = [('rt:1/5',), ('rt:3 Checks/9.pdf',)] ods = qmod.QueryODS(rt) ods.write_query(row_types, row_source) rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow)) assert next(rows).text == header_text actual = iter( [link.text for link in row.getElementsByType(odf.text.A)] for row in rows ) assert next(actual) == ['photo.jpg'] assert next(actual) == ['rt:3', '9.pdf'] assert next(actual, None) is None def test_ods_meta_formatting(): row_types = [('metadata', object)] row_source = [(testutil.Amount(14),), (None,), ('foo bar',)] ods = qmod.QueryODS() ods.write_query(row_types, row_source) actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) assert next(actual)[0].text == 'Metadata' assert next(actual)[0].text == '$14.00' assert next(actual)[0].text == '' assert next(actual)[0].text == 'foo bar' assert next(actual, None) is None def test_ods_multicolumn_write(rt): row_types = [('date', datetime.date), ('rt-id', object), ('desc', str)] row_source = [ (testutil.PAST_DATE, 'rt:1', 'aaa'), (testutil.FY_START_DATE, 'rt:2', 'bbb'), (testutil.FUTURE_DATE, 'rt:3', 'ccc'), ] ods = qmod.QueryODS(rt) ods.write_query(row_types, row_source) actual = iter( cell.text for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) for cell in row ) assert next(actual) == 'Date' assert next(actual) == 'Ticket' assert next(actual) == 'Desc' assert next(actual) == testutil.PAST_DATE.isoformat() assert next(actual) == 'rt:1' assert next(actual) == 'aaa' assert next(actual) == testutil.FY_START_DATE.isoformat() assert next(actual) == 'rt:2' assert next(actual) == 'bbb' assert next(actual) == testutil.FUTURE_DATE.isoformat() assert next(actual) == 'rt:3' assert next(actual) == 'ccc' assert next(actual, None) is None def test_ods_is_empty(): ods = qmod.QueryODS() assert ods.is_empty() ods.write_query([], []) assert not ods.is_empty() @pytest.mark.parametrize('fy,account,amt_prefix', [ (2018, 'Assets', '($'), (2019, 'Income', '$'), ]) def test_ods_output(fy, account, amt_prefix): books_path = testutil.test_path(f'books/books/{fy}.beancount') config = testutil.TestConfig(books_path=books_path) arglist = ['-O', '-', '-f', 'ods', f'account ~ "^{account}:"'] returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO) assert returncode == 0 stdout.seek(0) ods_doc = odf.opendocument.load(stdout) rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow)) next(rows) # Skip header row amt_pattern = rf'^{re.escape(amt_prefix)}\d' for count, row in enumerate(rows, 1): date, entity, narration, amount = row.childNodes assert re.fullmatch(rf'{fy}-\d{{2}}-\d{{2}}', date.text) assert narration.text.startswith(f'{fy} ') assert re.match(amt_pattern, amount.text) assert count