f55fccd48d
This prevents mistakes where a transaction is entered in the wrong file for its date (which in turn causes errors in reports).
457 lines
14 KiB
Python
457 lines
14 KiB
Python
"""Mock Beancount objects for testing"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import itertools
|
|
import re
|
|
import unittest.mock
|
|
|
|
import beancount.core.amount as bc_amount
|
|
import beancount.core.data as bc_data
|
|
import beancount.loader as bc_loader
|
|
import beancount.parser.options as bc_options
|
|
|
|
import git
|
|
import odf.element
|
|
import odf.opendocument
|
|
import odf.table
|
|
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any, Optional, NamedTuple
|
|
|
|
from conservancy_beancount import books, data, rtutil
|
|
|
|
# Reference dates and paths shared across the test suite.
# December 30 of the largest year the datetime module can represent.
EXTREME_FUTURE_DATE = datetime.date(year=datetime.MAXYEAR, month=12, day=30)
# 99 * 365 days after the day this module is imported.
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=99 * 365)
FY_START_DATE = datetime.date(year=2020, month=3, day=1)
FY_MID_DATE = datetime.date(year=2020, month=9, day=1)
PAST_DATE = datetime.date(year=2000, month=1, day=1)
# Directory that contains this module.
TESTS_DIR = Path(__file__).parent
|
|
|
|
# This function is a teardown fixture, but different test files use
# it with different scopes. Typical usage looks like:
#   clean_account_meta = pytest.fixture([options])(testutil.clean_account_meta)
def clean_account_meta():
    """Yield once, then reset the class-level state of ``data.Account``.

    The cleanup runs in a ``finally`` clause, so it happens whether the
    generator is resumed normally, closed, or has an exception thrown in.
    """
    try:
        yield
    finally:
        account_cls = data.Account
        # Reload the default Beancount options, then drop cached metadata.
        account_cls.load_options_map(bc_options.OPTIONS_DEFAULTS)
        account_cls._meta_map.clear()
|
|
|
|
def _ods_cell_value_type(cell):
    """Return the ``valuetype`` attribute of an ODS table cell.

    Asserts first that ``cell`` is a ``table:table-cell`` element.
    """
    tag_name = cell.tagName
    assert tag_name == 'table:table-cell'
    return cell.getAttribute('valuetype')
|
|
|
|
def _ods_cell_value(cell):
    """Return a cell's value, converted according to its ``valuetype``.

    ``currency`` and ``float`` cells become ``Decimal``; ``date`` cells
    become ``datetime.date`` parsed from the ``datevalue`` attribute as
    ``YYYY-MM-DD``. Any other cell returns its raw ``value`` attribute.
    """
    kind = cell.getAttribute('valuetype')
    if kind in ('currency', 'float'):
        return Decimal(cell.getAttribute('value'))
    if kind == 'date':
        raw_date = cell.getAttribute('datevalue')
        return datetime.datetime.strptime(raw_date, '%Y-%m-%d').date()
    return cell.getAttribute('value')
|
|
|
|
def _ods_elem_text(elem):
    """Return the text under ``elem``, recursing through its child nodes.

    A text node returns its own data. Any other element returns its
    children's text joined with NUL characters.
    """
    if not isinstance(elem, odf.element.Text):
        return '\0'.join(map(_ods_elem_text, elem.childNodes))
    return elem.data
|
|
|
|
# Monkey-patch read-only convenience properties onto odfpy's Element class
# so tests can write cell.value_type, cell.value, and elem.text directly.
odf.element.Element.value_type = property(fget=_ods_cell_value_type)
odf.element.Element.value = property(fget=_ods_cell_value)
odf.element.Element.text = property(fget=_ods_elem_text)
|
|
|
|
def check_lines_match(lines, expect_patterns, source='output'):
    """Assert that every pattern matches at least one of ``lines``.

    Each pattern is tried with ``re.search`` against the lines in order.
    ``source`` only names the text being checked in the failure message.
    ``lines`` is iterated once per pattern, so pass a reusable collection
    rather than a one-shot iterator.
    """
    for pattern in expect_patterns:
        matched = any(re.search(pattern, text) for text in lines)
        assert matched, f"{pattern!r} not found in {source}"
|
|
|
|
def check_logs_match(caplog, expected):
    """Assert that ``caplog`` holds the expected log records, in order.

    ``expected`` is an iterable of ``(level, message)`` pairs; the level is
    upper-cased before comparing with each record's ``levelname``. A single
    iterator over the records is shared by all expectations, so each match
    must come after the previous one. Unrelated records in between are
    skipped.
    """
    remaining = iter(caplog.records)
    for level, message in expected:
        level_name = level.upper()
        found = any(
            record.levelname == level_name and record.message == message
            for record in remaining
        )
        assert found, f"{level_name} log {message!r} not found"
|
|
|
|
def check_post_meta(txn, *expected_meta, default=None):
    """Assert that each posting of ``txn`` carries the expected metadata.

    One positional argument is required per posting. A falsy expectation
    means the posting must have no (or empty) metadata. Otherwise only the
    keys named in the expectation are compared, with ``default`` standing
    in for any key the posting lacks.
    """
    postings = txn.postings
    assert len(postings) == len(expected_meta)
    for posting, wanted in zip(postings, expected_meta):
        if not wanted:
            assert not posting.meta
            continue
        if posting.meta is None:
            actual = None
        else:
            actual = {key: posting.meta.get(key, default) for key in wanted}
        assert actual == wanted
|
|
|
|
def combine_values(*value_seqs):
    """Zip the arguments together, cycling shorter ones to the longest length.

    The result has as many tuples as the longest argument that supports
    ``len()``. Arguments without a length (such as generators) are cycled
    but do not affect that count. Returns a lazy iterator.
    """
    lengths = [0]
    for seq in value_seqs:
        try:
            lengths.append(len(seq))
        except TypeError:
            # Unsized iterables take part in the zip but not the count.
            continue
    cycles = [itertools.cycle(seq) for seq in value_seqs]
    return itertools.islice(zip(*cycles), max(lengths))
|
|
|
|
def date_seq(date=FY_MID_DATE, step=1):
    """Yield dates forever, starting at ``date`` and moving ``step`` days each time."""
    current = date
    while True:
        yield current
        current = current + datetime.timedelta(days=step)
|
|
|
|
def parse_date(s, fmt='%Y-%m-%d'):
    """Parse the string ``s`` with strptime format ``fmt`` and return a date."""
    parsed = datetime.datetime.strptime(s, fmt)
    return parsed.date()
|
|
|
|
def test_path(s):
    """Return ``s`` as a Path, resolving relative paths against TESTS_DIR.

    ``None`` is passed through unchanged; absolute paths are returned as-is.
    """
    if s is None:
        return None
    path = Path(s)
    return path if path.is_absolute() else TESTS_DIR / path
|
|
|
|
def Amount(number, currency='USD'):
    """Build a Beancount Amount, converting ``number`` to Decimal first."""
    quantity = Decimal(number)
    return bc_amount.Amount(quantity, currency)
|
|
|
|
def Cost(number, currency='USD', date=FY_MID_DATE, label=None):
    """Build a Beancount Cost, converting ``number`` to Decimal first."""
    per_unit = Decimal(number)
    return bc_data.Cost(per_unit, currency, date, label)
|
|
|
|
def Posting(account, number,
            currency='USD', cost=None, price=None, flag=None,
            _post_type=bc_data.Posting, _meta_type=None, **meta):
    """Build a posting for tests from plain Python values.

    ``number`` and ``currency`` become the posting's Amount. ``cost``, when
    given, is a sequence of arguments for ``Cost()``. Extra keyword
    arguments become the posting's metadata: ``None`` when there are none,
    otherwise the dict itself, wrapped by ``_meta_type`` if one is given.
    ``_post_type`` is the class or factory that assembles the result.
    """
    real_cost = None if cost is None else Cost(*cost)
    if not meta:
        real_meta = None
    elif _meta_type:
        real_meta = _meta_type(meta)
    else:
        real_meta = meta
    units = Amount(number, currency)
    return _post_type(account, units, real_cost, price, flag, real_meta)
|
|
|
|
def Transaction(date=FY_MID_DATE, flag='*', payee=None,
|
|
narration='', tags=None, links=None, postings=(),
|
|
**meta):
|
|
if isinstance(date, str):
|
|
date = parse_date(date)
|
|
meta.setdefault('filename', '<test>')
|
|
meta.setdefault('lineno', 0)
|
|
real_postings = []
|
|
for post in postings:
|
|
try:
|
|
post.account
|
|
except AttributeError:
|
|
if isinstance(post[-1], dict):
|
|
args = post[:-1]
|
|
kwargs = post[-1]
|
|
else:
|
|
args = post
|
|
kwargs = {}
|
|
post = Posting(*args, **kwargs)
|
|
real_postings.append(post)
|
|
return bc_data.Transaction(
|
|
meta,
|
|
date,
|
|
flag,
|
|
payee,
|
|
narration,
|
|
set(tags or ''),
|
|
set(links or ''),
|
|
real_postings,
|
|
)
|
|
|
|
# Strings that tests treat as link-style metadata values.
LINK_METADATA_STRINGS = {
    'Invoices/304321.pdf',
    'rt:123/456',
    'rt://ticket/234',
}

# Empty or whitespace-only strings that tests treat as non-links.
# NOTE(review): the last two entries are identical as shown here; they may
# have been different whitespace originally. Confirm against upstream.
NON_LINK_METADATA_STRINGS = {
    '',
    ' ',
    ' ',
}
|
|
|
|
# Metadata values of types other than str: a number, a date, and two amounts.
NON_STRING_METADATA_VALUES = [
    Decimal(5),
    FY_MID_DATE,
    Amount(50),
    Amount(500, None),  # an amount with its currency set to None
]
|
|
|
|
# Endless rotation of equity account names. This is one module-level
# iterator, so every next() call advances it for all users.
OPENING_EQUITY_ACCOUNTS = itertools.cycle((
    'Equity:Funds:Unrestricted',
    'Equity:Funds:Restricted',
    'Equity:OpeningBalance',
))
|
|
|
|
class ODSCell:
    """Helpers that pull table cells out of OpenDocument spreadsheets."""

    @classmethod
    def from_row(cls, row):
        """Return the table-cell elements found under ``row``."""
        return row.getElementsByType(odf.table.TableCell)

    @classmethod
    def from_sheet(cls, spreadsheet):
        """Yield one list of cells for each table row under ``spreadsheet``."""
        table_rows = spreadsheet.getElementsByType(odf.table.TableRow)
        for table_row in table_rows:
            cells = cls.from_row(table_row)
            yield list(cells)

    @classmethod
    def from_ods_file(cls, path):
        """Load the ODS document at ``path`` and yield its rows of cells."""
        document = odf.opendocument.load(path)
        return cls.from_sheet(document.spreadsheet)
|
|
|
|
|
|
def OpeningBalance(acct=None, **txn_meta):
    """Build a five-posting opening-balance transaction that sums to zero.

    Four fixed receivable/payable postings total 260, offset by -260 on
    ``acct``. When ``acct`` is None, the next name from
    OPENING_EQUITY_ACCOUNTS is used. Remaining keyword arguments are passed
    through to ``Transaction()``.
    """
    equity_account = next(OPENING_EQUITY_ACCOUNTS) if acct is None else acct
    postings = [
        ('Assets:Receivable:Accounts', 100),
        ('Assets:Receivable:Loans', 200),
        ('Liabilities:Payable:Accounts', -15),
        ('Liabilities:Payable:Vacation', -25),
        (equity_account, -260),
    ]
    return Transaction(postings=postings, **txn_meta)
|
|
|
|
class TestBooksLoader(books.Loader):
    """Loader that always reads one fixed Beancount source file.

    The parent class initializer is not called; only ``source`` is stored.
    """

    def __init__(self, source):
        self.source = source

    def load_all(self, from_year=None):
        """Load ``self.source`` with Beancount's loader; ``from_year`` is ignored."""
        source = self.source
        return bc_loader.load_file(source)

    def load_fy_range(self, from_fy, to_fy=None):
        """Load the whole source, ignoring the requested fiscal-year range."""
        return self.load_all()
|
|
|
|
|
|
class TestConfig:
    """Configuration object for tests whose answers are fixed at construction.

    Every accessor returns a value given to (or derived in) ``__init__``.
    NOTE(review): the method names presumably mirror the project's real
    configuration class; confirm against conservancy_beancount.config.
    """

    def __init__(self, *,
                 books_path=None,
                 fiscal_year=(3, 1),
                 payment_threshold=0,
                 repo_path=None,
                 rt_client=None,
    ):
        self._books_path = books_path
        self.fiscal_year = fiscal_year
        self._payment_threshold = Decimal(payment_threshold)
        # Relative repository paths are resolved by test_path().
        self.repo_path = test_path(repo_path)
        self._rt_client = rt_client
        # Only wrap the client in rtutil.RT when one was provided.
        self._rt_wrapper = None if rt_client is None else rtutil.RT(rt_client)

    def books_loader(self):
        """Return a TestBooksLoader for the books path, or None without one."""
        path = self._books_path
        return None if path is None else TestBooksLoader(path)

    def books_path(self):
        """Return the books path given at construction."""
        return self._books_path

    def books_repo(self):
        """Always return None."""
        return None

    def config_file_path(self):
        """Return the path of the sample user config file under the tests dir."""
        return test_path('userconfig/conservancy_beancount/config.ini')

    def fiscal_year_begin(self):
        """Return a books.FiscalYear built from the ``fiscal_year`` values."""
        return books.FiscalYear(*self.fiscal_year)

    def payment_threshold(self):
        """Return the payment threshold as a Decimal."""
        return self._payment_threshold

    def repository_path(self):
        """Return the repository path computed at construction."""
        return self.repo_path

    def rt_client(self):
        """Return the RT client given at construction."""
        return self._rt_client

    def rt_wrapper(self):
        """Return the rtutil.RT wrapper built at construction, if any."""
        return self._rt_wrapper
|
|
|
|
|
|
def TestRepo(head_hexsha='abcd1234', dirty=False):
    """Return a Mock spec'd on git.Repo with a preset HEAD hash and dirty flag.

    ``repo.head.commit.hexsha`` is ``head_hexsha`` and ``repo.is_dirty()``
    returns ``dirty``.
    """
    repo = unittest.mock.Mock(spec=git.Repo)
    repo.head.commit.hexsha = head_hexsha
    repo.is_dirty.return_value = dirty
    return repo
|
|
|
|
|
|
class _TicketBuilder:
    """Generate attachment tuples with sequential string IDs.

    Each generated tuple is an ID followed by one of the three-field
    templates below (name, content type, and a size-like string).
    """

    # Emitted at the start of every message.
    MESSAGE_ATTACHMENTS = [
        ('(Unnamed)', 'multipart/alternative', '0b'),
        ('(Unnamed)', 'text/plain', '1.2k'),
        ('(Unnamed)', 'text/html', '1.4k'),
    ]
    # Cycled through to supply each message's extra attachments.
    MISC_ATTACHMENTS = [
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
        ('photo.jpg', 'image/jpeg', '65.2k'),
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
        ('statement.txt', 'text/plain', '652b'),
        ('screenshot.png', 'image/png', '1.9m'),
    ]

    def __init__(self):
        self.id_seq = itertools.count(1)
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)

    def new_attch(self, attch):
        """Return ``attch`` prefixed with the next ID from the counter, as a string."""
        attch_id = str(next(self.id_seq))
        return (attch_id,) + tuple(attch)

    def new_msg_with_attachments(self, attachments_count=1):
        """Yield one message: the fixed parts, then ``attachments_count`` extras."""
        # Both the extras and the IDs are drawn lazily, one per yielded
        # tuple, so the shared counter and cycle advance in lockstep.
        extras = (next(self.misc_attchs) for _ in range(attachments_count))
        for template in itertools.chain(self.MESSAGE_ATTACHMENTS, extras):
            yield self.new_attch(template)

    def new_messages(self, messages_count, attachments_count=None):
        """Yield the attachments of ``messages_count`` consecutive messages.

        With ``attachments_count`` left as None, the first message gets
        ``messages_count`` extras and each later message gets one fewer.
        Otherwise every message gets ``attachments_count`` extras.
        """
        for index in range(messages_count):
            extras_count = (
                messages_count - index
                if attachments_count is None
                else attachments_count
            )
            yield from self.new_msg_with_attachments(extras_count)
|
|
|
|
|
|
class RTClient:
    """In-memory stand-in for an RT client that serves canned data.

    Attachments come from TICKET_DATA, tickets and users are synthesized
    from the requested ID, and edits are only recorded in ``self.edits``.
    """

    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
    # Each value is a list of attachment tuples whose first three fields are
    # read below as ID, filename, and content type. All three tickets draw
    # from one builder, so IDs keep counting up from ticket 1 to ticket 3.
    _builder = _TicketBuilder()
    TICKET_DATA = {
        '1': list(_builder.new_messages(1, 3)),
        '2': list(_builder.new_messages(2, 1)),
        '3': list(_builder.new_messages(3, 0)),
    }
    # The builder is only scaffolding; remove it from the class namespace.
    del _builder

    def __init__(self,
                 url=DEFAULT_URL,
                 default_login=None,
                 default_password=None,
                 proxy=None,
                 default_queue='General',
                 skip_login=False,
                 verify_cert=True,
                 http_auth=None,
                 want_cfs=True,
    ):
        """Record the connection settings that tests inspect later.

        With ``http_auth`` the credentials come from that object and the
        client counts as logged in. Otherwise the default login/password
        are stored and ``login_result`` is True only when ``skip_login`` is
        truthy (else None). ``proxy``, ``default_queue``, and
        ``verify_cert`` are accepted and ignored.
        """
        self.url = url
        if http_auth is not None:
            credentials = (http_auth.username, http_auth.password)
            self.auth_method = type(http_auth).__name__
            self.login_result = True
        else:
            credentials = (default_login, default_password)
            self.auth_method = 'login'
            self.login_result = skip_login or None
        self.user, self.password = credentials
        self.last_login = None
        self.want_cfs = want_cfs
        self.edits = {}

    def login(self, login=None, password=None):
        """Simulate a login and return whether it succeeded.

        When both arguments are None the stored credentials are used. The
        login succeeds when both values are truthy and the password does
        not start with ``bad``. The attempt is saved in ``last_login``.
        """
        if login is None and password is None:
            login, password = self.user, self.password
        succeeded = bool(login and password and not password.startswith('bad'))
        self.login_result = succeeded
        self.last_login = (login, password, succeeded)
        return succeeded

    def get_attachments(self, ticket_id):
        """Return a fresh list of the ticket's attachment tuples, or None."""
        attachments = self.TICKET_DATA.get(str(ticket_id))
        return None if attachments is None else list(attachments)

    def get_attachment(self, ticket_id, attachment_id):
        """Return a dict describing one attachment, or None if not found.

        ``Transaction`` is the ID of the last ``multipart/`` attachment
        listed before the requested one in the ticket, or the attachment's
        own ID when there is none. ``(Unnamed)`` filenames are reported as
        an empty string.
        """
        attachments = self.TICKET_DATA.get(str(ticket_id))
        if attachments is None:
            return None
        att_id = str(attachment_id)
        multipart_id = None
        found = None
        for attch in attachments:
            if attch[0] == att_id:
                found = attch
                break
            if attch[2].startswith('multipart/'):
                multipart_id = attch[0]
        if found is None:
            return None
        name = found[1]
        return {
            'id': att_id,
            'ContentType': found[2],
            'Filename': '' if name == '(Unnamed)' else name,
            'Transaction': multipart_id or att_id,
        }

    def get_ticket(self, ticket_id):
        """Return a synthesized ticket dict, or None for an unknown ticket.

        Custom-field keys are included only when ``want_cfs`` is set.
        """
        ticket_id_s = str(ticket_id)
        if ticket_id_s not in self.TICKET_DATA:
            return None
        ticket = {
            'id': 'ticket/{}'.format(ticket_id_s),
            'numerical_id': ticket_id_s,
            'Requestors': [
                f'mx{ticket_id_s}@example.org',
                'requestor2@example.org',
            ],
        }
        if self.want_cfs:
            ticket.update({
                'CF.{payment-amount}': '',
                'CF.{payment-method}': '',
                'CF.{payment-to}': f'Hon. Mx. {ticket_id_s}',
            })
        return ticket

    def edit_ticket(self, ticket_id, **kwargs):
        """Merge ``kwargs`` into the recorded edits for the ticket; return True."""
        ticket_edits = self.edits.setdefault(str(ticket_id), {})
        ticket_edits.update(kwargs)
        return True

    def get_user(self, user_id):
        """Return a synthesized user dict for a numeric ID or an email address.

        If the ID contains digits followed by ``@``, it is used as the email
        and those digits become the user number. Otherwise the whole ID must
        parse as an integer (``int()`` raises ValueError if not) and the
        email is derived from it. ``RealName`` is included only when
        ``want_cfs`` is set.
        """
        user_id_s = str(user_id)
        match = re.search(r'(\d+)@', user_id_s)
        if match is not None:
            email = user_id_s
            user_id_num = int(match.group(1))
        else:
            email = f'mx{user_id_s}@example.org'
            user_id_num = int(user_id_s)
        user = {
            'id': f'user/{user_id_num}',
            'EmailAddress': email,
            'Name': email,
        }
        if self.want_cfs:
            user['RealName'] = f'Mx. {user_id_num}'
        return user
|