"""test_reports_accrual - Unit tests for accrual report"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import collections
import copy
import datetime
import io
import itertools
import operator
import re

import babel.numbers
import odf.opendocument
import odf.table
import odf.text

import pytest

from . import testutil

from decimal import Decimal
from typing import NamedTuple, Optional, Sequence

from beancount.core import data as bc_data
from beancount import loader as bc_loader
from conservancy_beancount import cliutil
from conservancy_beancount import data
from conservancy_beancount import rtutil
from conservancy_beancount.reports import accrual

# Parse the accrual fixture ledger a single time, when the module is imported.
_accruals_load = bc_loader.load_file(
    testutil.test_path('books/accruals.beancount'),
)
# Entries that carry a narration, leaving out the opening balances entry.
ACCRUAL_TXNS = [
    entry
    for entry in _accruals_load[0]
    if hasattr(entry, 'narration') and entry.narration != 'Opening balances'
]
# How many postings in those entries sit in a receivable or payable account.
ACCRUALS_COUNT = len([
    post
    for txn in ACCRUAL_TXNS
    for post in txn.postings
    if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
])

# Accrual accounts whose postings are identified by invoice metadata together
# with entity metadata.
INVOICE_ACCOUNTS = [
    'Assets:Receivable:Accounts',
    'Assets:Receivable:Loans',
    'Liabilities:Payable:Accounts',
]

# Accrual accounts whose postings are identified by entity metadata alone.
ENTITY_ACCOUNTS = [
    'Assets:Prepaid:Expenses',
    'Liabilities:Payable:Vacation',
]

# Every accrual account exercised by the tests, invoice-keyed accounts first.
ACCOUNTS = [*INVOICE_ACCOUNTS, *ENTITY_ACCOUNTS]

# Finds each age boundary ("N Year(s)" and/or "N Day(s)") that is directly
# followed by an en dash or a colon in a summary row label.
AGE_SUM_RE = re.compile(
    r'(?:\b(\d+) Years?)?(?: ?\b(\d+) Days?)?[–:]'
)

class AgingRow(NamedTuple):
    """One expected data row of an aging report sheet.

    The number of fields matters: ``check_row_match`` requires the sheet row
    to have at least ``len(self)`` cells.
    """
    date: datetime.date
    entity: Sequence[str]
    amount: Optional[Sequence[bc_data.Amount]]
    at_cost: bc_data.Amount
    rt_id: Sequence[str]
    invoice: Sequence[str]
    project: Sequence[str]

    @classmethod
    def make_simple(cls, date, entity, at_cost, invoice,
                    rt_id=None, orig_amount=None, project='Conservancy'):
        """Build a row whose list-valued columns each hold a single value.

        ``date`` may be a date object or a ``YYYY-MM-DD`` string. ``at_cost``
        goes through ``testutil.Amount`` unless it is already a tuple. When
        ``rt_id`` is not given and ``invoice`` starts with ``rt:``, the part
        of the invoice before its first slash is used as the RT id.
        """
        if isinstance(date, str):
            date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
        if not isinstance(at_cost, tuple):
            at_cost = testutil.Amount(at_cost)
        if rt_id is None and invoice.startswith('rt:'):
            rt_id = invoice.partition('/')[0]
        return cls(
            date=date,
            entity=[entity],
            amount=orig_amount,
            at_cost=at_cost,
            rt_id=[rt_id],
            invoice=[invoice],
            project=[project],
        )

    def check_row_match(self, sheet_row):
        """Assert that the spreadsheet row ``sheet_row`` matches this row."""
        cells = testutil.ODSCell.from_row(sheet_row)
        assert len(cells) >= len(self)
        cell_iter = iter(cells)

        date_cell = next(cell_iter)
        assert date_cell.value == self.date

        entity_cell = next(cell_iter)
        assert entity_cell.text == '\0'.join(self.entity)

        # The original-amount cell is empty when no amount was recorded.
        amount_cell = next(cell_iter)
        amount_texts = [
            babel.numbers.format_currency(number, currency, format_type='accounting')
            for number, currency in self.amount or ()
        ]
        assert amount_cell.text == '\0'.join(amount_texts)

        usd_cell = next(cell_iter)
        assert usd_cell.value_type == 'currency'
        assert usd_cell.value == self.at_cost.number

        project_cell = next(cell_iter)
        assert project_cell.text == '\0'.join(self.project)

        # In every remaining cell, each child node must be a link.
        for index, cell in enumerate(cell_iter):
            links = cell.getElementsByType(odf.text.A)
            assert len(links) == len(cell.childNodes)
        # At least two such cells must have been seen.
        assert index >= 1


# Default expected rows for the second sheet checked by check_aging_ods
# (its pay_rows argument). Arguments are date, entity, at-cost amount, and
# invoice, per AgingRow.make_simple.
AGING_AP = [
    AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'),
    AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'),
    AgingRow.make_simple('2010-04-25', 'Vendor', 200, 'FIXME'),
    AgingRow.make_simple('2010-04-30', 'Vendor', 220, 'rt:310/3120'),
    AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'),
    # The only row with an original (non-at-cost) amount.
    AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200',
                         orig_amount=[testutil.Amount(1000, 'EUR')]),
    AgingRow.make_simple('2010-06-20', 'StateGov', 50, 'Invoices/2010StateRegistration.pdf'),
]

# Default expected rows for the first sheet checked by check_aging_ods
# (its recv_rows argument).
AGING_AR = [
    AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'),
    AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500,
                         'rt://ticket/515/attachments/5150'),
    # Same entity and invoice on two dates, under a non-default project.
    AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
                         project='Development Grant'),
    AgingRow.make_simple('2010-09-15', 'GrantCo', 6000, 'rt:470/4700',
                         project='Development Grant'),
]

class RTClient(testutil.RTClient):
    """Test RT client that overrides TICKET_DATA with tickets for these tests."""
    # Keyed by ticket id string. Each entry looks like
    # (attachment id, filename, MIME type, size) — presumably the shape
    # testutil.RTClient expects; verify against that class.
    TICKET_DATA = {
        '40': [
            ('400', 'invoice feb.csv', 'text/csv', '40.4k'),
        ],
        '44': [
            ('440', 'invoice feb.csv', 'text/csv', '40.4k'),
        ],
        '310': [
            ('3100', 'VendorContract.pdf', 'application/pdf', '1.7m'),
            ('3120', 'VendorInvoiceB.pdf', 'application/pdf', '1.8m'),
        ],
        # Tickets with no attachments listed.
        '490': [],
        '505': [],
        '510': [
            ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
            ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
            ('5105', 'payment.png', 'image/png', '51.5k'),
            ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
        ],
        '515': [],
        '520': [],
    }


@pytest.fixture
def accrual_postings():
    """Return postings built from a deep copy of ACCRUAL_TXNS.

    Copying keeps each test from seeing changes another test made to the
    module-level entries.
    """
    private_txns = copy.deepcopy(ACCRUAL_TXNS)
    return data.Posting.from_entries(private_txns)

def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
    """Select receivable/payable postings whose ``key`` metadata equals ``value``.

    The lazily filtered postings are handed to ``wrap_type`` and its result
    is returned.
    """
    def is_wanted(post):
        return (
            post.meta.get(key) == value
            and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
        )

    return wrap_type(filter(is_wanted, postings))

def find_row_by_text(row_source, want_text):
    """Return the first row from ``row_source`` whose first child has ``want_text``.

    Rows without children are skipped. Rows are consumed from ``row_source``
    up to and including the match. The test fails if no row matches.
    """
    for candidate in row_source:
        try:
            first_text = candidate.childNodes[0].text
        except IndexError:
            # No children at all, so this row cannot match.
            continue
        if first_text == want_text:
            return candidate
    pytest.fail(f"did not find row with text {want_text!r}")

def check_age_sum(aging_rows, row, date):
    """Check one age-bucket total row and return the total it shows.

    The bucket bounds are read from the label in the row's first cell. The
    row's last cell must equal the sum of the positive at-cost amounts of
    the ``aging_rows`` whose age in days (relative to ``date``) is in the
    bucket.
    """
    label = row.firstChild.text
    ages = []
    for match in AGE_SUM_RE.finditer(label):
        years, days = match.group(1), match.group(2)
        ages.append(int(years or 0) * 365 + int(days or 0))
    if len(ages) == 2:
        age_range = range(*ages)
    elif len(ages) == 1:
        # datetime only supports a 10K year range so this should cover all of it
        if label.startswith('Total Aged Over '):
            age_range = range(ages[0], 3650000)
        else:
            age_range = range(-3650000, ages[0])
    else:
        pytest.fail(f"row has incorrect age matches: {ages!r}")
    expected_total = sum(
        aged.at_cost.number
        for aged in aging_rows
        if aged.at_cost.number > 0
        and (date - aged.date).days in age_range
    )
    assert row.lastChild.value == expected_total
    return row.lastChild.value

def check_aging_sheet(sheet, aging_rows, date):
    """Check one aging report sheet against its expected rows and totals.

    Does nothing when ``aging_rows`` is empty. Otherwise it finds the row
    whose first cell is the first expected date, matches the expected rows
    against consecutive sheet rows from there, and then checks the total
    rows that start at "Total Aged Over 1 Year: ".

    ``date`` is the report date used to compute the age of each row.
    """
    if not aging_rows:
        return
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
    expect_rows = iter(aging_rows)
    row0 = find_row_by_text(rows, aging_rows[0].date.isoformat())
    next(expect_rows).check_row_match(row0)
    # Pull a sheet row only while an expectation remains. zip(rows, expect_rows)
    # would fetch one extra sheet row before noticing expect_rows is exhausted
    # and silently drop it, and would stop silently if the sheet ran short.
    for expected in expect_rows:
        actual = next(rows, None)
        assert actual is not None, "sheet ended before all expected rows were found"
        expected.check_row_match(actual)
    row0 = find_row_by_text(rows, "Total Aged Over 1 Year: ")
    aging_sum = check_age_sum(aging_rows, row0, date)
    sums = 0
    for row in rows:
        if not row.firstChild:
            # Spacer row with no cells.
            pass
        elif row.firstChild.text.startswith("Total Unpaid"):
            # Running total of every age bucket checked so far.
            assert row.lastChild.value == aging_sum
            sums += 1
        else:
            aging_sum += check_age_sum(aging_rows, row, date)
    assert sums > 1

def check_aging_ods(ods_file, date, recv_rows=AGING_AR, pay_rows=AGING_AP):
    """Load an aging report spreadsheet and check its first two sheets.

    The first sheet is checked against ``recv_rows`` and the second against
    ``pay_rows``, both using ``date`` as the report date.
    """
    ods_file.seek(0)
    document = odf.opendocument.load(ods_file)
    sheets = document.spreadsheet.getElementsByType(odf.table.Table)
    assert len(sheets) >= 2
    for sheet, expected_rows in zip(sheets, (recv_rows, pay_rows)):
        check_aging_sheet(sheet, expected_rows, date)

@pytest.mark.parametrize('search_terms,expect_count,check_func', [
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
        'Assets:Receivable:', 'Liabilities:Payable:',
    )),
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
    ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'),
    ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
    ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
     lambda post: post.meta['invoice'].startswith('rt:510/')),
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
])
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
    """filter_search narrows the postings and every result passes check_func."""
    terms = [cliutil.SearchTerm._make(query) for query in search_terms]
    matched = list(accrual.filter_search(accrual_postings, terms))
    found = len(matched)
    if expect_count >= ACCRUALS_COUNT:
        assert found == ACCRUALS_COUNT
    else:
        # A narrowing search must drop something, but may keep more than
        # the minimum expected count.
        assert ACCRUALS_COUNT > found >= expect_count
    assert all(check_func(post) for post in matched)

@pytest.mark.parametrize('arg,expected', [
    ('aging', accrual.AgingReport),
    ('balance', accrual.BalanceReport),
    ('outgoing', accrual.OutgoingReport),
    ('age', accrual.AgingReport),
    ('bal', accrual.BalanceReport),
    ('out', accrual.OutgoingReport),
    ('outgoings', accrual.OutgoingReport),
])
def test_report_type_by_name(arg, expected):
    """Report names resolve to the same class in lower, title and upper case."""
    for spelling in (arg.lower(), arg.title(), arg.upper()):
        assert accrual.ReportType.by_name(spelling).value is expected

@pytest.mark.parametrize('arg', [
    'unknown',
    'blance',
    'outgong',
])
def test_report_type_by_unknown_name(arg):
    """Unrecognized report names are rejected with ValueError."""
    by_name = accrual.ReportType.by_name
    # Raising ValueError helps argparse generate good messages.
    with pytest.raises(ValueError):
        by_name(arg)

@pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values(
    INVOICE_ACCOUNTS,
    ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES],
    itertools.count(1),
))
def test_make_consistent_bad_invoice(acct_name, invoice, day):
    """Postings with an unusable invoice value stay together in one group.

    Three postings share the same account, entity and bad invoice value.
    make_consistent must return a single group holding those three postings.
    """
    mult = 10 if acct_name.startswith('Assets:') else -10
    txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=[
        (acct_name, index * mult, {'invoice': invoice, 'entity': f'BadInvoice{day}'})
        for index in range(1, 4)
    ])
    # The postings are walked twice: once by make_consistent and once by the
    # comparison loop below. Materialize them so that a one-shot iterator
    # from from_txn cannot leave the comparison loop with nothing to check.
    postings = list(data.Posting.from_txn(txn))
    consistent = dict(accrual.AccrualPostings.make_consistent(iter(postings)))
    assert len(consistent) == 1
    _, actual = consistent.popitem()
    assert len(actual) == 3
    for act_post, exp_post in zip(actual, postings):
        assert act_post == exp_post

def test_make_consistent_across_accounts():
    """The same invoice and entity in different accounts form separate groups."""
    invoice = 'Invoices/CrossAccount.pdf'
    posting_specs = [
        (acct_name, 100, {'invoice': invoice, 'entity': 'CrossAccount'})
        for acct_name in ACCOUNTS
    ]
    txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=posting_specs)
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_txn(txn))
    consistent = dict(groups)
    assert len(consistent) == len(ACCOUNTS)
    assert all(len(posts) == 1 for posts in consistent.values())

def test_make_consistent_both_invoice_and_account():
    """Postings with no metadata still land in one group per account."""
    posting_specs = [(acct_name, 150) for acct_name in ACCOUNTS]
    txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=posting_specs)
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_txn(txn))
    consistent = dict(groups)
    assert len(consistent) == len(ACCOUNTS)
    assert all(len(posts) == 1 for posts in consistent.values())

@pytest.mark.parametrize('acct_name', ACCOUNTS)
def test_make_consistent_across_entity(acct_name):
    """One invoice shared by three entities yields three single-post groups."""
    # Asset accounts accrue with positive amounts; the others with negative.
    sign = 1 if acct_name.startswith('Assets') else -1
    posting_specs = [
        (acct_name, sign * n, {'invoice': 'Inv/1.pdf', 'entity': f'Entity{n}'})
        for n in range(1, 4)
    ]
    txn = testutil.Transaction(postings=posting_specs)
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_txn(txn))
    consistent = dict(groups)
    assert len(consistent) == 3
    assert all(len(posts) == 1 for posts in consistent.values())

@pytest.mark.parametrize('acct_name', INVOICE_ACCOUNTS)
def test_make_consistent_entity_differs_accrual_payment(acct_name):
    """An accrual and its payment group together even if their entities differ."""
    invoice = 'Invoices/DifferPay.pdf'
    # Depending on the account, the order of the accrual and payment might
    # be swapped here, but that shouldn't matter.
    posting_specs = [
        (acct_name, 125, {'invoice': invoice, 'entity': 'Positive'}),
        (acct_name, -125, {'invoice': invoice, 'entity': 'Negative'}),
    ]
    txn = testutil.Transaction(postings=posting_specs)
    related = list(data.Posting.from_txn(txn))
    groups = accrual.AccrualPostings.make_consistent(related)
    _, actual = next(groups)
    assert len(actual) == len(related)
    for post in related:
        assert post in actual
    # That single group must be the only one.
    assert next(groups, None) is None

def test_make_consistent_by_date_accruals_differ():
    """Three accruals on three dates with the same metadata stay separate."""
    meta = {'rt-id': '1', 'invoice': 'rt:1/2', 'entity': 'MultiDate'}
    entries = []
    for date in itertools.islice(testutil.date_seq(), 3):
        entries.append(testutil.Transaction(date=date, postings=[
            ('Assets:Receivable:Accounts', date.day * 100, meta),
        ]))
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))
    actual = [group for _, group in groups]
    assert len(actual) == 3
    amounts = {post.units.number for group in actual for post in group}
    assert amounts == {100, 200, 300}

def test_make_consistent_by_date_with_exact_payment():
    """A payment that exactly zeroes an accrual closes that group."""
    meta = {'rt-id': '1', 'invoice': 'rt:1/3', 'entity': 'OnePayment'}
    entries = []
    for date in itertools.islice(testutil.date_seq(), 3):
        # Odd days accrue; even days pay the same amount back.
        factor = 1 if date.day % 2 else -1
        entries.append(testutil.Transaction(date=date, postings=[(
            'Assets:Receivable:Accounts',
            35 * factor,
            meta,
        )]))
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))
    actual = [group for _, group in groups]
    assert len(actual) == 2
    paid, unpaid = actual[0], actual[1]
    assert sum(post.units.number for post in paid) == 0
    assert len(unpaid) == 1
    assert unpaid[0].meta.date.day == 3

def test_make_consistent_by_date_with_underpayment():
    """A partial payment stays grouped with the accrual it pays down."""
    meta = {'rt-id': '1', 'invoice': 'rt:1/4', 'entity': 'UnderPayment'}
    entries = []
    for date in itertools.islice(testutil.date_seq(), 3):
        # Odd days accrue in full; even days pay back half.
        factor = 1 if date.day % 2 else -.5
        entries.append(testutil.Transaction(date=date, postings=[(
            'Assets:Receivable:Accounts',
            40 * factor,
            meta,
        )]))
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))
    actual = [group for _, group in groups]
    assert len(actual) == 2
    partly_paid, later = actual[0], actual[1]
    assert len(partly_paid) == 2
    assert partly_paid[0].units.number == 40
    assert partly_paid[1].units.number == -20
    assert len(later) == 1
    assert later[0].meta.date.day == 3

def test_make_consistent_by_date_with_overpayment():
    """The excess of an overpayment carries over into the next group."""
    meta = {'rt-id': '1', 'invoice': 'rt:1/5', 'entity': 'OverPayment'}
    entries = []
    for date in itertools.islice(testutil.date_seq(), 3):
        # Odd days accrue; even days pay back one and a half times as much.
        factor = 1 if date.day % 2 else -1.5
        entries.append(testutil.Transaction(date=date, postings=[(
            'Assets:Receivable:Accounts',
            50 * factor,
            meta,
        )]))
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))
    actual = [group for _, group in groups]
    assert len(actual) == 2
    settled, carried = actual[0], actual[1]
    assert sum(post.units.number for post in settled) == 0
    assert len(carried) == 2
    assert carried[0].meta.date.day == 2
    assert carried[0].units.number == -25
    assert carried[1].meta.date.day == 3

def test_make_consistent_by_date_with_late_payment():
    """A payment after two accruals is matched with the first accrual."""
    meta = {'rt-id': '1', 'invoice': 'rt:1/6', 'entity': 'LatePayment'}
    entries = []
    for date in itertools.islice(testutil.date_seq(), 3):
        # Accrue until day 2, then pay.
        factor = -1 if date.day > 2 else 1
        entries.append(testutil.Transaction(date=date, postings=[(
            'Assets:Receivable:Accounts',
            60 * factor,
            meta,
        )]))
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))
    actual = [group for _, group in groups]
    assert len(actual) == 2
    paid, unpaid = actual[0], actual[1]
    assert len(paid) == 2
    assert paid[0].meta.date.day == 1
    assert paid[1].meta.date.day == 3
    assert len(unpaid) == 1
    assert unpaid[0].meta.date.day == 2

def test_make_consistent_by_date_with_split_payments():
    """A payment larger than what remains on one accrual is split across two."""
    meta = {'rt-id': '1', 'invoice': 'rt:1/7', 'entity': 'SplitPayments'}
    entries = []
    for date, amount in zip(testutil.date_seq(), [70, 80, -50, -100]):
        entries.append(testutil.Transaction(date=date, postings=[(
            'Assets:Receivable:Accounts', amount, meta,
        )]))
    groups = accrual.AccrualPostings.make_consistent(data.Posting.from_entries(entries))
    actual = [group for _, group in groups]
    assert len(actual) == 2
    amounts_by_group = [[post.units.number for post in group] for group in actual]
    assert amounts_by_group[0] == [70, -50, -20]
    assert amounts_by_group[1] == [80, -80]

@pytest.mark.parametrize('account,day', itertools.product(
    ACCOUNTS,
    [1, 10, 20, 30],
))
def test_make_consistent_with_three_one_split(account, day):
    """One -45 posting is set against postings of 5, 15 and 25.

    Asset accounts expect three balanced groups of two postings each; other
    accounts expect a single balanced group of four.
    """
    meta = {'rt-id': '1', 'invoice': 'rt:1/8', 'entity': '3Split'}
    entries = [
        testutil.Transaction(
            date=datetime.date(2019, 5, dd),
            postings=[(account, dd, meta)],
        )
        for dd in [5, 15, 25]
    ]
    offsetting = testutil.Transaction(
        date=datetime.date(2019, 5, day),
        postings=[(account, -45, meta)],
    )
    # Keeps the entries in date order for each parametrized day.
    entries.insert(day // 10, offsetting)
    postings = data.Posting.from_entries(entries)
    actual = dict(accrual.AccrualPostings.make_consistent(iter(postings)))
    if account.startswith('Assets:'):
        group_count, post_count = 3, 2
    else:
        group_count, post_count = 1, 4
    assert len(actual) == group_count
    for related in actual.values():
        assert len(related) == post_count
        assert sum(post.units.number for post in related) == 0

@pytest.mark.parametrize('account', ACCOUNTS)
def test_make_consistent_with_three_two_split(account):
    """Postings of 5, 15 and 25 are offset by postings of -15 and -30.

    Asset accounts expect three balanced groups; other accounts expect two.
    """
    meta = {'rt-id': '1', 'invoice': 'rt:1/9', 'entity': '5Split'}
    entries = []
    for day in range(5, 30, 5):
        # Days divisible by 10 carry the offsetting (negative) amounts.
        factor = 1 if day % 10 else -1.5
        entries.append(testutil.Transaction(date=datetime.date(2019, 5, day), postings=[(
            account, day * factor, meta,
        )]))
    postings = data.Posting.from_entries(entries)
    actual = dict(accrual.AccrualPostings.make_consistent(iter(postings)))
    group_count = 3 if account.startswith('Assets:') else 2
    assert len(actual) == group_count
    for related in actual.values():
        assert len(related) >= 2
        assert sum(post.units.number for post in related) == 0

def check_output(output, expect_patterns):
    """Rewind ``output`` and check its lines against ``expect_patterns``.

    The matching itself is delegated to ``testutil.check_lines_match``.
    """
    output.seek(0)
    lines = iter(output)
    testutil.check_lines_match(lines, expect_patterns)

def run_outgoing(rt_id, postings, rt_client=None):
    """Run an OutgoingReport for ``rt_id`` and return its text output.

    ``postings`` may already be an ``AccrualPostings``. Otherwise it is
    filtered down to the accruals whose ``rt-id`` metadata equals ``rt_id``.
    A fresh ``RTClient`` is used when ``rt_client`` is not supplied.
    """
    client = RTClient() if rt_client is None else rt_client
    rt_wrapper = rtutil.RT(client)
    if not isinstance(postings, accrual.AccrualPostings):
        postings = accruals_by_meta(
            postings, rt_id, 'rt-id', wrap_type=accrual.AccrualPostings,
        )
    output = io.StringIO()
    accrual.OutgoingReport(rt_wrapper, output).run({rt_id: postings})
    return output

@pytest.mark.parametrize('invoice,expected', [
    ('rt:505/5050', "Zero balance outstanding since 2010-05-05"),
    ('rt:510/5100', "Zero balance outstanding since 2010-05-10"),
    ('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"),
    ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",),
])
def test_balance_report(accrual_postings, invoice, expected, caplog):
    """BalanceReport prints the invoice and then its balance line, with no logs."""
    related = accruals_by_meta(
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
    )
    output = io.StringIO()
    accrual.BalanceReport(output).run({invoice: related})
    assert not caplog.records
    check_output(output, [invoice, expected])

def test_outgoing_report(accrual_postings, caplog):
    """Check the full outgoing report for rt:510 and the RT edits it makes."""
    rt_client = RTClient()
    output = run_outgoing('rt:510', accrual_postings, rt_client)
    # Drop the last 9 characters of DEFAULT_URL to get the base that the
    # ticket and attachment URLs below are built on.
    rt_url = RTClient.DEFAULT_URL[:-9]
    rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b'
    contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b'
    assert not caplog.records
    check_output(output, [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
        r'^PAYMENT TO: Hon\. Mx\. 510$',
        r'^TOTAL TO PAY: \$280\.00$',
        fr'^AGREEMENT: {contract_url}',
        r'^BEANCOUNT ENTRIES:$',
    ])
    # Find the date line of the first transaction.
    # For each transaction, check for the date line, a metadata, and the
    # Expenses posting.
    # This loop continues from wherever the check_output call above left the
    # stream, and stops at the next line that is not all whitespace.
    for line in output:
        if not line.isspace():
            break
    assert re.match(r'\s*2010-06-10\s', line), \
        "first entry line did not have expected date"
    # check_output rewinds the stream, so these patterns are matched against
    # the whole report again rather than only what follows the date line.
    check_output(output, [
        fr'^\s+rt-id: "{rt_id_url}"$',
        r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
        r'^\s*2010-06-10\s',
        fr'^\s+contract: "{contract_url}"$',
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
    ])
    # The report must have recorded these custom-field edits for ticket 510.
    assert rt_client.edits == {'510': {
        'CF_payment-amount': 'USD 280.00',
        'CF_payment-method': 'USD ACH',
    }}
    assert 'payment-method:' not in output.getvalue()

def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog):
    """With ``want_cfs=False`` the header has no requestor name or payee."""
    expect_header = [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: <mx510@example\.org>$',
        r'^PAYMENT TO:\s*$',
    ]
    rt_client = RTClient(want_cfs=False)
    output = run_outgoing('rt:510', accrual_postings, rt_client)
    assert not caplog.records
    check_output(output, expect_header)

def test_outgoing_report_fx_amounts(accrual_postings, caplog):
    """A foreign-currency payable reports both the EUR and the USD amounts."""
    expect_header = [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 520 <mx520@example\.org>$',
        r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$',
    ]
    expect_edits = {'520': {
        'CF_payment-amount': 'EUR 1,000.00 ($1,100.00)',
        'CF_payment-method': 'EUR Wire',
    }}
    rt_client = RTClient()
    output = run_outgoing('rt:520 rt:525', accrual_postings, rt_client)
    assert not caplog.records
    check_output(output, expect_header)
    assert rt_client.edits == expect_edits
    assert 'payment-method:' not in output.getvalue()

def test_outgoing_report_multi_invoice(accrual_postings, caplog):
    """rt:310 totals $420.00 and sets no payment method, with one warning."""
    rt_client = RTClient()
    output = run_outgoing('rt:310', accrual_postings, rt_client)
    # Exactly one log record is expected; unpacking fails otherwise.
    (log,) = caplog.records
    assert log.levelname == 'WARNING'
    assert log.message.startswith('cannot set payment-method for rt:310: ')
    expect_header = [
        r'^PAYMENT FOR APPROVAL:$',
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
        r'^TOTAL TO PAY: \$420.00$',
    ]
    check_output(output, expect_header)
    expect_edits = {'310': {
        'CF_payment-amount': 'USD 420.00',
    }}
    assert rt_client.edits == expect_edits
    assert 'payment-method:' not in output.getvalue()

@pytest.mark.parametrize('arg', [
    'usd ach',
    '  eur  wire',
    'cad   vendorportal  ',
    ' gbp check ',
])
def test_outgoing_report_good_payment_method(caplog, accrual_postings, arg):
    """Valid payment-method metadata reaches RT with an upper-cased currency."""
    rt_id = 'rt:40'
    ticket_id = rt_id[3:]
    meta = {'rt-id': rt_id, 'invoice': 'rt:40/100', 'payment-method': arg}
    txn = testutil.Transaction(postings=[
        ('Liabilities:Payable:Accounts', -100, meta),
    ])
    rt_client = RTClient()
    run_outgoing(rt_id, data.Posting.from_txn(txn), rt_client)
    assert not caplog.records
    cf_values = rt_client.edits[ticket_id]['CF_payment-method'].split()
    want_currency = arg.split()[0].upper()
    assert cf_values[0] == want_currency
    # Something must follow the currency code.
    assert len(cf_values) > 1

@pytest.mark.parametrize('arg', [
    '',
    'usd',
    'usd nonexistent',
    'check',
    'us check',
    *testutil.NON_STRING_METADATA_VALUES,
])
def test_outgoing_report_bad_payment_method(caplog, accrual_postings, arg):
    """Invalid payment-method metadata is warned about and not sent to RT."""
    rt_id = 'rt:40'
    ticket_id = rt_id[3:]
    meta = {'rt-id': rt_id, 'invoice': 'rt:40/100', 'payment-method': arg}
    txn = testutil.Transaction(postings=[
        ('Liabilities:Payable:Accounts', -100, meta),
    ])
    rt_client = RTClient()
    run_outgoing(rt_id, data.Posting.from_txn(txn), rt_client)
    assert caplog.records
    want_prefix = f'cannot set payment-method for {rt_id}: '
    for log in caplog.records:
        assert log.levelname == 'WARNING'
        assert log.message.startswith(want_prefix)
    assert 'CF_payment-method' not in rt_client.edits[ticket_id]

def test_outgoing_report_without_rt_id(accrual_postings, caplog):
    """Without an RT ticket the outgoing report logs why and prints nothing."""
    invoice = 'rt://ticket/515/attachments/5150'
    related = accruals_by_meta(
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
    )
    output = run_outgoing(None, related)
    assert caplog.records
    first_log = caplog.records[0]
    want_prefix = (
        f"can't generate outgoings report for 2010-05-15 MatchingProgram {invoice}"
        " because no RT ticket available:"
    )
    assert first_log.message.startswith(want_prefix)
    assert not output.getvalue()

def run_aging_report(postings, today):
    """Run an AgingReport dated ``today`` and return its binary output.

    Only postings under the receivable and payable accounts are grouped and
    passed to the report.
    """
    def is_accrual(post):
        return post.account.is_under('Assets:Receivable', 'Liabilities:Payable')

    groups = dict(accrual.AccrualPostings.make_consistent(filter(is_accrual, postings)))
    output = io.BytesIO()
    rt_wrapper = rtutil.RT(RTClient())
    accrual.AgingReport(rt_wrapper, output, today).run(groups)
    return output

@pytest.mark.parametrize('date', [
    datetime.date(2010, 3, 1),
    # Both these dates are chosen for their off-by-one potential:
    # the first is exactly 30 days after the 2010-06-10 payable;
    # the second is exactly 60 days after the 2010-05-15 receivable.
    datetime.date(2010, 7, 10),
    datetime.date(2010, 7, 14),
    # The remainder just shuffle the age buckets some.
    datetime.date(2010, 12, 1),
    datetime.date(2011, 6, 1),
    datetime.date(2011, 12, 1),
    datetime.date(2012, 3, 1),
])
def test_aging_report_date_cutoffs(accrual_postings, date):
    """The aging report sorts rows into the right buckets for each report date."""
    report_ods = run_aging_report(accrual_postings, date)
    check_aging_ods(report_ods, date)

def test_aging_report_entity_consistency(accrual_postings):
    """Negative rt:480 postings are reported as one payable row per entity."""
    date = datetime.date.today()
    expect_payables = [
        AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'),
        AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'),
    ]
    negative_rt480 = (
        post for post in accrual_postings
        if post.meta.get('rt-id') == 'rt:480'
        and post.units.number < 0
    )
    output = run_aging_report(negative_rt480, date)
    check_aging_ods(output, date, [], expect_payables)

def run_main(arglist, config=None, out_type=io.StringIO):
    """Run ``accrual.main`` and return ``(retcode, output, errors)``.

    ``arglist`` holds the command line arguments and is never modified.
    ``config`` defaults to a test configuration that loads the accruals
    books with a fresh ``RTClient``. ``out_type`` is the stream class for
    standard output; with ``io.BytesIO``, ``--output-file=-`` is prepended
    to the arguments. Both returned streams are rewound to the start.
    """
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/accruals.beancount'),
            rt_client=RTClient(),
        )
    if out_type is io.BytesIO:
        # Build a new list instead of inserting into the caller's. Argument
        # lists often come straight from pytest.mark.parametrize, and those
        # objects are shared, so mutating them leaks into other runs.
        arglist = ['--output-file=-', *arglist]
    output = out_type()
    errors = io.StringIO()
    retcode = accrual.main(arglist, output, errors, config)
    output.seek(0)
    errors.seek(0)
    return retcode, output, errors

def check_main_fails(arglist, config, expect_retcode):
    """Run main, assert it failed as expected, and return the error stream.

    ``expect_retcode`` is either an integer or the name of a
    ``cliutil.ExitCode`` member. The run must write nothing to its output.
    """
    if isinstance(expect_retcode, int):
        want_code = expect_retcode
    else:
        want_code = cliutil.ExitCode[expect_retcode]
    retcode, output, errors = run_main(arglist, config)
    assert retcode == want_code
    assert not output.getvalue()
    return errors

@pytest.mark.parametrize('arglist', [
    ['--report-type=balance', 'entity=EarlyBird'],
    ['--report-type=outgoing', 'entity=EarlyBird'],
])
def test_output_excludes_payments(arglist):
    """No output line of an EarlyBird report mentions a ticket rt:40 to rt:49."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    output.seek(0)
    for line in output:
        # Use re.search so the reference is caught anywhere in the line.
        # re.match only looks at the start of the line, which would let a
        # reference after other text slip through.
        assert not re.search(r'\brt:4\d\b', line)

@pytest.mark.parametrize('arglist,expect_invoice', [
    (['40'], 'rt:40/400'),
    (['44/440'], 'rt:44/440'),
])
def test_output_payments_when_only_match(arglist, expect_invoice):
    """Searching for one of these tickets directly still reports its invoice."""
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    heading = rf'^EarlyBird {re.escape(expect_invoice)}:$'
    check_output(output, [heading, r' outstanding since '])

@pytest.mark.parametrize('arglist,expect_amount', [
    (['310'], 420),
    (['310/3120'], 220),
    (['-t', 'out', 'entity=Vendor'], 420),
])
def test_main_outgoing_report(arglist, expect_amount):
    """main produces an outgoing report for ticket 310 with the expected total.

    The URL locals that were computed here but never used in any assertion
    have been removed.
    """
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, [
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
        rf'^TOTAL TO PAY: \${expect_amount}\.00$',
        r'^\s*2010-04-30\s',
        r'^\s+Expenses:Travel\s+220 USD$',
    ])

@pytest.mark.parametrize('arglist', [
    ['-t', 'balance'],
    ['515/5150'],
])
def test_main_balance_report(arglist):
    """main prints the balance of the ticket 515 invoice."""
    expect_lines = [
        r'\brt://ticket/515/attachments/5150:$',
        r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
    ]
    retcode, output, errors = run_main(arglist)
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, expect_lines)

def test_main_balance_report_because_no_rt_id():
    """Searching by a non-RT invoice path yields a balance report."""
    invoice = 'Invoices/2010StateRegistration.pdf'
    expect_lines = [
        rf'\b{re.escape(invoice)}:$',
        r'^\s+-50\.00 USD outstanding since 2010-06-20$',
    ]
    retcode, output, errors = run_main([invoice])
    assert not errors.getvalue()
    assert retcode == 0
    check_output(output, expect_lines)

@pytest.mark.parametrize('arglist', [
    [],
    ['entity=Lawyer'],
])
def test_main_aging_report(arglist):
    """main writes an aging spreadsheet, optionally narrowed to one entity."""
    recv_rows, pay_rows = AGING_AR, AGING_AP
    if arglist:
        # The only non-empty argument list searches for the Lawyer entity.
        recv_rows = [row for row in recv_rows if 'Lawyer' in row.entity]
        pay_rows = [row for row in pay_rows if 'Lawyer' in row.entity]
    retcode, output, errors = run_main(arglist, out_type=io.BytesIO)
    assert not errors.getvalue()
    assert retcode == 0
    check_aging_ods(output, datetime.date.today(), recv_rows, pay_rows)

def test_main_no_books():
    """A configuration without books fails with NoConfiguration and a message."""
    empty_config = testutil.TestConfig()
    errors = check_main_fails([], empty_config, 'NoConfiguration')
    expect_lines = [
        r':[01]: +no books to load in configuration\b',
    ]
    testutil.check_lines_match(iter(errors), expect_lines)

@pytest.mark.parametrize('arglist', [
    ['499'],
    ['505/99999'],
    ['-t', 'balance', 'entity=NonExistent'],
])
def test_main_no_matches(arglist, caplog):
    """Searches that match nothing fail with NoDataFiltered and a warning."""
    expect_logs = [
        ('WARNING', 'no matching entries found to report'),
    ]
    check_main_fails(arglist, None, 'NoDataFiltered')
    testutil.check_logs_match(caplog, expect_logs)

def test_main_no_rt(caplog):
    """An outgoing report without an RT client fails with NoConfiguration."""
    books_path = testutil.test_path('books/accruals.beancount')
    config = testutil.TestConfig(books_path=books_path)
    check_main_fails(['-t', 'out'], config, 'NoConfiguration')
    expect_logs = [
        ('ERROR', 'unable to generate outgoing report: RT client is required'),
    ]
    testutil.check_logs_match(caplog, expect_logs)