"""Mock Beancount objects for testing""" # Copyright © 2020 Brett Smith # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import datetime import itertools import re import unittest.mock import beancount.core.amount as bc_amount import beancount.core.data as bc_data import beancount.loader as bc_loader import beancount.parser.options as bc_options import git import odf.element import odf.opendocument import odf.table from decimal import Decimal from pathlib import Path from typing import Any, Optional, NamedTuple from conservancy_beancount import books, data, rtutil EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30) FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99) FY_START_DATE = datetime.date(2020, 3, 1) FY_MID_DATE = datetime.date(2020, 9, 1) PAST_DATE = datetime.date(2000, 1, 1) TESTS_DIR = Path(__file__).parent # This function is a teardown fixture, but different test files use # it with different scopes. Typical usage looks like: # clean_account_meta = pytest.fixture([options])(testutil.clean_account_meta) def clean_account_meta(): try: yield finally: data.Account.load_options_map(bc_options.OPTIONS_DEFAULTS) data.Account._meta_map.clear() def _ods_cell_value_type(cell): assert cell.tagName == 'table:table-cell' return cell.getAttribute('valuetype') def _ods_cell_value(cell): value_type = cell.getAttribute('valuetype') if value_type == 'currency' or value_type == 'float': return Decimal(cell.getAttribute('value')) elif value_type == 'date': return datetime.datetime.strptime( cell.getAttribute('datevalue'), '%Y-%m-%d', ).date() else: return cell.getAttribute('value') def _ods_elem_text(elem): if isinstance(elem, odf.element.Text): return elem.data else: return '\0'.join(_ods_elem_text(child) for child in elem.childNodes) odf.element.Element.value_type = property(_ods_cell_value_type) odf.element.Element.value = property(_ods_cell_value) odf.element.Element.text = property(_ods_elem_text) def check_lines_match(lines, expect_patterns, source='output'): for pattern in expect_patterns: assert any(re.search(pattern, line) for line in lines), \ f"{pattern!r} not found in {source}" def check_logs_match(caplog, expected): records = iter(caplog.records) for exp_level, exp_msg in expected: exp_level = exp_level.upper() assert any( log.levelname == exp_level and log.message == exp_msg for log in records ), f"{exp_level} log {exp_msg!r} not found" def check_post_meta(txn, *expected_meta, default=None): assert len(txn.postings) == len(expected_meta) for post, expected in zip(txn.postings, expected_meta): if not expected: assert not post.meta else: actual = None if post.meta is None else { key: post.meta.get(key, default) for key in expected } assert actual == expected def combine_values(*value_seqs): stop = 0 for seq in value_seqs: try: stop = max(stop, len(seq)) except TypeError: pass return itertools.islice( zip(*(itertools.cycle(seq) for seq in value_seqs)), stop, ) def date_seq(date=FY_MID_DATE, step=1): while True: yield date date += datetime.timedelta(days=step) def parse_date(s, fmt='%Y-%m-%d'): return datetime.datetime.strptime(s, fmt).date() def test_path(s): if s is None: return s s = Path(s) if not s.is_absolute(): s = TESTS_DIR / s return s def Amount(number, currency='USD'): return bc_amount.Amount(Decimal(number), currency) def Cost(number, currency='USD', date=FY_MID_DATE, label=None): return bc_data.Cost(Decimal(number), currency, date, label) def Posting(account, number, currency='USD', cost=None, price=None, flag=None, _post_type=bc_data.Posting, _meta_type=None, **meta): if cost is not None: cost = Cost(*cost) if not meta: meta = None elif _meta_type: meta = _meta_type(meta) return _post_type( account, Amount(number, currency), cost, price, flag, meta, ) def Transaction(date=FY_MID_DATE, flag='*', payee=None, narration='', tags=None, links=None, postings=(), **meta): if isinstance(date, str): date = parse_date(date) meta.setdefault('filename', '') meta.setdefault('lineno', 0) real_postings = [] for post in postings: try: post.account except AttributeError: if isinstance(post[-1], dict): args = post[:-1] kwargs = post[-1] else: args = post kwargs = {} post = Posting(*args, **kwargs) real_postings.append(post) return bc_data.Transaction( meta, date, flag, payee, narration, set(tags or ''), set(links or ''), real_postings, ) LINK_METADATA_STRINGS = { 'Invoices/304321.pdf', 'rt:123/456', 'rt://ticket/234', } NON_LINK_METADATA_STRINGS = { '', ' ', ' ', } NON_STRING_METADATA_VALUES = [ Decimal(5), FY_MID_DATE, Amount(50), Amount(500, None), ] OPENING_EQUITY_ACCOUNTS = itertools.cycle([ 'Equity:Funds:Unrestricted', 'Equity:Funds:Restricted', 'Equity:OpeningBalance', ]) class ODSCell: @classmethod def from_row(cls, row): return row.getElementsByType(odf.table.TableCell) @classmethod def from_sheet(cls, spreadsheet): for row in spreadsheet.getElementsByType(odf.table.TableRow): yield list(cls.from_row(row)) @classmethod def from_ods_file(cls, path): ods = odf.opendocument.load(path) return cls.from_sheet(ods.spreadsheet) def OpeningBalance(acct=None, **txn_meta): if acct is None: acct = next(OPENING_EQUITY_ACCOUNTS) return Transaction(**txn_meta, postings=[ ('Assets:Receivable:Accounts', 100), ('Assets:Receivable:Loans', 200), ('Liabilities:Payable:Accounts', -15), ('Liabilities:Payable:Vacation', -25), (acct, -260), ]) class TestBooksLoader(books.Loader): def __init__(self, source): self.source = source def load_all(self, from_year=None): return bc_loader.load_file(self.source) def load_fy_range(self, from_fy, to_fy=None): return self.load_all() class TestConfig: def __init__(self, *, books_path=None, fiscal_year=(3, 1), payment_threshold=0, repo_path=None, rt_client=None, ): self._books_path = books_path self.fiscal_year = fiscal_year self._payment_threshold = Decimal(payment_threshold) self.repo_path = test_path(repo_path) self._rt_client = rt_client if rt_client is None: self._rt_wrapper = None else: self._rt_wrapper = rtutil.RT(rt_client) def books_loader(self): if self._books_path is None: return None else: return TestBooksLoader(self._books_path) def books_path(self): return self._books_path def books_repo(self): return None def config_file_path(self): return test_path('userconfig/conservancy_beancount/config.ini') def fiscal_year_begin(self): return books.FiscalYear(*self.fiscal_year) def payment_threshold(self): return self._payment_threshold def repository_path(self): return self.repo_path def rt_client(self): return self._rt_client def rt_wrapper(self): return self._rt_wrapper def TestRepo(head_hexsha='abcd1234', dirty=False): retval = unittest.mock.Mock(spec=git.Repo) retval.is_dirty.return_value = dirty retval.head.commit.hexsha = head_hexsha return retval class _TicketBuilder: MESSAGE_ATTACHMENTS = [ ('(Unnamed)', 'multipart/alternative', '0b'), ('(Unnamed)', 'text/plain', '1.2k'), ('(Unnamed)', 'text/html', '1.4k'), ] MISC_ATTACHMENTS = [ ('Forwarded Message.eml', 'message/rfc822', '3.1k'), ('photo.jpg', 'image/jpeg', '65.2k'), ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'), ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'), ('statement.txt', 'text/plain', '652b'), ('screenshot.png', 'image/png', '1.9m'), ] def __init__(self): self.id_seq = itertools.count(1) self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS) def new_attch(self, attch): return (str(next(self.id_seq)), *attch) def new_msg_with_attachments(self, attachments_count=1): for attch in self.MESSAGE_ATTACHMENTS: yield self.new_attch(attch) for _ in range(attachments_count): yield self.new_attch(next(self.misc_attchs)) def new_messages(self, messages_count, attachments_count=None): for n in range(messages_count): if attachments_count is None: att_count = messages_count - n else: att_count = attachments_count yield from self.new_msg_with_attachments(att_count) class RTClient: _builder = _TicketBuilder() DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/' TICKET_DATA = { '1': list(_builder.new_messages(1, 3)), '2': list(_builder.new_messages(2, 1)), '3': list(_builder.new_messages(3, 0)), } del _builder def __init__(self, url=DEFAULT_URL, default_login=None, default_password=None, proxy=None, default_queue='General', skip_login=False, verify_cert=True, http_auth=None, want_cfs=True, ): self.url = url if http_auth is None: self.user = default_login self.password = default_password self.auth_method = 'login' self.login_result = skip_login or None else: self.user = http_auth.username self.password = http_auth.password self.auth_method = type(http_auth).__name__ self.login_result = True self.last_login = None self.want_cfs = want_cfs self.edits = {} def login(self, login=None, password=None): if login is None and password is None: login = self.user password = self.password self.login_result = bool(login and password and not password.startswith('bad')) self.last_login = (login, password, self.login_result) return self.login_result def get_attachments(self, ticket_id): try: return list(self.TICKET_DATA[str(ticket_id)]) except KeyError: return None def get_attachment(self, ticket_id, attachment_id): try: att_seq = iter(self.TICKET_DATA[str(ticket_id)]) except KeyError: return None att_id = str(attachment_id) multipart_id = None for attch in att_seq: if attch[0] == att_id: break elif attch[2].startswith('multipart/'): multipart_id = attch[0] else: return None tx_id = multipart_id or att_id if attch[1] == '(Unnamed)': filename = '' else: filename = attch[1] return { 'id': att_id, 'ContentType': attch[2], 'Filename': filename, 'Transaction': tx_id, } def get_ticket(self, ticket_id): ticket_id_s = str(ticket_id) if ticket_id_s not in self.TICKET_DATA: return None retval = { 'id': 'ticket/{}'.format(ticket_id_s), 'numerical_id': ticket_id_s, 'Requestors': [ f'mx{ticket_id_s}@example.org', 'requestor2@example.org', ], } if self.want_cfs: retval['CF.{payment-amount}'] = '' retval['CF.{payment-method}'] = '' retval['CF.{payment-to}'] = f'Hon. Mx. {ticket_id_s}' return retval def edit_ticket(self, ticket_id, **kwargs): self.edits.setdefault(str(ticket_id), {}).update(kwargs) return True def get_user(self, user_id): user_id_s = str(user_id) match = re.search(r'(\d+)@', user_id_s) if match is None: email = f'mx{user_id_s}@example.org' user_id_num = int(user_id_s) else: email = user_id_s user_id_num = int(match.group(1)) retval = { 'id': f'user/{user_id_num}', 'EmailAddress': email, 'Name': email, } if self.want_cfs: retval['RealName'] = f'Mx. {user_id_num}' return retval