440 lines
13 KiB
Python
440 lines
13 KiB
Python
"""Mock Beancount objects for testing"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import itertools
|
|
import re
|
|
|
|
import beancount.core.amount as bc_amount
|
|
import beancount.core.data as bc_data
|
|
import beancount.loader as bc_loader
|
|
import beancount.parser.options as bc_options
|
|
|
|
import odf.element
|
|
import odf.opendocument
|
|
import odf.table
|
|
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any, Optional, NamedTuple
|
|
|
|
from conservancy_beancount import books, data, rtutil
|
|
|
|
# Reference dates shared by the test suite.
# One day before the last date the datetime module can represent.
EXTREME_FUTURE_DATE = datetime.date(year=datetime.MAXYEAR, month=12, day=30)
# Ninety-nine 365-day years after the day the tests are run.
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=99 * 365)
FY_START_DATE = datetime.date(year=2020, month=3, day=1)
FY_MID_DATE = datetime.date(year=2020, month=9, day=1)
PAST_DATE = datetime.date(year=2000, month=1, day=1)
# Directory containing this module.
TESTS_DIR = Path(__file__).parent
|
|
|
|
# This function is a teardown fixture, but different test files use
|
|
# it with different scopes. Typical usage looks like:
|
|
# clean_account_meta = pytest.fixture([options])(testutil.clean_account_meta)
|
|
def clean_account_meta():
    """Generator fixture body that resets ``data.Account`` state afterwards.

    Control passes to the test at the ``yield``. Once the test is done, the
    default Beancount options are reloaded into ``data.Account`` and its
    ``_meta_map`` is emptied.
    """
    yield
    account_cls = data.Account
    account_cls.load_options_map(bc_options.OPTIONS_DEFAULTS)
    account_cls._meta_map.clear()
|
|
|
|
def _ods_cell_value_type(cell):
    """Return the ``valuetype`` attribute of a ``table:table-cell`` element.

    Asserts that ``cell`` really is a table cell before reading it.
    """
    tag_name = cell.tagName
    assert tag_name == 'table:table-cell'
    return cell.getAttribute('valuetype')
|
|
|
|
def _ods_cell_value(cell):
    """Return a cell's stored value, converted according to its value type.

    ``currency`` and ``float`` cells give a ``Decimal`` built from the
    ``value`` attribute. ``date`` cells give a ``datetime.date`` parsed from
    the ``datevalue`` attribute as ``YYYY-MM-DD``. Any other cell gives the
    raw ``value`` attribute.
    """
    value_type = cell.getAttribute('valuetype')
    if value_type in ('currency', 'float'):
        return Decimal(cell.getAttribute('value'))
    if value_type == 'date':
        parsed = datetime.datetime.strptime(
            cell.getAttribute('datevalue'), '%Y-%m-%d',
        )
        return parsed.date()
    return cell.getAttribute('value')
|
|
|
|
def _ods_elem_text(elem):
    """Return the text under ``elem``, recursing through its child nodes.

    A text node contributes its ``data``. Any other node contributes the
    text of its children joined with NUL characters.
    """
    if not isinstance(elem, odf.element.Text):
        child_texts = [_ods_elem_text(child) for child in elem.childNodes]
        return '\0'.join(child_texts)
    return elem.data
|
|
|
|
# Attach the ODS helper functions to odf's Element class as getter-only
# properties, so tests can write `cell.value_type`, `cell.value`, and
# `elem.text` directly.
odf.element.Element.value_type = property(fget=_ods_cell_value_type)
odf.element.Element.value = property(fget=_ods_cell_value)
odf.element.Element.text = property(fget=_ods_elem_text)
|
|
|
|
def check_lines_match(lines, expect_patterns, source='output'):
    """Assert that each pattern is found by ``re.search`` in some line.

    ``lines`` is scanned afresh for every pattern, stopping at the first
    matching line. If ``lines`` is a one-shot iterator, each scan resumes
    where the previous one stopped, so the patterns must then match in order.
    ``source`` only names the input in the failure message.
    """
    for pattern in expect_patterns:
        found = False
        for line in lines:
            if re.search(pattern, line):
                found = True
                break
        assert found, f"{pattern!r} not found in {source}"
|
|
|
|
def check_logs_match(caplog, expected):
    """Assert that the expected log records appear, in order, in ``caplog``.

    ``expected`` is a sequence of ``(level, message)`` pairs; the level is
    upper-cased before being compared with ``levelname``. All pairs share a
    single pass over ``caplog.records``, so each pair must be found after the
    record that satisfied the previous pair.
    """
    remaining = iter(caplog.records)
    for exp_level, exp_msg in expected:
        want_level = exp_level.upper()
        found = False
        for record in remaining:
            if record.levelname == want_level and record.message == exp_msg:
                found = True
                break
        assert found, f"{want_level} log {exp_msg!r} not found"
|
|
|
|
def check_post_meta(txn, *expected_meta, default=None):
    """Assert that each posting of ``txn`` carries the expected metadata.

    One ``expected_meta`` argument is required per posting. A falsy
    expectation means the posting must have no (or empty) metadata.
    Otherwise only the keys named in the expectation are compared, and a key
    missing from the posting is read as ``default``.
    """
    postings = txn.postings
    assert len(postings) == len(expected_meta)
    for post, expected in zip(postings, expected_meta):
        if not expected:
            assert not post.meta
            continue
        if post.meta is None:
            actual = None
        else:
            actual = {key: post.meta.get(key, default) for key in expected}
        assert actual == expected
|
|
|
|
def combine_values(*value_seqs):
    """Zip the arguments together, cycling the shorter ones.

    The result has as many tuples as the longest argument that supports
    ``len()``. Arguments without a length (such as iterators) do not count
    toward that total, so if no argument has a length the result is empty.
    """
    lengths = [0]
    for seq in value_seqs:
        try:
            lengths.append(len(seq))
        except TypeError:
            continue
    cycled = map(itertools.cycle, value_seqs)
    return itertools.islice(zip(*cycled), max(lengths))
|
|
|
|
def date_seq(date=FY_MID_DATE, step=1):
    """Yield ``date``, then each following date ``step`` days apart, forever."""
    current = date
    while True:
        yield current
        current = current + datetime.timedelta(days=step)
|
|
|
|
def parse_date(s, fmt='%Y-%m-%d'):
    """Parse the string ``s`` with ``strptime`` format ``fmt`` into a date."""
    parsed = datetime.datetime.strptime(s, fmt)
    return parsed.date()
|
|
|
|
def test_path(s):
    """Turn ``s`` into a ``Path``, anchoring relative paths at ``TESTS_DIR``.

    ``None`` is passed through unchanged; absolute paths are returned as-is.
    """
    if s is None:
        return None
    path = Path(s)
    return path if path.is_absolute() else TESTS_DIR / path
|
|
|
|
def Amount(number, currency='USD'):
    """Build a Beancount ``Amount``, converting ``number`` to ``Decimal``."""
    decimal_number = Decimal(number)
    return bc_amount.Amount(decimal_number, currency)
|
|
|
|
def Cost(number, currency='USD', date=FY_MID_DATE, label=None):
    """Build a Beancount ``Cost``, converting ``number`` to ``Decimal``."""
    decimal_number = Decimal(number)
    return bc_data.Cost(decimal_number, currency, date, label)
|
|
|
|
def Posting(account, number,
            currency='USD', cost=None, price=None, flag=None,
            _post_type=bc_data.Posting, _meta_type=None, **meta):
    """Build a posting for tests.

    ``cost``, when given, is a sequence of positional arguments for
    ``Cost()``. Extra keyword arguments become the posting's metadata: with
    none, the metadata is ``None``; otherwise it is the keyword dict, passed
    through ``_meta_type`` first when that is set. ``_post_type`` is the
    callable that constructs the final posting.
    """
    cost_obj = None if cost is None else Cost(*cost)
    if not meta:
        post_meta = None
    elif _meta_type:
        post_meta = _meta_type(meta)
    else:
        post_meta = meta
    units = Amount(number, currency)
    return _post_type(account, units, cost_obj, price, flag, post_meta)
|
|
|
|
def Transaction(date=FY_MID_DATE, flag='*', payee=None,
|
|
narration='', tags=None, links=None, postings=(),
|
|
**meta):
|
|
if isinstance(date, str):
|
|
date = parse_date(date)
|
|
meta.setdefault('filename', '<test>')
|
|
meta.setdefault('lineno', 0)
|
|
real_postings = []
|
|
for post in postings:
|
|
try:
|
|
post.account
|
|
except AttributeError:
|
|
if isinstance(post[-1], dict):
|
|
args = post[:-1]
|
|
kwargs = post[-1]
|
|
else:
|
|
args = post
|
|
kwargs = {}
|
|
post = Posting(*args, **kwargs)
|
|
real_postings.append(post)
|
|
return bc_data.Transaction(
|
|
meta,
|
|
date,
|
|
flag,
|
|
payee,
|
|
narration,
|
|
set(tags or ''),
|
|
set(links or ''),
|
|
real_postings,
|
|
)
|
|
|
|
# Sample metadata strings that look like links: a repository path and two
# spellings of an RT reference.
LINK_METADATA_STRINGS = {
    'rt://ticket/234',
    'rt:123/456',
    'Invoices/304321.pdf',
}
|
|
|
|
# Sample metadata strings that hold no link: empty or whitespace only.
# NOTE(review): both whitespace entries read as a single space here; confirm
# that the second was not meant to be a longer run of whitespace.
NON_LINK_METADATA_STRINGS = {
    ' ',
    ' ',
    '',
}
|
|
|
|
# Sample metadata values that are not strings: a Decimal, a date, and two
# amounts (the second with no currency).
NON_STRING_METADATA_VALUES = [
    Decimal(5),
    FY_MID_DATE,
    Amount(50),
    Amount(500, currency=None),
]
|
|
|
|
# Endless round-robin over the equity account names; each next() call
# advances this shared, module-level iterator.
OPENING_EQUITY_ACCOUNTS = itertools.cycle((
    'Equity:Funds:Unrestricted',
    'Equity:Funds:Restricted',
    'Equity:OpeningBalance',
))
|
|
|
|
class ODSCell:
    """Class-method helpers for pulling table cells out of ODS documents."""

    @classmethod
    def from_row(cls, row):
        """Return the ``TableCell`` elements found under ``row``."""
        return row.getElementsByType(odf.table.TableCell)

    @classmethod
    def from_sheet(cls, spreadsheet):
        """Yield a list of cells for each ``TableRow`` under ``spreadsheet``."""
        rows = spreadsheet.getElementsByType(odf.table.TableRow)
        yield from (list(cls.from_row(row)) for row in rows)

    @classmethod
    def from_ods_file(cls, path):
        """Load the ODS document at ``path`` and iterate its rows of cells."""
        document = odf.opendocument.load(path)
        return cls.from_sheet(document.spreadsheet)
|
|
|
|
|
|
def OpeningBalance(acct=None, **txn_meta):
    """Build a balanced opening transaction with fixed amounts.

    The receivable and payable postings are offset by a -260 posting to
    ``acct``. When ``acct`` is omitted, the next name is drawn from
    ``OPENING_EQUITY_ACCOUNTS``. Keyword arguments are forwarded to
    ``Transaction()``.
    """
    equity_acct = next(OPENING_EQUITY_ACCOUNTS) if acct is None else acct
    postings = [
        ('Assets:Receivable:Accounts', 100),
        ('Assets:Receivable:Loans', 200),
        ('Liabilities:Payable:Accounts', -15),
        ('Liabilities:Payable:Vacation', -25),
        (equity_acct, -260),
    ]
    return Transaction(postings=postings, **txn_meta)
|
|
|
|
class TestBooksLoader(books.Loader):
    """Books loader for tests that always loads one fixed source file."""

    def __init__(self, source):
        # The parent initializer is not called; only `source` is stored.
        self.source = source

    def load_all(self, from_year=None):
        """Load ``self.source`` with Beancount; ``from_year`` is ignored."""
        source = self.source
        return bc_loader.load_file(source)

    def load_fy_range(self, from_fy, to_fy=None):
        """Return the same result as ``load_all()``; the range is ignored."""
        return self.load_all()
|
|
|
|
|
|
class TestConfig:
    """Configuration stand-in for tests, built from keyword arguments.

    Each accessor method returns the value prepared by ``__init__``.
    """

    def __init__(self, *,
                 books_path=None,
                 fiscal_year=(3, 1),
                 payment_threshold=0,
                 repo_path=None,
                 rt_client=None,
                 ):
        # A loader only exists when a books path was supplied.
        self._books_loader = (
            None if books_path is None else TestBooksLoader(books_path)
        )
        self.fiscal_year = fiscal_year
        self._payment_threshold = Decimal(payment_threshold)
        self.repo_path = test_path(repo_path)
        self._rt_client = rt_client
        # Likewise, the RT wrapper only exists when a client was supplied.
        self._rt_wrapper = None if rt_client is None else rtutil.RT(rt_client)

    def books_loader(self):
        """Return the books loader, or None when no books path was given."""
        return self._books_loader

    def config_file_path(self):
        """Return the path of the test user configuration file."""
        return test_path('userconfig/conservancy_beancount/config.ini')

    def fiscal_year_begin(self):
        """Return a ``books.FiscalYear`` built from ``self.fiscal_year``."""
        return books.FiscalYear(*self.fiscal_year)

    def payment_threshold(self):
        """Return the payment threshold as a ``Decimal``."""
        return self._payment_threshold

    def repository_path(self):
        """Return the repository path, or None when none was given."""
        return self.repo_path

    def rt_client(self):
        """Return the RT client passed to the constructor."""
        return self._rt_client

    def rt_wrapper(self):
        """Return the ``rtutil.RT`` wrapper, or None without an RT client."""
        return self._rt_wrapper
|
|
|
|
|
|
class _TicketBuilder:
    """Generate attachment tuples with sequential string ids.

    Every tuple is ``(id, filename, content type, size)``. Ids come from one
    counter per builder, and extra attachments are drawn round-robin from
    ``MISC_ATTACHMENTS``.
    """

    # The three parts that open every generated message.
    MESSAGE_ATTACHMENTS = [
        ('(Unnamed)', 'multipart/alternative', '0b'),
        ('(Unnamed)', 'text/plain', '1.2k'),
        ('(Unnamed)', 'text/html', '1.4k'),
    ]
    # Pool of additional attachments, reused cyclically.
    MISC_ATTACHMENTS = [
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
        ('photo.jpg', 'image/jpeg', '65.2k'),
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
        ('statement.txt', 'text/plain', '652b'),
        ('screenshot.png', 'image/png', '1.9m'),
    ]

    def __init__(self):
        self.id_seq = itertools.count(1)
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)

    def new_attch(self, attch):
        """Return ``attch`` prefixed with the next id, as a string."""
        attch_id = str(next(self.id_seq))
        return (attch_id, *attch)

    def new_msg_with_attachments(self, attachments_count=1):
        """Yield one message: its standard parts plus extra attachments."""
        yield from map(self.new_attch, self.MESSAGE_ATTACHMENTS)
        for _ in range(attachments_count):
            extra = next(self.misc_attchs)
            yield self.new_attch(extra)

    def new_messages(self, messages_count, attachments_count=None):
        """Yield the attachments of ``messages_count`` consecutive messages.

        With ``attachments_count`` unset, the first message gets
        ``messages_count`` extra attachments and each later message one
        fewer.
        """
        for msg_index in range(messages_count):
            att_count = attachments_count
            if att_count is None:
                att_count = messages_count - msg_index
            yield from self.new_msg_with_attachments(att_count)
|
|
|
|
|
|
class RTClient:
    """In-memory stand-in for an RT REST client, backed by ``TICKET_DATA``."""

    # The three tickets share one builder, so attachment ids run on
    # sequentially from ticket 1 through ticket 3. Evaluation order matters.
    _builder = _TicketBuilder()
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
    TICKET_DATA = {
        '1': list(_builder.new_messages(1, 3)),
        '2': list(_builder.new_messages(2, 1)),
        '3': list(_builder.new_messages(3, 0)),
    }
    del _builder

    def __init__(self,
                 url=DEFAULT_URL,
                 default_login=None,
                 default_password=None,
                 proxy=None,
                 default_queue='General',
                 skip_login=False,
                 verify_cert=True,
                 http_auth=None,
                 want_cfs=True,
                 ):
        self.url = url
        if http_auth is not None:
            # HTTP auth counts as already logged in.
            self.user = http_auth.username
            self.password = http_auth.password
            self.auth_method = type(http_auth).__name__
            self.login_result = True
        else:
            self.user = default_login
            self.password = default_password
            self.auth_method = 'login'
            # True when login is skipped, otherwise None until login() runs.
            self.login_result = skip_login or None
        self.last_login = None
        self.want_cfs = want_cfs
        self.edits = {}

    def login(self, login=None, password=None):
        """Record a login attempt and return whether it succeeded.

        With neither argument given, the stored credentials are used. The
        attempt succeeds when both values are truthy and the password does
        not start with ``bad``.
        """
        if login is None and password is None:
            login, password = self.user, self.password
        succeeded = (
            bool(login) and bool(password) and not password.startswith('bad')
        )
        self.login_result = succeeded
        self.last_login = (login, password, succeeded)
        return succeeded

    def get_attachments(self, ticket_id):
        """Return a copy of the ticket's attachment list, or None if unknown."""
        key = str(ticket_id)
        if key not in self.TICKET_DATA:
            return None
        return list(self.TICKET_DATA[key])

    def get_attachment(self, ticket_id, attachment_id):
        """Return a dict describing one attachment, or None if not found.

        ``Transaction`` is the id of the last ``multipart/`` attachment listed
        before the requested one in the ticket, or the attachment's own id
        when there is none.
        """
        key = str(ticket_id)
        if key not in self.TICKET_DATA:
            return None
        att_id = str(attachment_id)
        multipart_id = None
        found = None
        for attch in self.TICKET_DATA[key]:
            if attch[0] == att_id:
                found = attch
                break
            if attch[2].startswith('multipart/'):
                multipart_id = attch[0]
        if found is None:
            return None
        return {
            'id': att_id,
            'ContentType': found[2],
            'Filename': '' if found[1] == '(Unnamed)' else found[1],
            'Transaction': multipart_id or att_id,
        }

    def get_ticket(self, ticket_id):
        """Return a dict describing the ticket, or None if it is unknown.

        Custom-field keys are included only when ``want_cfs`` is true.
        """
        ticket_id_s = str(ticket_id)
        if ticket_id_s not in self.TICKET_DATA:
            return None
        retval = {
            'id': f'ticket/{ticket_id_s}',
            'numerical_id': ticket_id_s,
            'Requestors': [
                f'mx{ticket_id_s}@example.org',
                'requestor2@example.org',
            ],
        }
        if self.want_cfs:
            retval.update({
                'CF.{payment-amount}': '',
                'CF.{payment-method}': '',
                'CF.{payment-to}': f'Hon. Mx. {ticket_id_s}',
            })
        return retval

    def edit_ticket(self, ticket_id, **kwargs):
        """Merge ``kwargs`` into the recorded edits for the ticket; return True."""
        ticket_edits = self.edits.setdefault(str(ticket_id), {})
        ticket_edits.update(kwargs)
        return True

    def get_user(self, user_id):
        """Return a dict describing a user.

        ``user_id`` is either a number, from which an address is made up, or
        an address whose digits directly before the ``@`` give the numeric
        id. ``RealName`` is included only when ``want_cfs`` is true.
        """
        user_id_s = str(user_id)
        match = re.search(r'(\d+)@', user_id_s)
        if match is not None:
            email = user_id_s
            user_id_num = int(match.group(1))
        else:
            email = f'mx{user_id_s}@example.org'
            user_id_num = int(user_id_s)
        retval = {
            'id': f'user/{user_id_num}',
            'EmailAddress': email,
            'Name': email,
        }
        if self.want_cfs:
            retval['RealName'] = f'Mx. {user_id_num}'
        return retval
|