289 lines
10 KiB
Python
289 lines
10 KiB
Python
"""Test RT integration"""
|
|
# Copyright © 2020 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import contextlib
|
|
import itertools
|
|
import logging
|
|
import re
|
|
|
|
import pytest
|
|
|
|
from . import testutil
|
|
|
|
from conservancy_beancount import rtutil
|
|
|
|
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]
|
|
|
|
EXPECTED_URLS = [
|
|
(1, None, 'Ticket/Display.html?id=1'),
|
|
(1, 2, 'Ticket/Display.html?id=1#txn-1'),
|
|
(1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
|
|
(1, 99, None),
|
|
(2, 1, None),
|
|
(2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
|
|
(2, 13, 'Ticket/Display.html?id=2#txn-11'),
|
|
(2, 14, 'Ticket/Display.html?id=2#txn-11'), # statement.txt
|
|
(3, None, 'Ticket/Display.html?id=3'),
|
|
(9, None, None),
|
|
]
|
|
|
|
EXPECTED_URLS_MAP = {
|
|
(ticket_id, attachment_id): url
|
|
for ticket_id, attachment_id, url in EXPECTED_URLS
|
|
}
|
|
|
|
@pytest.fixture(scope='module')
|
|
def rt():
|
|
client = testutil.RTClient()
|
|
return rtutil.RT(client)
|
|
|
|
@pytest.fixture
|
|
def new_client():
|
|
class RTClient(testutil.RTClient):
|
|
TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()
|
|
return RTClient()
|
|
|
|
@contextlib.contextmanager
|
|
def nullcontext(thing):
|
|
yield thing
|
|
|
|
def new_cache(database=':memory:'):
|
|
db = rtutil.RTLinkCache.setup(database)
|
|
if db is None:
|
|
print("NOTE: did not set up database cache at {}".format(database))
|
|
return nullcontext(db)
|
|
else:
|
|
return contextlib.closing(db)
|
|
|
|
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
|
|
def test_url(rt, ticket_id, attachment_id, expected):
|
|
if expected is not None:
|
|
expected = DEFAULT_RT_URL + expected
|
|
assert rt.url(ticket_id, attachment_id) == expected
|
|
|
|
@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
|
|
[245, None],
|
|
[True, False],
|
|
))
|
|
def test_metadata_regexp(rt, attachment_id, first_link_only):
|
|
if attachment_id is None:
|
|
match_links = ['rt:220', 'rt://ticket/220']
|
|
else:
|
|
match_links = [f'rt:220/{attachment_id}',
|
|
f'rt://ticket/220/attachments/{attachment_id}']
|
|
regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
|
|
for link in match_links:
|
|
assert re.search(regexp, link)
|
|
assert re.search(regexp, link + ' link2')
|
|
assert re.search(regexp, link + '0') is None
|
|
assert re.search(regexp, 'a' + link) is None
|
|
end_match = re.search(regexp, 'link0 ' + link)
|
|
if first_link_only:
|
|
assert end_match is None
|
|
else:
|
|
assert end_match
|
|
|
|
@pytest.mark.parametrize('attachment_id', [
|
|
13,
|
|
None,
|
|
])
|
|
def test_url_caches(new_client, attachment_id):
|
|
if attachment_id is None:
|
|
fragment = ''
|
|
else:
|
|
fragment = '#txn-11'
|
|
expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
|
|
rt = rtutil.RT(new_client)
|
|
assert rt.url(2, attachment_id) == expected
|
|
new_client.TICKET_DATA.clear()
|
|
assert rt.url(2, attachment_id) == expected
|
|
|
|
@pytest.mark.parametrize('mimetype,extension', [
|
|
('application/pdf', 'pdf'),
|
|
('image/png', 'png'),
|
|
('message/rfc822', 'eml'),
|
|
('x-test/x-unknown', 'bin'),
|
|
])
|
|
def test_url_default_filename(new_client, mimetype, extension):
|
|
new_client.TICKET_DATA['1'] = [('9', '(Unnamed)', mimetype, '50.5k')]
|
|
rt = rtutil.RT(new_client)
|
|
expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
|
|
assert rt.url(1, 9) == expected
|
|
|
|
@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
|
|
('{}', '{}', '{}',),
|
|
('<{}>', '[{}]', '({})'),
|
|
])
|
|
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
|
|
expected_map = {
|
|
'rt:{}{}'.format(tid, '' if aid is None else f'/{aid}'): url
|
|
for tid, aid, url in EXPECTED_URLS
|
|
}
|
|
expected_map['https://example.com'] = None
|
|
expected_map['invoice.pdf'] = None
|
|
keys = list(expected_map)
|
|
urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
|
|
for key, actual in itertools.zip_longest(keys, urls):
|
|
expected = expected_map[key]
|
|
if expected is None:
|
|
if key.startswith('rt:'):
|
|
expected = missing_fmt.format(key)
|
|
else:
|
|
expected = nonrt_fmt.format(key)
|
|
else:
|
|
expected = rt_fmt.format(DEFAULT_RT_URL + expected)
|
|
assert actual == expected
|
|
|
|
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
|
|
def test_exists(rt, ticket_id, attachment_id, expected):
|
|
expected = False if expected is None else True
|
|
assert rt.exists(ticket_id, attachment_id) is expected
|
|
|
|
def test_exists_caches(new_client):
|
|
rt = rtutil.RT(new_client)
|
|
assert rt.exists(1, 3)
|
|
assert rt.exists(2)
|
|
assert not rt.exists(1, 99)
|
|
assert not rt.exists(9)
|
|
new_client.TICKET_DATA.clear()
|
|
assert rt.exists(1, 3)
|
|
assert rt.exists(2)
|
|
assert not rt.exists(1, 99)
|
|
assert not rt.exists(9)
|
|
|
|
@pytest.mark.parametrize('link,expected', [
|
|
('rt:1/2', ('1', '2')),
|
|
('rt:123/456', ('123', '456')),
|
|
('rt:12345', ('12345', None)),
|
|
('rt:12346/', ('12346', None)),
|
|
('rt:12346/789', ('12346', '789')),
|
|
('rt:12346/780/', ('12346', '780')),
|
|
('rt://ticket/1', ('1', None)),
|
|
('rt://ticket/1/', ('1', None)),
|
|
('rt://ticket/1234/attachments/5678', ('1234', '5678')),
|
|
('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
|
|
('rt://ticket/1234/attachment/5678', ('1234', '5678')),
|
|
('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
|
|
('rt:', None),
|
|
('rt://', None),
|
|
('rt:example.org', None),
|
|
('rt:example.org/1', None),
|
|
('rt://example.org', None),
|
|
('rt://example.org/1', None),
|
|
('https://example.org/rt/Ticket/Display.html?id=123', None),
|
|
])
|
|
def test_parse(rt, link, expected):
|
|
assert rt.parse(link) == expected
|
|
|
|
@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
|
|
('12', None, 'rt:12'),
|
|
(34, None, 'rt:34'),
|
|
('56', '78', 'rt:56/78'),
|
|
(90, 880, 'rt:90/880'),
|
|
])
|
|
def test_unparse(rt, ticket_id, attachment_id, expected):
|
|
assert rt.unparse(ticket_id, attachment_id) == expected
|
|
|
|
def test_uncommon_server_url_parsing():
|
|
url = 'https://example.org/REST/1.0/'
|
|
client = testutil.RTClient(url + 'REST/1.0/')
|
|
rt = rtutil.RT(client)
|
|
assert rt.url(1).startswith(url)
|
|
|
|
def test_shared_cache(new_client):
|
|
ticket_id, _, expected = EXPECTED_URLS[0]
|
|
expected = DEFAULT_RT_URL + expected
|
|
with new_cache() as cachedb:
|
|
rt1 = rtutil.RT(new_client, cachedb)
|
|
assert rt1.url(ticket_id) == expected
|
|
new_client.TICKET_DATA.clear()
|
|
rt2 = rtutil.RT(new_client, cachedb)
|
|
assert rt2.url(ticket_id) == expected
|
|
assert not rt2.exists(ticket_id + 1)
|
|
assert rt1 is not rt2
|
|
|
|
def test_no_shared_cache(new_client):
|
|
with new_cache() as cache1, new_cache() as cache2:
|
|
rt1 = rtutil.RT(new_client, cache1)
|
|
rt2 = rtutil.RT(new_client, cache2)
|
|
assert rt1.exists(1)
|
|
new_client.TICKET_DATA.clear()
|
|
assert not rt2.exists(1)
|
|
assert rt1.exists(1)
|
|
|
|
def test_read_only_cache(new_client, tmp_path, caplog):
|
|
caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
|
|
db_path = tmp_path / 'test.db'
|
|
ticket_id, _, expected = EXPECTED_URLS[0]
|
|
expected = DEFAULT_RT_URL + expected
|
|
with new_cache(db_path) as cache1:
|
|
rt1 = rtutil.RT(new_client, cache1)
|
|
assert rt1.url(ticket_id) == expected
|
|
new_client.TICKET_DATA.clear()
|
|
db_path.chmod(0o400)
|
|
with new_cache(db_path) as cache2:
|
|
rt2 = rtutil.RT(new_client, cache2)
|
|
assert rt2.url(ticket_id) == expected
|
|
assert rt2.url(ticket_id + 1) is None
|
|
|
|
def test_results_not_found_only_in_transient_cache(new_client):
|
|
with new_cache() as cache:
|
|
rt1 = rtutil.RT(new_client, cache)
|
|
rt2 = rtutil.RT(new_client, cache)
|
|
assert not rt1.exists(9)
|
|
new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
|
|
assert not rt1.exists(9)
|
|
assert rt2.exists(9)
|
|
|
|
def test_txn_with_urls(rt):
|
|
txn_meta = {
|
|
'rt-id': 'rt:1',
|
|
'contract': 'RepoLink.pdf',
|
|
'statement': 'doc1.txt rt:1/4 doc2.txt',
|
|
}
|
|
txn = testutil.Transaction(**txn_meta, postings=[
|
|
('Income:Donations', -10, {'receipt': 'rt:2/13 donation.txt'}),
|
|
('Assets:Cash', 10, {'receipt': 'cash.png rt:2/14'}),
|
|
])
|
|
actual = rt.txn_with_urls(txn)
|
|
def check(source, key, ticket_id, attachment_id=None):
|
|
url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
|
|
assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]
|
|
expected_keys = set(txn_meta)
|
|
expected_keys.update(['filename', 'lineno'])
|
|
assert set(actual.meta) == expected_keys
|
|
check(actual, 'rt-id', 1)
|
|
assert actual.meta['contract'] == txn_meta['contract']
|
|
assert actual.meta['statement'].startswith('doc1.txt ')
|
|
check(actual, 'statement', 1, 4)
|
|
check(actual.postings[0], 'receipt', 2, 13)
|
|
assert actual.postings[0].meta['receipt'].endswith(' donation.txt')
|
|
check(actual.postings[1], 'receipt', 2, 14)
|
|
assert actual.postings[1].meta['receipt'].startswith('cash.png ')
|
|
# Check the original transaction is unchanged
|
|
for key, expected in txn_meta.items():
|
|
assert txn.meta[key] == expected
|
|
assert txn.postings[0].meta['receipt'] == 'rt:2/13 donation.txt'
|
|
assert txn.postings[1].meta['receipt'] == 'cash.png rt:2/14'
|
|
|
|
def test_txn_with_urls_with_fmts(rt):
|
|
txn_meta = {
|
|
'rt-id': 'rt:1',
|
|
'contract': 'RepoLink.pdf',
|
|
'statement': 'rt:1/99 rt:1/4 stmt.txt',
|
|
}
|
|
txn = testutil.Transaction(**txn_meta)
|
|
actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
|
|
rt_id_path = EXPECTED_URLS_MAP[(1, None)]
|
|
assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{rt_id_path}>'
|
|
assert actual.meta['contract'] == '[RepoLink.pdf]'
|
|
statement_path = EXPECTED_URLS_MAP[(1, 4)]
|
|
assert actual.meta['statement'] == ' '.join([
|
|
'(rt:1/99)',
|
|
f'<{DEFAULT_RT_URL}{statement_path}>',
|
|
'[stmt.txt]',
|
|
])
|