"""test_reports_accrual - Unit tests for accrual report"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import collections
import copy
import datetime
import io
import itertools
import operator
import re

import babel.numbers
import odf.opendocument
import odf.table
import odf.text

import pytest

from . import testutil

from decimal import Decimal
from typing import NamedTuple, Optional, Sequence

from beancount.core import data as bc_data
from beancount import loader as bc_loader
from conservancy_beancount import cliutil
from conservancy_beancount import data
from conservancy_beancount import rtutil
from conservancy_beancount.reports import accrual

# Load the shared test ledger once, at import time. Index 0 of the load
# result is iterated below as the sequence of ledger entries.
_accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount'))
# Keep only entries that carry a `narration` attribute (presumably the
# transactions), skipping the one narrated 'Opening balances'.
ACCRUAL_TXNS = [
    entry for entry in _accruals_load[0]
    if hasattr(entry, 'narration')
    and entry.narration != 'Opening balances'
]
# Number of postings in ACCRUAL_TXNS whose account name starts with one of
# the receivable/payable prefixes.
ACCRUALS_COUNT = sum(
    1
    for txn in ACCRUAL_TXNS
    for post in txn.postings
    if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
)

# Account names used to build synthetic transactions and to parametrize tests.
ACCOUNTS = [
    'Assets:Receivable:Accounts',
    'Assets:Receivable:Loans',
    'Liabilities:Payable:Accounts',
    'Liabilities:Payable:Vacation',
]

class AgingRow(NamedTuple):
    """Expected contents of one data row in an aging report spreadsheet."""
    # Compared against the value of the row's first cell.
    date: datetime.date
    # Compared, NUL-joined, against the text of the second cell.
    entity: Sequence[str]
    # Original-currency amounts; None is checked the same as no amounts.
    amount: Optional[Sequence[bc_data.Amount]]
    # Compared by number against the currency-typed fourth cell.
    at_cost: bc_data.Amount
    # rt_id and invoice are stored but not compared by value in
    # check_row_match; the trailing cells are only checked to be links.
    rt_id: Sequence[str]
    invoice: Sequence[str]
    # Compared, NUL-joined, against the text of the fifth cell.
    project: Sequence[str]

    @classmethod
    def make_simple(cls, date, entity, at_cost, invoice,
                    rt_id=None, orig_amount=None, project='Conservancy'):
        """Build a row from scalar values, wrapping them in one-item lists.

        `date` may be a 'YYYY-MM-DD' string. `at_cost` is passed through
        testutil.Amount unless it is already a tuple. When `rt_id` is not
        given and `invoice` starts with 'rt:', the RT ID is the part of the
        invoice before the first slash.
        """
        if isinstance(date, str):
            date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
        if not isinstance(at_cost, tuple):
            at_cost = testutil.Amount(at_cost)
        if rt_id is None and invoice.startswith('rt:'):
            rt_id, _, _ = invoice.partition('/')
        return cls(date, [entity], orig_amount, at_cost, [rt_id], [invoice], [project])

    def check_row_match(self, sheet_row):
        """Assert that the cells of `sheet_row` match this expected row.

        Cells are consumed in order: date, entity, original amount, at-cost
        amount, project, then the remaining cells.
        """
        cells = testutil.ODSCell.from_row(sheet_row)
        # There must be at least one cell per field of this tuple.
        assert len(cells) >= len(self)
        cells = iter(cells)
        assert next(cells).value == self.date
        assert next(cells).text == '\0'.join(self.entity)
        assert next(cells).text == '\0'.join(
            babel.numbers.format_currency(number, currency, format_type='accounting')
            for number, currency in self.amount or ()
        )
        usd_cell = next(cells)
        assert usd_cell.value_type == 'currency'
        assert usd_cell.value == self.at_cost.number
        assert next(cells).text == '\0'.join(self.project)
        # Every child node of each remaining cell must be a hyperlink.
        for index, cell in enumerate(cells):
            links = cell.getElementsByType(odf.text.A)
            assert len(links) == len(cell.childNodes)
        # At least two such cells must have followed the project cell.
        assert index >= 1


# Default expected payable rows; check_aging_ods compares these against the
# second sheet of the aging report.
AGING_AP = [
    AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'),
    AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'),
    AgingRow.make_simple('2010-04-25', 'Vendor', 200, 'FIXME'),
    AgingRow.make_simple('2010-04-30', 'Vendor', 220, 'rt:310/3120'),
    AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'),
    AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200',
                         orig_amount=[testutil.Amount(1000, 'EUR')]),
    AgingRow.make_simple('2010-06-20', 'StateGov', 50, 'Invoices/2010StateRegistration.pdf'),
]

# Default expected receivable rows; check_aging_ods compares these against
# the first sheet of the aging report.
AGING_AR = [
    AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'),
    AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500,
                         'rt://ticket/515/attachments/5150'),
    AgingRow.make_simple('2010-06-15', 'GrantCo', 11500, 'rt:470/4700',
                         project='Development Grant'),
]

class RTClient(testutil.RTClient):
    """Test RT client preloaded with the tickets used by the accruals books."""
    # Maps ticket ID strings to lists of 4-tuples.
    # NOTE(review): the tuples look like (attachment id, filename, MIME type,
    # size) -- confirm against testutil.RTClient.
    TICKET_DATA = {
        '40': [
            ('400', 'invoice feb.csv', 'text/csv', '40.4k'),
        ],
        '44': [
            ('440', 'invoice feb.csv', 'text/csv', '40.4k'),
        ],
        '310': [
            ('3100', 'VendorContract.pdf', 'application/pdf', '1.7m'),
            ('3120', 'VendorInvoiceB.pdf', 'application/pdf', '1.8m'),
        ],
        '490': [],
        '505': [],
        '510': [
            ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
            ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
            ('5105', 'payment.png', 'image/png', '51.5k'),
            ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
        ],
        '515': [],
        '520': [],
    }


@pytest.fixture
def accrual_postings():
    """Provide postings built from a private deep copy of ACCRUAL_TXNS."""
    txns_copy = copy.deepcopy(ACCRUAL_TXNS)
    return data.Posting.from_entries(txns_copy)

def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
    """Select receivable/payable postings whose metadata `key` equals `value`.

    The matching postings are produced lazily by a generator, which is
    passed to `wrap_type`; the result of that call is returned.
    """
    def _matching():
        for candidate in postings:
            if candidate.meta.get(key) == value:
                if candidate.account.is_under('Assets:Receivable', 'Liabilities:Payable'):
                    yield candidate

    return wrap_type(_matching())

def find_row_by_text(row_source, want_text):
    """Return the first row whose first child has text `want_text`.

    Rows with no children never match. Returns None when no row matches.
    If `row_source` is an iterator, it is only advanced up to and including
    the returned row.
    """
    def _leads_with_text(candidate):
        try:
            return candidate.childNodes[0].text == want_text
        except IndexError:
            return False

    return next((row for row in row_source if _leads_with_text(row)), None)

def check_aging_sheet(sheet, aging_rows, date, accrue_date):
    """Assert that one aging report sheet contains the expected rows.

    `sheet` is an ODF table. `aging_rows` is the sequence of expected
    AgingRow objects; when it is empty nothing is checked. `date` is the
    report date. `accrue_date` is either a date or an int offset in days
    from `date`.

    The sheet's rows are walked with a single shared iterator, so the
    checks below are order-dependent: header row, data rows, then the
    "Total Aged" rows.
    """
    if not aging_rows:
        return
    if isinstance(accrue_date, int):
        accrue_date = date + datetime.timedelta(days=accrue_date)
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
    for row in rows:
        if "Aging Report" in row.text:
            break
    else:
        raise AssertionError("Header row not found")
    assert f"Accrued by {accrue_date.isoformat()}" in row.text
    assert f"Unpaid by {date.isoformat()}" in row.text
    expect_rows = iter(aging_rows)
    first_expected = next(expect_rows)
    first_date_text = first_expected.date.isoformat()
    row0 = find_row_by_text(rows, first_date_text)
    # Fail with a clear message rather than an opaque error from
    # check_row_match(None) when the first data row is missing.
    assert row0 is not None, f"No data row found starting with {first_date_text}"
    first_expected.check_row_match(row0)
    # NOTE(review): zip() advances `rows` before `expect_rows`, so one sheet
    # row past the last expected row is consumed and discarded here. This
    # presumably relies on a spacer row preceding the totals -- confirm
    # against the report layout before reordering the arguments.
    for actual, expected in zip(rows, expect_rows):
        expected.check_row_match(actual)
    for row in rows:
        if row.text.startswith("Total Aged "):
            break
    else:
        raise AssertionError("Totals rows not found")
    actual_sum = Decimal(row.childNodes[-1].value)
    # Add up any further consecutive totals rows.
    for row in rows:
        if row.text.startswith("Total Aged "):
            actual_sum += Decimal(row.childNodes[-1].value)
        else:
            break
    # The totals should cover only positive amounts dated on or before the
    # accrual cutoff.
    assert actual_sum == sum(
        row.at_cost.number
        for row in aging_rows
        if row.date <= accrue_date
        and row.at_cost.number > 0
    )

def check_aging_ods(ods_file,
                    date=None,
                    recv_rows=AGING_AR,
                    pay_rows=AGING_AP,
):
    """Load an aging report spreadsheet and check both of its sheets.

    `date` defaults to today. The first sheet is checked against
    `recv_rows` with a -60 day accrual offset, and the second against
    `pay_rows` with a -30 day offset.
    """
    report_date = datetime.date.today() if date is None else date
    ods_file.seek(0)
    document = odf.opendocument.load(ods_file)
    sheets = document.spreadsheet.getElementsByType(odf.table.Table)
    assert len(sheets) == 2
    expectations = ((recv_rows, -60), (pay_rows, -30))
    for sheet, (expect_rows, accrue_days) in zip(sheets, expectations):
        check_aging_sheet(sheet, expect_rows, report_date, accrue_days)

@pytest.mark.parametrize('search_terms,expect_count,check_func', [
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
        'Assets:Receivable:', 'Liabilities:Payable:',
    )),
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
    ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'),
    ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
    ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
     lambda post: post.meta['invoice'].startswith('rt:510/')),
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
])
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
    """filter_search narrows postings to those matching every search term.

    A narrowing search must return at least `expect_count` postings but
    fewer than all of them; every returned posting must pass `check_func`.
    """
    terms = [cliutil.SearchTerm._make(query) for query in search_terms]
    matched = list(accrual.filter_search(accrual_postings, terms))
    found = len(matched)
    if expect_count >= ACCRUALS_COUNT:
        assert found == ACCRUALS_COUNT
    else:
        assert expect_count <= found < ACCRUALS_COUNT
    for post in matched:
        assert check_func(post)

@pytest.mark.parametrize('arg,expected', [
    ('aging', accrual.AgingReport),
    ('balance', accrual.BalanceReport),
    ('outgoing', accrual.OutgoingReport),
    ('age', accrual.AgingReport),
    ('bal', accrual.BalanceReport),
    ('out', accrual.OutgoingReport),
    ('outgoings', accrual.OutgoingReport),
])
def test_report_type_by_name(arg, expected):
    """ReportType.by_name resolves names and abbreviations in any letter case."""
    for recase in (str.lower, str.title, str.upper):
        assert accrual.ReportType.by_name(recase(arg)).value is expected

@pytest.mark.parametrize('arg', [
    'unknown',
    'blance',
    'outgong',
])
def test_report_type_by_unknown_name(arg):
    """ReportType.by_name rejects unknown or misspelled report names."""
    # Raising ValueError helps argparse generate good messages.
    with pytest.raises(ValueError):
        accrual.ReportType.by_name(arg)

@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_accrual_postings_consistent_account(acct_name):
    """AccrualPostings.account is the account shared by all its postings."""
    # This must be an f-string: without the prefix the invoice was the
    # literal text '{acct_name} invoice.pdf' for every parametrized account.
    meta = {'invoice': f'{acct_name} invoice.pdf'}
    txn = testutil.Transaction(postings=[
        (acct_name, 50, meta),
        (acct_name, 25, meta),
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.account == acct_name

def test_accrual_postings_entity():
    """The `entity` attribute is 'Accruee' while entities() covers all three."""
    account = ACCOUNTS[0]
    amounts_by_entity = [('Accruee', 25), ('Payee15', -15), ('Payee10', -10)]
    txn = testutil.Transaction(postings=[
        (account, amount, {'entity': entity})
        for entity, amount in amounts_by_entity
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.entity == 'Accruee'
    all_entities = set(related.entities())
    assert all_entities == {'Accruee', 'Payee10', 'Payee15'}

def test_accrual_postings_entities():
    """entities() yields 'Accruee' first, then the two payees in any order."""
    account = ACCOUNTS[0]
    amounts_by_entity = [('Accruee', 25), ('Payee15', -15), ('Payee10', -10)]
    txn = testutil.Transaction(postings=[
        (account, amount, {'entity': entity})
        for entity, amount in amounts_by_entity
    ])
    entities = accrual.AccrualPostings(data.Posting.from_txn(txn)).entities()
    first = next(entities, None)
    assert first == 'Accruee'
    remaining = set(entities)
    assert remaining == {'Payee10', 'Payee15'}

def test_accrual_postings_entities_no_duplicates():
    """entities() yields each distinct entity exactly once."""
    account = ACCOUNTS[0]
    amounts_by_entity = [('Accruee', 25), ('Accruee', -15), ('Other', -10)]
    txn = testutil.Transaction(postings=[
        (account, amount, {'entity': entity})
        for entity, amount in amounts_by_entity
    ])
    entities = accrual.AccrualPostings(data.Posting.from_txn(txn)).entities()
    for wanted in ('Accruee', 'Other'):
        assert next(entities, None) == wanted
    # Nothing may follow the two distinct entities.
    assert next(entities, None) is None

def test_accrual_postings_inconsistent_account():
    """Postings spread over several accounts yield the INCONSISTENT marker."""
    shared_meta = {'invoice': 'invoice.pdf'}
    postings = [
        (name, amount, shared_meta)
        for amount, name in enumerate(ACCOUNTS)
    ]
    txn = testutil.Transaction(postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.account is related.INCONSISTENT

def test_accrual_postings_rt_id():
    """rt_id is 'rt:90' when every posting's rt-id metadata starts with it."""
    rt_ids = ['rt:90', 'rt:90 rt:92', 'rt:90 rt:94 rt:92']
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], 10, {'rt-id': rt_id})
        for rt_id in rt_ids
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.rt_id == 'rt:90'

def test_accrual_postings_rt_id_inconsistent():
    """rt_id is the INCONSISTENT marker when the leading RT IDs differ."""
    rt_ids = ['rt:96', 'rt:98 rt:96']
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], 10, {'rt-id': rt_id})
        for rt_id in rt_ids
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.rt_id is related.INCONSISTENT

def test_accrual_postings_rt_id_none():
    """rt_id is None when the posting is built without any metadata."""
    txn = testutil.Transaction(postings=[
        (ACCOUNTS[0], 10),
    ])
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    assert related.rt_id is None

@pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values(
    ACCOUNTS,
    ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES],
    itertools.count(1),
))
def test_make_consistent_bad_invoice(acct_name, invoice, day):
    """Postings sharing one unusable invoice value stay in a single group.

    The group key must mention the account and, for truthy invoice values,
    the invoice's string form.
    """
    postings = [
        (acct_name, amount, {'invoice': invoice, 'entity': f'BadInvoice{day}'})
        for amount in (10, 20, 30)
    ]
    txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = dict(related.make_consistent())
    assert len(groups) == 1
    (key, actual), = groups.items()
    assert acct_name in key
    if invoice:
        assert str(invoice) in key
    assert actual
    assert len(actual) == 3
    for got, want in zip(actual, txn.postings):
        assert got.units == want.units
        assert got.meta.get('invoice') == invoice

def test_make_consistent_across_accounts():
    """One invoice posted to several accounts splits into a group per account."""
    invoice = 'Invoices/CrossAccount.pdf'
    postings = [
        (acct_name, 100, {'invoice': invoice, 'entity': 'CrossAccount'})
        for acct_name in ACCOUNTS
    ]
    txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = dict(related.make_consistent())
    assert len(groups) == len(ACCOUNTS)
    for group_key, group in groups.items():
        assert len(group) == 1
        assert group.account in group_key

def test_make_consistent_both_invoice_and_account():
    """Postings with no metadata on different accounts split per account."""
    postings = [(acct_name, 150) for acct_name in ACCOUNTS]
    txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = dict(related.make_consistent())
    assert len(groups) == len(ACCOUNTS)
    for group_key, group in groups.items():
        assert len(group) == 1
        assert group.account in group_key

@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_make_consistent_across_entity(acct_name):
    """Same-signed postings for different entities split into a group each."""
    # Amounts are positive on Assets accounts and negative otherwise.
    sign = 1 if acct_name.startswith('Assets') else -1
    postings = [
        (acct_name, sign * n, {'invoice': 'Inv/1.pdf', 'entity': f'Entity{n}'})
        for n in (1, 2, 3)
    ]
    txn = testutil.Transaction(postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = dict(related.make_consistent())
    assert len(groups) == 3
    for group_key, group in groups.items():
        assert len(group) == 1
        names = group.entities()
        assert next(names, None) == group.entity
        assert next(names, None) is None
        assert group.entity in group_key

@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_make_consistent_entity_differs_accrual_payment(acct_name):
    """An accrual and its payment stay together despite differing entities."""
    invoice = 'Invoices/DifferPay.pdf'
    # Depending on the account, the order of the accrual and payment might
    # be swapped here, but that shouldn't matter.
    postings = [
        (acct_name, 125, {'invoice': invoice, 'entity': 'Positive'}),
        (acct_name, -125, {'invoice': invoice, 'entity': 'Negative'}),
    ]
    txn = testutil.Transaction(postings=postings)
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
    groups = related.make_consistent()
    _, only_group = next(groups)
    # The single group yielded must be the original object itself.
    assert only_group is related
    assert next(groups, None) is None

def check_output(output, expect_patterns):
    """Rewind `output` and check its lines against `expect_patterns`."""
    output.seek(0)
    lines = iter(output)
    testutil.check_lines_match(lines, expect_patterns)

def run_outgoing(rt_id, postings, rt_client=None):
    """Run an OutgoingReport for one RT ID and return its text output.

    `postings` may already be an AccrualPostings; otherwise it is filtered
    down to the accruals whose 'rt-id' metadata equals `rt_id`. A fresh
    RTClient is used when `rt_client` is not given.
    """
    client = RTClient() if rt_client is None else rt_client
    rt_wrapper = rtutil.RT(client)
    if isinstance(postings, accrual.AccrualPostings):
        related = postings
    else:
        related = accruals_by_meta(postings, rt_id, 'rt-id', wrap_type=accrual.AccrualPostings)
    output = io.StringIO()
    accrual.OutgoingReport(rt_wrapper, output).run({rt_id: related})
    return output

@pytest.mark.parametrize('invoice,expected', [
    ('rt:505/5050', "Zero balance outstanding since 2010-05-05"),
    ('rt:510/5100', "Zero balance outstanding since 2010-05-10"),
    ('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"),
    ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",),
])
def test_balance_report(accrual_postings, invoice, expected, caplog):
    """BalanceReport prints the invoice and its balance without logging."""
    related = accruals_by_meta(accrual_postings, invoice, wrap_type=accrual.AccrualPostings)
    output = io.StringIO()
    accrual.BalanceReport(output).run({invoice: related})
    assert not caplog.records
    expect_patterns = [invoice, expected]
    check_output(output, expect_patterns)

def test_outgoing_report(accrual_postings, caplog):
    """The outgoing report for rt:510 prints the approval text and edits RT."""
    rt_client = RTClient()
    output = run_outgoing('rt:510', accrual_postings, rt_client)
    # Drop the last nine characters of the default URL.
    # NOTE(review): presumably this strips a trailing REST path to get the
    # web root -- confirm against testutil.RTClient.DEFAULT_URL.
    rt_url = RTClient.DEFAULT_URL[:-9]
    rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b'
    contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b'
    assert not caplog.records
    check_output(output, [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
        r'^PAYMENT TO: Hon\. Mx\. 510$',
        r'^TOTAL TO PAY: \$280\.00$',
        fr'^AGREEMENT: {contract_url}',
        r'^BEANCOUNT ENTRIES:$',
        # For each transaction, check for the date line, a metadata, and the
        # Expenses posting.
        r'^\s*2010-06-10\s',
        fr'^\s+rt-id: "{rt_id_url}"$',
        r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
        r'^\s*2010-06-12\s',
        fr'^\s+contract: "{contract_url}"$',
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
    ])
    # The report must also have recorded these custom field edits on the
    # ticket.
    assert rt_client.edits == {'510': {
        'CF_payment-amount': 'USD 280.00',
        'CF_payment-method': 'USD ACH',
    }}
    assert 'payment-method:' not in output.getvalue()

def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog):
    """With want_cfs=False the requestor has no name and the payee is blank."""
    rt_client = RTClient(want_cfs=False)
    output = run_outgoing('rt:510', accrual_postings, rt_client)
    assert not caplog.records
    check_output(output, [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: <mx510@example\.org>$',
        r'^PAYMENT TO:\s*$',
    ])

def test_outgoing_report_fx_amounts(accrual_postings, caplog):
    """A EUR payable is reported in EUR with its USD value in parentheses."""
    rt_client = RTClient()
    # The rt-id metadata value searched for here names two tickets; the
    # edits asserted below are recorded on ticket 520 only.
    output = run_outgoing('rt:520 rt:525', accrual_postings, rt_client)
    assert not caplog.records
    check_output(output, [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 520 <mx520@example\.org>$',
        r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$',
    ])
    assert rt_client.edits == {'520': {
        'CF_payment-amount': 'EUR 1,000.00 ($1,100.00)',
        'CF_payment-method': 'EUR International Wire',
    }}
    assert 'payment-method:' not in output.getvalue()

def test_outgoing_report_multi_invoice(accrual_postings, caplog):
    """rt:310 reports a combined total and warns about the payment method."""
    rt_client = RTClient()
    output = run_outgoing('rt:310', accrual_postings, rt_client)
    # Exactly one log record is expected; the unpacking fails otherwise.
    log, = caplog.records
    assert log.levelname == 'WARNING'
    assert log.message.startswith('cannot set payment-method for rt:310: ')
    check_output(output, [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
        r'^TOTAL TO PAY: \$420.00$',
    ])
    # Only the amount is recorded on the ticket; no payment method is set.
    assert rt_client.edits == {'310': {
        'CF_payment-amount': 'USD 420.00',
    }}
    assert 'payment-method:' not in output.getvalue()

def test_outgoing_report_without_rt_id(accrual_postings, caplog):
    """Running the report under a None RT ID logs a message and prints nothing."""
    invoice = 'rt://ticket/515/attachments/5150'
    related = accruals_by_meta(
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
    )
    # `related` is already an AccrualPostings, so run_outgoing uses it as-is
    # and keys it under None.
    output = run_outgoing(None, related)
    assert caplog.records
    log = caplog.records[0]
    assert log.message.startswith(
        f"can't generate outgoings report for {invoice} because no RT ticket available:",
    )
    assert not output.getvalue()

def run_aging_report(postings, today=None):
    """Run an AgingReport over `postings` and return its binary output.

    Only receivable/payable postings are used. They are grouped by their
    'invoice' metadata and each group is split with make_consistent() before
    being passed to the report. `today` defaults to the current date.
    """
    if today is None:
        today = datetime.date.today()
    accruals = (
        candidate for candidate in postings
        if candidate.account.is_under('Assets:Receivable', 'Liabilities:Payable')
    )
    groups = {}
    for _, related in accrual.AccrualPostings.group_by_meta(accruals, 'invoice'):
        for key, group in related.make_consistent():
            groups[key] = group
    output = io.BytesIO()
    rt_wrapper = rtutil.RT(RTClient())
    accrual.AgingReport(rt_wrapper, output, today).run(groups)
    return output

def test_aging_report(accrual_postings):
    """The aging report dated today contains all the default expected rows."""
    check_aging_ods(run_aging_report(accrual_postings))

@pytest.mark.parametrize('date,recv_end,pay_end', [
    # Both these dates are chosen for their off-by-one potential:
    # the first is exactly 30 days after the 2010-06-10 payable;
    # the second is exactly 60 days after the 2010-05-15 receivable.
    (datetime.date(2010, 7, 10), 1, 5),
    (datetime.date(2010, 7, 14), 2, 5),
])
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
    """Reports run on boundary dates include only the expected leading rows."""
    # Slicing copies the lists, so the adjustment below does not touch the
    # module-level expectations.
    expect_recv = AGING_AR[:recv_end]
    expect_pay = AGING_AP[:pay_end]
    if date.day in (10, 11):
        # Take the 60 USD posting out of the invoice 510/6100 payable.
        last_pay = expect_pay[-1]
        reduced = testutil.Amount(last_pay.at_cost.number - 60)
        expect_pay[-1] = last_pay._replace(at_cost=reduced)
    output = run_aging_report(accrual_postings, date)
    check_aging_ods(output, date, expect_recv, expect_pay)

def test_aging_report_entity_consistency(accrual_postings):
    """Negative rt:480 postings produce one payable row per entity."""
    selected = (
        post for post in accrual_postings
        if post.meta.get('rt-id') == 'rt:480'
        and post.units.number < 0
    )
    expect_pay = [
        AgingRow.make_simple('2010-04-15', entity, 125, 'rt:480/4800')
        for entity in ('MultiPartyA', 'MultiPartyB')
    ]
    output = run_aging_report(selected)
    check_aging_ods(output, None, [], expect_pay)

def test_aging_report_does_not_include_too_recent_postings(accrual_postings):
    """A posting dated too close to the report date is left out of the row."""
    # This date is after the Q3 posting, but too soon after for that to be
    # included in the aging report.
    date = datetime.date(2010, 10, 1)
    grant_postings = (
        post for post in accrual_postings
        if post.meta.get('rt-id') == 'rt:470'
    )
    # Date+amount are both from the Q2 posting only.
    expect_recv = [
        AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
                             project='Development Grant'),
    ]
    output = run_aging_report(grant_postings, date)
    check_aging_ods(output, date, expect_recv, [])

def run_main(arglist, config=None, out_type=io.StringIO):
    """Run accrual.main and return `(retcode, output, errors)`.

    `arglist` is the list of command line arguments; it is never modified.
    `config` defaults to a TestConfig that loads the accruals test books
    with a fresh RTClient. `out_type` is called with no arguments to create
    the output stream; when it is io.BytesIO, '--output-file=-' is put in
    front of the arguments. Both returned streams are rewound to the start.
    """
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/accruals.beancount'),
            rt_client=RTClient(),
        )
    if out_type is io.BytesIO:
        # Build a new list instead of inserting in place: callers pass lists
        # that pytest shares between parametrized runs, and mutating them
        # would leak the extra option into later uses.
        arglist = ['--output-file=-', *arglist]
    output = out_type()
    errors = io.StringIO()
    retcode = accrual.main(arglist, output, errors, config)
    output.seek(0)
    errors.seek(0)
    return retcode, output, errors

def check_main_fails(arglist, config, error_flags):
    """Run main, assert that it failed with no output, and return its errors.

    The return code must be above 16, and the amount above 16 must share at
    least one bit with `error_flags`.
    """
    retcode, output, errors = run_main(arglist, config)
    assert retcode > 16
    # NOTE(review): presumably the low bits above 16 encode error categories
    # -- confirm against accrual.main's exit code scheme.
    assert (retcode - 16) & error_flags
    assert not output.getvalue()
    return errors

@pytest.mark.parametrize('arglist', [
    ['--report-type=balance', 'entity=EarlyBird'],
    ['--report-type=outgoing', 'entity=EarlyBird'],
])
def test_output_excludes_payments(arglist):
    """No output line for EarlyBird may begin with a two-digit rt:4x ID."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    output.seek(0)
    for line in output:
        # NOTE(review): re.match only matches at the start of the line, so
        # an ID appearing later in a line is not caught -- confirm whether
        # re.search was intended.
        assert not re.match(r'\brt:4\d\b', line)

@pytest.mark.parametrize('arglist,expect_invoice', [
    (['40'], 'rt:40/400'),
    (['44/440'], 'rt:44/440'),
])
def test_output_payments_when_only_match(arglist, expect_invoice):
    """Searching for rt:40 or rt:44/440 directly reports that invoice."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    # The invoice heading is followed by an outstanding-balance line.
    check_output(output, [
        rf'^{re.escape(expect_invoice)}:$',
        r' outstanding since ',
    ])

@pytest.mark.parametrize('arglist,expect_amount', [
    (['310'], 420),
    (['310/3120'], 220),
    (['-t', 'out', 'entity=Vendor'], 420),
])
def test_main_outgoing_report(arglist, expect_amount):
    """main produces an outgoing report for ticket 310 with the right total.

    `expect_amount` is the whole-dollar total expected on the TOTAL TO PAY
    line for the given arguments.
    """
    # The unused rt_url, rt_id_url and contract_url locals were removed;
    # no assertion ever referenced them.
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, [
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
        rf'^TOTAL TO PAY: \${expect_amount}\.00$',
        r'^\s*2010-04-30\s',
        r'^\s+Expenses:Travel\s+220 USD$',
    ])

@pytest.mark.parametrize('arglist', [
    ['-t', 'balance'],
    ['515/5150'],
])
def test_main_balance_report(arglist):
    """Both argument lists report the 1,500.00 USD balance for rt 515/5150."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, [
        r'\brt://ticket/515/attachments/5150:$',
        r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
    ])

def test_main_balance_report_because_no_rt_id():
    """An invoice given as a plain file path gets balance-style output."""
    invoice = 'Invoices/2010StateRegistration.pdf'
    # No report type is passed; the expected lines below are in the
    # "outstanding since" balance format.
    retcode, output, errors = run_main([invoice])
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, [
        rf'\b{re.escape(invoice)}:$',
        r'^\s+-50\.00 USD outstanding since 2010-06-20$',
    ])

@pytest.mark.parametrize('arglist', [
    [],
    ['entity=Lawyer'],
])
def test_main_aging_report(arglist):
    """main writes an aging spreadsheet, optionally narrowed to one entity."""
    recv_rows, pay_rows = AGING_AR, AGING_AP
    if arglist:
        # The only non-empty argument list searches for the Lawyer entity.
        recv_rows = [row for row in recv_rows if 'Lawyer' in row.entity]
        pay_rows = [row for row in pay_rows if 'Lawyer' in row.entity]
    retcode, output, errors = run_main(arglist, out_type=io.BytesIO)
    assert not errors.getvalue()
    assert retcode == 0
    check_aging_ods(output, None, recv_rows, pay_rows)

def test_main_no_books():
    """main fails with a 'no books to load' error under an empty TestConfig."""
    # Accept a failure code that shares a bit with flags 1 or 8.
    errors = check_main_fails([], testutil.TestConfig(), 1 | 8)
    testutil.check_lines_match(iter(errors), [
        r':[01]: +no books to load in configuration\b',
    ])

@pytest.mark.parametrize('arglist', [
    ['499'],
    ['505/99999'],
    ['-t', 'balance', 'entity=NonExistent'],
])
def test_main_no_matches(arglist, caplog):
    """main fails with flag 8 and logs a warning when nothing matches."""
    # Passing None for the config makes run_main build its default one.
    check_main_fails(arglist, None, 8)
    testutil.check_logs_match(caplog, [
        ('WARNING', 'no matching entries found to report'),
    ])

def test_main_no_rt(caplog):
    """An outgoing report fails with flag 4 when the config has no RT client."""
    # Unlike run_main's default config, this one passes no rt_client.
    config = testutil.TestConfig(
        books_path=testutil.test_path('books/accruals.beancount'),
    )
    check_main_fails(['-t', 'out'], config, 4)
    testutil.check_logs_match(caplog, [
        ('ERROR', 'unable to generate outgoing report: RT client is required'),
    ])