fdd9f2847b
We've long supported skipping documentation checks by flagging the transaction. We haven't done the same for enumerated metadata because we need it less often, and bad values tend to do more damage to reports. However, occasionally when something very off-process happens, we do need it as a matter of expediency. So support it. In order to skip validation of these fields, the plugin requires that the value start with the string "FIXME". This helps ensure that reports have a consistent way to detect and warn about unfilled values in flagged transactions.
454 lines
13 KiB
Python
454 lines
13 KiB
Python
"""Mock Beancount objects for testing"""
|
|
# Copyright © 2020 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import datetime
|
|
import itertools
|
|
import re
|
|
import unittest.mock
|
|
|
|
import beancount.core.amount as bc_amount
|
|
import beancount.core.data as bc_data
|
|
import beancount.loader as bc_loader
|
|
import beancount.parser.options as bc_options
|
|
|
|
import git
|
|
import odf.element
|
|
import odf.opendocument
|
|
import odf.table
|
|
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any, Optional, NamedTuple
|
|
|
|
from conservancy_beancount import books, data, rtutil
|
|
|
|
# December 30 of the largest year `datetime` supports.
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
# 99 * 365 days after the day this module is imported.
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
# March 1, 2020. NOTE(review): presumably the first day of a fiscal year,
# matching TestConfig's default fiscal_year=(3, 1) — confirm.
FY_START_DATE = datetime.date(2020, 3, 1)
# September 1, 2020; the default date for date_seq(), Cost(), and
# Transaction() in this module.
FY_MID_DATE = datetime.date(2020, 9, 1)
# January 1, 2000.
PAST_DATE = datetime.date(2000, 1, 1)
# Directory containing this file; test_path() resolves relative paths
# against it.
TESTS_DIR = Path(__file__).parent
|
|
|
|
# This function is a teardown fixture, but different test files use
|
|
# it with different scopes. Typical usage looks like:
|
|
# clean_account_meta = pytest.fixture([options])(testutil.clean_account_meta)
|
|
def clean_account_meta():
    """Yield once, then reset the class-level state of ``data.Account``.

    This is a bare generator so that each test file can wrap it in
    ``pytest.fixture`` with whatever scope it needs.
    """
    try:
        yield
    finally:
        # The cleanup sits in ``finally`` so it also runs if an exception is
        # thrown into the generator at the yield point. Reload the default
        # Beancount options first, then empty the metadata map.
        data.Account.load_options_map(bc_options.OPTIONS_DEFAULTS)
        data.Account._meta_map.clear()
|
|
|
|
def _ods_cell_value_type(cell):
    """Return the ``valuetype`` attribute of a table cell element.

    Asserts that ``cell`` really is a ``table:table-cell`` element before
    reading the attribute.
    """
    assert cell.tagName == 'table:table-cell'
    value_type = cell.getAttribute('valuetype')
    return value_type
|
|
|
|
def _ods_cell_value(cell):
    """Return a cell's stored value converted according to its value type.

    ``currency`` and ``float`` cells yield a ``Decimal`` built from the
    ``value`` attribute; ``date`` cells yield a ``datetime.date`` parsed from
    the ``datevalue`` attribute as ``YYYY-MM-DD``; any other cell yields its
    raw ``value`` attribute.
    """
    kind = cell.getAttribute('valuetype')
    if kind in ('currency', 'float'):
        return Decimal(cell.getAttribute('value'))
    if kind == 'date':
        raw_date = cell.getAttribute('datevalue')
        parsed = datetime.datetime.strptime(raw_date, '%Y-%m-%d')
        return parsed.date()
    return cell.getAttribute('value')
|
|
|
|
def _ods_elem_text(elem):
    """Return the text under ``elem``.

    A text node yields its own data. Any other element yields the text of
    its child nodes, computed recursively and joined with NUL characters.
    """
    if not isinstance(elem, odf.element.Text):
        child_texts = (_ods_elem_text(child) for child in elem.childNodes)
        return '\0'.join(child_texts)
    return elem.data
|
|
|
|
# Attach read-only convenience properties to odf.element.Element, so tests
# can write `cell.value_type`, `cell.value`, and `elem.text` instead of
# calling the helper functions directly. This modifies the Element class for
# every user of it once this module is imported.
odf.element.Element.value_type = property(_ods_cell_value_type)
odf.element.Element.value = property(_ods_cell_value)
odf.element.Element.text = property(_ods_elem_text)
|
|
|
|
def check_lines_match(lines, expect_patterns, source='output'):
    """Assert that every pattern is found by ``re.search`` in some line.

    ``lines`` is scanned again for each pattern and scanning stops at the
    first hit. If ``lines`` is a one-shot iterator, each pattern therefore
    only sees the lines left over after the previous pattern's match.
    ``source`` names the text being searched in the failure message.
    """
    for pattern in expect_patterns:
        found = False
        for line in lines:
            if re.search(pattern, line):
                found = True
                break
        assert found, f"{pattern!r} not found in {source}"
|
|
|
|
def check_logs_match(caplog, expected):
    """Assert that the expected log records appear, in order, in ``caplog``.

    ``expected`` is a sequence of ``(level, message)`` pairs. Levels are
    compared upper-cased against ``levelname``; messages must match exactly.
    All pairs share one iterator over the records, so each pair is searched
    for only after the record that matched the previous pair.
    """
    remaining = iter(caplog.records)
    for level, message in expected:
        want_level = level.upper()
        matched = False
        for record in remaining:
            if record.levelname == want_level and record.message == message:
                matched = True
                break
        assert matched, f"{want_level} log {message!r} not found"
|
|
|
|
def check_post_meta(txn, *expected_meta, default=None):
    """Assert that each posting of ``txn`` carries the expected metadata.

    One ``expected_meta`` argument is required per posting. A falsy
    expectation asserts the posting has no (or empty) metadata. Otherwise
    only the keys named in the expectation are compared, with ``default``
    standing in for keys the posting lacks.
    """
    postings = txn.postings
    assert len(postings) == len(expected_meta)
    for posting, wanted in zip(postings, expected_meta):
        if not wanted:
            assert not posting.meta
            continue
        if posting.meta is None:
            actual = None
        else:
            actual = {key: posting.meta.get(key, default) for key in wanted}
        assert actual == wanted
|
|
|
|
def combine_values(*value_seqs):
    """Zip the given iterables, cycling shorter ones, up to the longest length.

    The number of tuples produced is the largest ``len()`` among the
    arguments. Arguments that do not support ``len()`` are cycled too but do
    not count toward that limit.
    """
    lengths = [0]
    for seq in value_seqs:
        try:
            lengths.append(len(seq))
        except TypeError:
            continue
    cycles = [itertools.cycle(seq) for seq in value_seqs]
    return itertools.islice(zip(*cycles), max(lengths))
|
|
|
|
def date_seq(date=FY_MID_DATE, step=1):
    """Generate an endless sequence of dates ``step`` days apart.

    The first value is ``date`` itself; each later value is ``step`` days
    after the previous one.
    """
    current = date
    while True:
        yield current
        current = current + datetime.timedelta(days=step)
|
|
|
|
def parse_date(s, fmt='%Y-%m-%d'):
    """Parse the string ``s`` with ``strptime`` format ``fmt`` into a date."""
    parsed = datetime.datetime.strptime(s, fmt)
    return parsed.date()
|
|
|
|
def test_path(s):
    """Return ``s`` as a ``Path``, resolving relative paths under TESTS_DIR.

    ``None`` is passed through unchanged, and absolute paths are returned
    as-is.
    """
    if s is None:
        return None
    path = Path(s)
    return path if path.is_absolute() else TESTS_DIR / path
|
|
|
|
def Amount(number, currency='USD'):
    """Build a Beancount ``Amount``, converting ``number`` to ``Decimal``."""
    quantity = Decimal(number)
    return bc_amount.Amount(quantity, currency)
|
|
|
|
def Cost(number, currency='USD', date=FY_MID_DATE, label=None):
    """Build a Beancount ``Cost``, converting ``number`` to ``Decimal``."""
    quantity = Decimal(number)
    return bc_data.Cost(quantity, currency, date, label)
|
|
|
|
def Posting(account, number,
            currency='USD', cost=None, price=None, flag=None,
            _post_type=bc_data.Posting, _meta_type=None, **meta):
    """Build a posting from plain Python values.

    ``number`` and ``currency`` are combined with ``Amount()``. A non-None
    ``cost`` is unpacked as the arguments to ``Cost()``. Extra keyword
    arguments become the posting metadata: ``None`` when there are none,
    otherwise the dict itself, wrapped in ``_meta_type`` when one is given.
    ``_post_type`` is the constructor that receives the six posting fields.
    """
    post_cost = None if cost is None else Cost(*cost)
    if not meta:
        post_meta = None
    elif _meta_type:
        post_meta = _meta_type(meta)
    else:
        post_meta = meta
    units = Amount(number, currency)
    return _post_type(account, units, post_cost, price, flag, post_meta)
|
|
|
|
def Transaction(date=FY_MID_DATE, flag='*', payee=None,
|
|
narration='', tags=None, links=None, postings=(),
|
|
**meta):
|
|
if isinstance(date, str):
|
|
date = parse_date(date)
|
|
meta.setdefault('filename', '<test>')
|
|
meta.setdefault('lineno', 0)
|
|
real_postings = []
|
|
for post in postings:
|
|
try:
|
|
post.account
|
|
except AttributeError:
|
|
if isinstance(post[-1], dict):
|
|
args = post[:-1]
|
|
kwargs = post[-1]
|
|
else:
|
|
args = post
|
|
kwargs = {}
|
|
post = Posting(*args, **kwargs)
|
|
real_postings.append(post)
|
|
return bc_data.Transaction(
|
|
meta,
|
|
date,
|
|
flag,
|
|
payee,
|
|
narration,
|
|
set(tags or ''),
|
|
set(links or ''),
|
|
real_postings,
|
|
)
|
|
|
|
# Sample metadata strings: a document path and two `rt:`-prefixed references.
# NOTE(review): presumably the values that link-checking code should accept —
# confirm against the tests that use them.
LINK_METADATA_STRINGS = {
    'Invoices/304321.pdf',
    'rt:123/456',
    'rt://ticket/234',
}

# Empty and whitespace-only strings.
# NOTE(review): the two whitespace entries look identical in this view, which
# would make them a single set member. Presumably they were whitespace runs
# of different lengths originally — confirm against the repository.
NON_LINK_METADATA_STRINGS = {
    '',
    ' ',
    ' ',
}

# Metadata values that are not strings: a Decimal, a date, and two Amounts
# (one with a currency of None).
NON_STRING_METADATA_VALUES = [
    Decimal(5),
    FY_MID_DATE,
    Amount(50),
    Amount(500, None),
]

# Strings that begin with "FIXME": bare, followed by a comment, and followed
# by punctuation and a comment.
FIXME_VALUES = [
    'FIXME',
    'FIXME loose comment',
    'FIXME: comment with punctuation',
]

# Endless iterator over three equity account names. OpeningBalance() takes
# the next one whenever it is called without an account, so this state is
# shared by every caller in the process.
OPENING_EQUITY_ACCOUNTS = itertools.cycle([
    'Equity:Funds:Unrestricted',
    'Equity:Funds:Restricted',
    'Equity:OpeningBalance',
])
|
|
|
|
class ODSCell:
    """Helpers that pull table cells out of ODS rows, sheets, and files."""

    @classmethod
    def from_row(cls, row):
        """Return the table cell elements found under ``row``."""
        cell_type = odf.table.TableCell
        return row.getElementsByType(cell_type)

    @classmethod
    def from_sheet(cls, spreadsheet):
        """Yield one list of cells for each table row in ``spreadsheet``."""
        rows = spreadsheet.getElementsByType(odf.table.TableRow)
        for table_row in rows:
            cells = cls.from_row(table_row)
            yield list(cells)

    @classmethod
    def from_ods_file(cls, path):
        """Load the ODS document at ``path`` and iterate its rows of cells."""
        document = odf.opendocument.load(path)
        return cls.from_sheet(document.spreadsheet)
|
|
|
|
|
|
def OpeningBalance(acct=None, **txn_meta):
    """Build a five-posting opening balance transaction.

    Four fixed receivable and payable postings are offset by a -260 posting
    to ``acct``. When ``acct`` is None, the next name from
    OPENING_EQUITY_ACCOUNTS is used. Keyword arguments are passed on to
    ``Transaction()``.
    """
    equity_account = next(OPENING_EQUITY_ACCOUNTS) if acct is None else acct
    opening_postings = [
        ('Assets:Receivable:Accounts', 100),
        ('Assets:Receivable:Loans', 200),
        ('Liabilities:Payable:Accounts', -15),
        ('Liabilities:Payable:Vacation', -25),
        (equity_account, -260),
    ]
    return Transaction(**txn_meta, postings=opening_postings)
|
|
|
|
class TestBooksLoader(books.Loader):
    """Books loader that always loads one fixed Beancount source."""

    def __init__(self, source):
        """Remember ``source``; the base class initializer is not called."""
        self.source = source

    def load_all(self, from_year=None):
        """Load ``self.source`` with Beancount; ``from_year`` is ignored."""
        source = self.source
        return bc_loader.load_file(source)

    def load_fy_range(self, from_fy, to_fy=None):
        """Load everything; the requested fiscal year range is ignored."""
        return self.load_all()
|
|
|
|
|
|
class TestConfig:
    """Test double whose accessors return values given to the constructor."""

    def __init__(self, *,
                 books_path=None,
                 fiscal_year=(3, 1),
                 payment_threshold=0,
                 repo_path=None,
                 rt_client=None,
    ):
        """Store the settings that the accessor methods hand back.

        ``payment_threshold`` is converted to ``Decimal``, ``repo_path`` is
        resolved with ``test_path()``, and a non-None ``rt_client`` is also
        wrapped in ``rtutil.RT``.
        """
        self._books_path = books_path
        self.fiscal_year = fiscal_year
        self._payment_threshold = Decimal(payment_threshold)
        self.repo_path = test_path(repo_path)
        self._rt_client = rt_client
        self._rt_wrapper = None if rt_client is None else rtutil.RT(rt_client)

    def books_loader(self):
        """Return a TestBooksLoader for the books path, or None without one."""
        if self._books_path is not None:
            return TestBooksLoader(self._books_path)
        return None

    def books_path(self):
        """Return the books path given to the constructor."""
        return self._books_path

    def books_repo(self):
        """Always return None."""
        return None

    def config_file_path(self):
        """Return the path of the sample config.ini under the tests directory."""
        relative = 'userconfig/conservancy_beancount/config.ini'
        return test_path(relative)

    def fiscal_year_begin(self):
        """Return a ``books.FiscalYear`` built from the ``fiscal_year`` tuple."""
        start = self.fiscal_year
        return books.FiscalYear(*start)

    def payment_threshold(self):
        """Return the payment threshold as a Decimal."""
        return self._payment_threshold

    def repository_path(self):
        """Return the repository path resolved by the constructor."""
        return self.repo_path

    def rt_client(self):
        """Return the RT client given to the constructor."""
        return self._rt_client

    def rt_wrapper(self):
        """Return the ``rtutil.RT`` wrapper, or None when no client was given."""
        return self._rt_wrapper
|
|
|
|
|
|
def TestRepo(head_hexsha='abcd1234', dirty=False):
    """Return a ``Mock`` specced as ``git.Repo`` with canned state.

    ``head.commit.hexsha`` is set to ``head_hexsha`` and ``is_dirty()``
    returns ``dirty``.
    """
    repo = unittest.mock.Mock(spec=git.Repo)
    repo.head.commit.hexsha = head_hexsha
    repo.is_dirty.return_value = dirty
    return repo
|
|
|
|
|
|
class _TicketBuilder:
    """Generate attachment tuples with ids that keep counting up.

    Every tuple is ``(id, filename, content type, size)``. Ids come from a
    single counter per builder, so they stay unique across all the messages
    one builder produces.
    """

    # The three parts generated at the start of every message.
    MESSAGE_ATTACHMENTS = [
        ('(Unnamed)', 'multipart/alternative', '0b'),
        ('(Unnamed)', 'text/plain', '1.2k'),
        ('(Unnamed)', 'text/html', '1.4k'),
    ]
    # Extra attachments, handed out round-robin across all messages.
    MISC_ATTACHMENTS = [
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
        ('photo.jpg', 'image/jpeg', '65.2k'),
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
        ('statement.txt', 'text/plain', '652b'),
        ('screenshot.png', 'image/png', '1.9m'),
    ]

    def __init__(self):
        """Start the id counter at 1 and the extra-attachment cycle afresh."""
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
        self.id_seq = itertools.count(1)

    def new_attch(self, attch):
        """Return ``attch`` with the next id, as a string, put in front."""
        attch_id = str(next(self.id_seq))
        return (attch_id, *attch)

    def new_msg_with_attachments(self, attachments_count=1):
        """Yield the message parts, then ``attachments_count`` extras."""
        yield from map(self.new_attch, self.MESSAGE_ATTACHMENTS)
        for _ in range(attachments_count):
            extra = next(self.misc_attchs)
            yield self.new_attch(extra)

    def new_messages(self, messages_count, attachments_count=None):
        """Yield the attachments of ``messages_count`` consecutive messages.

        When ``attachments_count`` is None, the first message gets
        ``messages_count`` extra attachments and each later message one
        fewer. Otherwise every message gets ``attachments_count`` extras.
        """
        for index in range(messages_count):
            if attachments_count is not None:
                extras = attachments_count
            else:
                extras = messages_count - index
            yield from self.new_msg_with_attachments(extras)
|
|
|
|
|
|
class RTClient:
    """In-memory fake RT client with canned tickets, attachments, and users.

    Tickets '1', '2', and '3' exist. Edits are recorded in ``self.edits``
    rather than applied anywhere.
    """

    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
    # One builder feeds all three tickets so attachment ids never repeat.
    # It is deleted afterwards so it does not linger as a class attribute.
    _builder = _TicketBuilder()
    TICKET_DATA = {
        '1': list(_builder.new_messages(1, 3)),
        '2': list(_builder.new_messages(2, 1)),
        '3': list(_builder.new_messages(3, 0)),
    }
    del _builder

    def __init__(self,
                 url=DEFAULT_URL,
                 default_login=None,
                 default_password=None,
                 proxy=None,
                 default_queue='General',
                 skip_login=False,
                 verify_cert=True,
                 http_auth=None,
                 want_cfs=True,
    ):
        """Record the connection settings and derive the credentials.

        With ``http_auth``, the user and password come from that object, the
        auth method is its type name, and the client counts as logged in.
        Without it, the default login and password are used, and the client
        counts as logged in only when ``skip_login`` is truthy.
        """
        self.url = url
        if http_auth is not None:
            self.user = http_auth.username
            self.password = http_auth.password
            self.auth_method = type(http_auth).__name__
            self.login_result = True
        else:
            self.user = default_login
            self.password = default_password
            self.auth_method = 'login'
            self.login_result = skip_login or None
        self.last_login = None
        self.want_cfs = want_cfs
        self.edits = {}

    def login(self, login=None, password=None):
        """Simulate a login and return whether it succeeded.

        The stored credentials are used when neither argument is given.
        Success requires a truthy login and a truthy password that does not
        start with 'bad'. The attempt is recorded in ``last_login``.
        """
        if login is None and password is None:
            login, password = self.user, self.password
        succeeded = bool(
            login and password and not password.startswith('bad')
        )
        self.login_result = succeeded
        self.last_login = (login, password, succeeded)
        return succeeded

    def get_attachments(self, ticket_id):
        """Return a copy of the ticket's attachment list, or None if unknown."""
        try:
            attachments = self.TICKET_DATA[str(ticket_id)]
        except KeyError:
            return None
        return list(attachments)

    def get_attachment(self, ticket_id, attachment_id):
        """Return a dict describing one attachment, or None if not found.

        The 'Transaction' value is the id of the most recent multipart
        attachment listed before the requested one, falling back to the
        attachment's own id. The '(Unnamed)' filename is reported as ''.
        """
        try:
            attachments = self.TICKET_DATA[str(ticket_id)]
        except KeyError:
            return None
        att_id = str(attachment_id)
        multipart_id = None
        found = None
        for candidate in attachments:
            if candidate[0] == att_id:
                found = candidate
                break
            if candidate[2].startswith('multipart/'):
                multipart_id = candidate[0]
        if found is None:
            return None
        name = found[1]
        return {
            'id': att_id,
            'ContentType': found[2],
            'Filename': '' if name == '(Unnamed)' else name,
            'Transaction': multipart_id or att_id,
        }

    def get_ticket(self, ticket_id):
        """Return a dict describing the ticket, or None if it is unknown.

        The custom field entries are included only when ``want_cfs`` is set.
        """
        ticket_key = str(ticket_id)
        if ticket_key not in self.TICKET_DATA:
            return None
        ticket = {
            'id': f'ticket/{ticket_key}',
            'numerical_id': ticket_key,
            'Requestors': [
                f'mx{ticket_key}@example.org',
                'requestor2@example.org',
            ],
        }
        if self.want_cfs:
            ticket.update({
                'CF.{payment-amount}': '',
                'CF.{payment-method}': '',
                'CF.{payment-to}': f'Hon. Mx. {ticket_key}',
            })
        return ticket

    def edit_ticket(self, ticket_id, **kwargs):
        """Merge ``kwargs`` into the recorded edits for the ticket; return True."""
        ticket_edits = self.edits.setdefault(str(ticket_id), {})
        ticket_edits.update(kwargs)
        return True

    def get_user(self, user_id):
        """Return a dict describing the user named by an id or email address.

        If ``user_id`` contains digits followed by '@', it is used as the
        email address and those digits are the numeric id. Otherwise it must
        convert to an integer, and an ``mx<id>@example.org`` address is made
        up for it. 'RealName' is included only when ``want_cfs`` is set.
        """
        user_key = str(user_id)
        found = re.search(r'(\d+)@', user_key)
        if found is not None:
            email = user_key
            number = int(found.group(1))
        else:
            email = f'mx{user_key}@example.org'
            number = int(user_key)
        user = {
            'id': f'user/{number}',
            'EmailAddress': email,
            'Name': email,
        }
        if self.want_cfs:
            user['RealName'] = f'Mx. {number}'
        return user
|