diff --git a/conservancy_beancount/rtutil.py b/conservancy_beancount/rtutil.py
new file mode 100644
index 0000000..3bdc2f9
--- /dev/null
+++ b/conservancy_beancount/rtutil.py
@@ -0,0 +1,107 @@
+"""RT client utilities"""
+# Copyright © 2020 Brett Smith
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import functools
+import mimetypes
+import urllib.parse as urlparse
+
+import rt
+
+from typing import (
+ Mapping,
+ Optional,
+ Tuple,
+ Union,
+)
+
+AttachmentTuple = Tuple[str, str, str, str]
+RTId = Union[int, str]
+
+class RT:
+ def __init__(self, rt_client: rt.Rt) -> None:
+ self.rt = rt_client
+ urlparts = urlparse.urlparse(rt_client.url)
+ try:
+ index = urlparts.path.index('/REST/')
+ except ValueError:
+ base_path = urlparts.path.rstrip('/') + '/'
+ else:
+ base_path = urlparts.path[:index + 1]
+ self.url_base = urlparts._replace(path=base_path)
+
+ def _extend_url(self,
+ path_tail: str,
+ fragment: Optional[str]=None,
+ **query: str,
+ ) -> str:
+ if fragment is None:
+ fragment = self.url_base.fragment
+ else:
+ fragment = urlparse.quote(fragment)
+ if query:
+ query_s = urlparse.urlencode(query)
+ else:
+ query_s = self.url_base.query
+ urlparts = self.url_base._replace(
+ path=self.url_base.path + urlparse.quote(path_tail),
+ query=query_s,
+ fragment=fragment,
+ )
+ return urlparse.urlunparse(urlparts)
+
+ def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
+ if txn_id is None:
+ fragment = None
+ else:
+ fragment = 'txn-{}'.format(txn_id)
+ return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
+
+ @functools.lru_cache()
+ def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
+ attachment = self.rt.get_attachment(ticket_id, attachment_id)
+ if attachment is None:
+ return None
+ mimetype = attachment.get('ContentType', '')
+ if mimetype.startswith('text/'):
+ return self._ticket_url(ticket_id, attachment['Transaction'])
+ else:
+ filename = attachment.get('Filename', '')
+ if not filename:
+ filename = 'RT{} attachment {}{}'.format(
+ ticket_id,
+ attachment_id,
+ mimetypes.guess_extension(mimetype) or '.bin',
+ )
+ path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
+ attachment,
+ filename,
+ )
+ return self._extend_url(path_tail)
+
+ def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
+ return self.url(ticket_id, attachment_id) is not None
+
+ @functools.lru_cache()
+ def ticket_url(self, ticket_id: RTId) -> Optional[str]:
+ if self.rt.get_ticket(ticket_id) is None:
+ return None
+ return self._ticket_url(ticket_id)
+
+ def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
+ if attachment_id is None:
+ return self.ticket_url(ticket_id)
+ else:
+ return self.attachment_url(ticket_id, attachment_id)
diff --git a/tests/test_rtutil.py b/tests/test_rtutil.py
new file mode 100644
index 0000000..accb7af
--- /dev/null
+++ b/tests/test_rtutil.py
@@ -0,0 +1,99 @@
+"""Test RT integration"""
+# Copyright © 2020 Brett Smith
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import pytest
+
+from . import testutil
+
+from conservancy_beancount import rtutil
+
+DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]
+
+EXPECTED_URLS = [
+ (1, None, 'Ticket/Display.html?id=1'),
+ (1, 2, 'Ticket/Display.html?id=1#txn-1'),
+ (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
+ (1, 99, None),
+ (2, 1, None),
+ (2, 10, 'Ticket/Attachment/7/10/screenshot.png'),
+ (2, 13, 'Ticket/Display.html?id=2#txn-11'),
+ (2, 14, 'Ticket/Display.html?id=2#txn-11'), # statement.txt
+ (3, None, 'Ticket/Display.html?id=3'),
+ (9, None, None),
+]
+
+@pytest.fixture
+def rt():
+ client = testutil.RTClient()
+ return rtutil.RT(client)
+
+@pytest.fixture
+def new_client():
+ class RTClient(testutil.RTClient):
+ TICKET_DATA = {'1': [], '2': []}
+ return RTClient()
+
+@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
+def test_url(rt, ticket_id, attachment_id, expected):
+ if expected is not None:
+ expected = DEFAULT_RT_URL + expected
+ assert rt.url(ticket_id, attachment_id) == expected
+
+@pytest.mark.parametrize('attachment_id', [
+ 3,
+ None,
+])
+def test_url_caches(new_client, attachment_id):
+ new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k'))
+ if attachment_id is None:
+ fragment = ''
+ else:
+ fragment = '#txn-3'
+ expected = '{}Ticket/Display.html?id=1{}'.format(DEFAULT_RT_URL, fragment)
+ rt = rtutil.RT(new_client)
+ assert rt.url(1, attachment_id) == expected
+ new_client.TICKET_DATA.clear()
+ assert rt.url(1, attachment_id) == expected
+
+@pytest.mark.parametrize('mimetype,extension', [
+ ('application/pdf', 'pdf'),
+ ('image/png', 'png'),
+ ('message/rfc822', 'eml'),
+ ('x-test/x-unknown', 'bin'),
+])
+def test_url_default_filename(new_client, mimetype, extension):
+ new_client.TICKET_DATA['1'].append(('9', '(Unnamed)', mimetype, '50.5k'))
+ rt = rtutil.RT(new_client)
+ expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
+ assert rt.url(1, 9) == expected
+
+@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
+def test_exists(rt, ticket_id, attachment_id, expected):
+ expected = False if expected is None else True
+ assert rt.exists(ticket_id, attachment_id) is expected
+
+def test_exists_caches(new_client):
+ new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k'))
+ rt = rtutil.RT(new_client)
+ assert rt.exists(1, 3)
+ assert rt.exists(2)
+ assert not rt.exists(1, 9)
+ assert not rt.exists(9)
+ new_client.TICKET_DATA.clear()
+ assert rt.exists(1, 3)
+ assert rt.exists(2)
+ assert not rt.exists(1, 9)
+ assert not rt.exists(9)
diff --git a/tests/testutil.py b/tests/testutil.py
index bb30ffd..c85e901 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -15,6 +15,7 @@
# along with this program. If not, see .
import datetime
+import itertools
import beancount.core.amount as bc_amount
import beancount.core.data as bc_data
@@ -114,9 +115,54 @@ class TestConfig:
return self.repo_path
+class _TicketBuilder:
+ MESSAGE_ATTACHMENTS = [
+ ('(Unnamed)', 'multipart/alternative', '0b'),
+ ('(Unnamed)', 'text/plain', '1.2k'),
+ ('(Unnamed)', 'text/html', '1.4k'),
+ ]
+ MISC_ATTACHMENTS = [
+ ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
+ ('photo.jpg', 'image/jpeg', '65.2k'),
+ ('document.pdf', 'application/pdf', '326k'),
+ ('screenshot.png', 'image/png', '1.9m'),
+ ('statement.txt', 'text/plain', '652b'),
+ ]
+
+ def __init__(self):
+ self.id_seq = itertools.count(1)
+ self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
+
+ def new_attch(self, attch):
+ return (str(next(self.id_seq)), *attch)
+
+ def new_msg_with_attachments(self, attachments_count=1):
+ for attch in self.MESSAGE_ATTACHMENTS:
+ yield self.new_attch(attch)
+ for _ in range(attachments_count):
+ yield self.new_attch(next(self.misc_attchs))
+
+ def new_messages(self, messages_count, attachments_count=None):
+ for n in range(messages_count):
+ if attachments_count is None:
+ att_count = messages_count - n
+ else:
+ att_count = attachments_count
+ yield from self.new_msg_with_attachments(att_count)
+
+
class RTClient:
+ _builder = _TicketBuilder()
+ DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
+ TICKET_DATA = {
+ '1': list(_builder.new_messages(1, 3)),
+ '2': list(_builder.new_messages(2, 1)),
+ '3': list(_builder.new_messages(3, 0)),
+ }
+ del _builder
+
def __init__(self,
- url,
+ url=DEFAULT_URL,
default_login=None,
default_password=None,
proxy=None,
@@ -145,3 +191,44 @@ class RTClient:
self.login_result = bool(login and password and not password.startswith('bad'))
self.last_login = (login, password, self.login_result)
return self.login_result
+
+ def get_attachments(self, ticket_id):
+ try:
+ return list(self.TICKET_DATA[str(ticket_id)])
+ except KeyError:
+ return None
+
+ def get_attachment(self, ticket_id, attachment_id):
+ try:
+ att_seq = iter(self.TICKET_DATA[str(ticket_id)])
+ except KeyError:
+ None
+ att_id = str(attachment_id)
+ multipart_id = None
+ for attch in att_seq:
+ if attch[0] == att_id:
+ break
+ elif attch[2].startswith('multipart/'):
+ multipart_id = attch[0]
+ else:
+ return None
+ tx_id = multipart_id or att_id
+ if attch[1] == '(Unnamed)':
+ filename = ''
+ else:
+ filename = attch[1]
+ return {
+ 'id': att_id,
+ 'ContentType': attch[2],
+ 'Filename': filename,
+ 'Transaction': tx_id,
+ }
+
+ def get_ticket(self, ticket_id):
+ ticket_id_s = str(ticket_id)
+ if ticket_id_s not in self.TICKET_DATA:
+ return None
+ return {
+ 'id': 'ticket/{}'.format(ticket_id_s),
+ 'numerical_id': ticket_id_s,
+ }