"""Mock Beancount objects for testing""" # Copyright © 2020 Brett Smith # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import datetime import itertools import re import beancount.core.amount as bc_amount import beancount.core.data as bc_data import beancount.loader as bc_loader from decimal import Decimal from pathlib import Path from conservancy_beancount import books, rtutil EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30) FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99) FY_START_DATE = datetime.date(2020, 3, 1) FY_MID_DATE = datetime.date(2020, 9, 1) PAST_DATE = datetime.date(2000, 1, 1) TESTS_DIR = Path(__file__).parent def check_lines_match(lines, expect_patterns, source='output'): for pattern in expect_patterns: assert any(re.search(pattern, line) for line in lines), \ f"{pattern!r} not found in {source}" def check_post_meta(txn, *expected_meta, default=None): assert len(txn.postings) == len(expected_meta) for post, expected in zip(txn.postings, expected_meta): if not expected: assert not post.meta else: actual = None if post.meta is None else { key: post.meta.get(key, default) for key in expected } assert actual == expected def combine_values(*value_seqs): stop = 0 for seq in value_seqs: try: stop = max(stop, len(seq)) except TypeError: pass return itertools.islice( zip(*(itertools.cycle(seq) for seq in value_seqs)), stop, ) def date_seq(date=FY_MID_DATE, step=1): while True: yield date date += datetime.timedelta(days=step) def parse_date(s, fmt='%Y-%m-%d'): return datetime.datetime.strptime(s, fmt).date() def test_path(s): if s is None: return s s = Path(s) if not s.is_absolute(): s = TESTS_DIR / s return s def Amount(number, currency='USD'): return bc_amount.Amount(Decimal(number), currency) def Cost(number, currency='USD', date=FY_MID_DATE, label=None): return bc_data.Cost(Decimal(number), currency, date, label) def Posting(account, number, currency='USD', cost=None, price=None, flag=None, type_=bc_data.Posting, **meta): if cost is not None: cost = Cost(*cost) if not meta: meta = None return type_( account, Amount(number, currency), cost, price, flag, meta, ) def Transaction(date=FY_MID_DATE, flag='*', payee=None, narration='', tags=None, links=None, postings=(), **meta): if isinstance(date, str): date = parse_date(date) meta.setdefault('filename', '') meta.setdefault('lineno', 0) real_postings = [] for post in postings: try: post.account except AttributeError: if isinstance(post[-1], dict): args = post[:-1] kwargs = post[-1] else: args = post kwargs = {} post = Posting(*args, **kwargs) real_postings.append(post) return bc_data.Transaction( meta, date, flag, payee, narration, set(tags or ''), set(links or ''), real_postings, ) LINK_METADATA_STRINGS = { 'Invoices/304321.pdf', 'rt:123/456', 'rt://ticket/234', } NON_LINK_METADATA_STRINGS = { '', ' ', ' ', } NON_STRING_METADATA_VALUES = [ Decimal(5), FY_MID_DATE, Amount(50), Amount(500, None), ] OPENING_EQUITY_ACCOUNTS = itertools.cycle([ 'Equity:Funds:Unrestricted', 'Equity:Funds:Restricted', 'Equity:OpeningBalance', ]) def balance_map(source=None, **kwargs): # The source and/or kwargs should map currency name strings to # things you can pass to Decimal (a decimal string, an int, etc.) # This returns a dict that maps currency name strings to Amount instances. retval = {} if source is not None: retval.update((currency, Amount(number, currency)) for currency, number in source) if kwargs: retval.update(balance_map(kwargs.items())) return retval def OpeningBalance(acct=None, **txn_meta): if acct is None: acct = next(OPENING_EQUITY_ACCOUNTS) return Transaction(**txn_meta, postings=[ ('Assets:Receivable:Accounts', 100), ('Assets:Receivable:Loans', 200), ('Liabilities:Payable:Accounts', -15), ('Liabilities:Payable:Vacation', -25), (acct, -260), ]) class TestBooksLoader(books.Loader): def __init__(self, source): self.source = source def load_fy_range(self, from_fy, to_fy=None): return bc_loader.load_file(self.source) class TestConfig: def __init__(self, *, books_path=None, payment_threshold=0, repo_path=None, rt_client=None, ): if books_path is None: self._books_loader = None else: self._books_loader = TestBooksLoader(books_path) self._payment_threshold = Decimal(payment_threshold) self.repo_path = test_path(repo_path) self._rt_client = rt_client if rt_client is None: self._rt_wrapper = None else: self._rt_wrapper = rtutil.RT(rt_client) def books_loader(self): return self._books_loader def config_file_path(self): return test_path('userconfig/conservancy_beancount/config.ini') def payment_threshold(self): return self._payment_threshold def repository_path(self): return self.repo_path def rt_client(self): return self._rt_client def rt_wrapper(self): return self._rt_wrapper class _TicketBuilder: MESSAGE_ATTACHMENTS = [ ('(Unnamed)', 'multipart/alternative', '0b'), ('(Unnamed)', 'text/plain', '1.2k'), ('(Unnamed)', 'text/html', '1.4k'), ] MISC_ATTACHMENTS = [ ('Forwarded Message.eml', 'message/rfc822', '3.1k'), ('photo.jpg', 'image/jpeg', '65.2k'), ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'), ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'), ('statement.txt', 'text/plain', '652b'), ('screenshot.png', 'image/png', '1.9m'), ] def __init__(self): self.id_seq = itertools.count(1) self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS) def new_attch(self, attch): return (str(next(self.id_seq)), *attch) def new_msg_with_attachments(self, attachments_count=1): for attch in self.MESSAGE_ATTACHMENTS: yield self.new_attch(attch) for _ in range(attachments_count): yield self.new_attch(next(self.misc_attchs)) def new_messages(self, messages_count, attachments_count=None): for n in range(messages_count): if attachments_count is None: att_count = messages_count - n else: att_count = attachments_count yield from self.new_msg_with_attachments(att_count) class RTClient: _builder = _TicketBuilder() DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/' TICKET_DATA = { '1': list(_builder.new_messages(1, 3)), '2': list(_builder.new_messages(2, 1)), '3': list(_builder.new_messages(3, 0)), } del _builder def __init__(self, url=DEFAULT_URL, default_login=None, default_password=None, proxy=None, default_queue='General', skip_login=False, verify_cert=True, http_auth=None, want_cfs=True, ): self.url = url if http_auth is None: self.user = default_login self.password = default_password self.auth_method = 'login' self.login_result = skip_login or None else: self.user = http_auth.username self.password = http_auth.password self.auth_method = type(http_auth).__name__ self.login_result = True self.last_login = None self.want_cfs = want_cfs def login(self, login=None, password=None): if login is None and password is None: login = self.user password = self.password self.login_result = bool(login and password and not password.startswith('bad')) self.last_login = (login, password, self.login_result) return self.login_result def get_attachments(self, ticket_id): try: return list(self.TICKET_DATA[str(ticket_id)]) except KeyError: return None def get_attachment(self, ticket_id, attachment_id): try: att_seq = iter(self.TICKET_DATA[str(ticket_id)]) except KeyError: return None att_id = str(attachment_id) multipart_id = None for attch in att_seq: if attch[0] == att_id: break elif attch[2].startswith('multipart/'): multipart_id = attch[0] else: return None tx_id = multipart_id or att_id if attch[1] == '(Unnamed)': filename = '' else: filename = attch[1] return { 'id': att_id, 'ContentType': attch[2], 'Filename': filename, 'Transaction': tx_id, } def get_ticket(self, ticket_id): ticket_id_s = str(ticket_id) if ticket_id_s not in self.TICKET_DATA: return None retval = { 'id': 'ticket/{}'.format(ticket_id_s), 'numerical_id': ticket_id_s, 'Requestors': [ f'mx{ticket_id_s}@example.org', 'requestor2@example.org', ], } if self.want_cfs: retval['CF.{payment-method}'] = f'payment method {ticket_id_s}' retval['CF.{payment-to}'] = f'Hon. Mx. {ticket_id_s}' return retval def get_user(self, user_id): user_id_s = str(user_id) match = re.search(r'(\d+)@', user_id_s) if match is None: email = f'mx{user_id_s}@example.org' user_id_num = int(user_id_s) else: email = user_id_s user_id_num = int(match.group(1)) return { 'id': f'user/{user_id_num}', 'EmailAddress': email, 'Name': email, 'RealName': f'Mx. {user_id_num}', }