rtutil: Start module.

For now, this is basically the Python version of
ledger-tag-convert.plx.  It knows how to create RT web links from
ticket and attachment IDs.  It confirms that those objects actually
exist too.  It may grow to encompass other functionality in the
future.
This commit is contained in:
Brett Smith 2020-03-24 17:23:54 -04:00
parent d49173725a
commit d5a6141f6d
3 changed files with 294 additions and 1 deletions

View file

@ -0,0 +1,107 @@
"""RT client utilities"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import functools
import mimetypes
import urllib.parse as urlparse
import rt
from typing import (
Mapping,
Optional,
Tuple,
Union,
)
AttachmentTuple = Tuple[str, str, str, str]
RTId = Union[int, str]
class RT:
def __init__(self, rt_client: rt.Rt) -> None:
self.rt = rt_client
urlparts = urlparse.urlparse(rt_client.url)
try:
index = urlparts.path.index('/REST/')
except ValueError:
base_path = urlparts.path.rstrip('/') + '/'
else:
base_path = urlparts.path[:index + 1]
self.url_base = urlparts._replace(path=base_path)
def _extend_url(self,
path_tail: str,
fragment: Optional[str]=None,
**query: str,
) -> str:
if fragment is None:
fragment = self.url_base.fragment
else:
fragment = urlparse.quote(fragment)
if query:
query_s = urlparse.urlencode(query)
else:
query_s = self.url_base.query
urlparts = self.url_base._replace(
path=self.url_base.path + urlparse.quote(path_tail),
query=query_s,
fragment=fragment,
)
return urlparse.urlunparse(urlparts)
def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
if txn_id is None:
fragment = None
else:
fragment = 'txn-{}'.format(txn_id)
return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
@functools.lru_cache()
def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
attachment = self.rt.get_attachment(ticket_id, attachment_id)
if attachment is None:
return None
mimetype = attachment.get('ContentType', '')
if mimetype.startswith('text/'):
return self._ticket_url(ticket_id, attachment['Transaction'])
else:
filename = attachment.get('Filename', '')
if not filename:
filename = 'RT{} attachment {}{}'.format(
ticket_id,
attachment_id,
mimetypes.guess_extension(mimetype) or '.bin',
)
path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
attachment,
filename,
)
return self._extend_url(path_tail)
def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
return self.url(ticket_id, attachment_id) is not None
@functools.lru_cache()
def ticket_url(self, ticket_id: RTId) -> Optional[str]:
if self.rt.get_ticket(ticket_id) is None:
return None
return self._ticket_url(ticket_id)
def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
if attachment_id is None:
return self.ticket_url(ticket_id)
else:
return self.attachment_url(ticket_id, attachment_id)

99
tests/test_rtutil.py Normal file
View file

@ -0,0 +1,99 @@
"""Test RT integration"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import pytest
from . import testutil
from conservancy_beancount import rtutil
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]
EXPECTED_URLS = [
(1, None, 'Ticket/Display.html?id=1'),
(1, 2, 'Ticket/Display.html?id=1#txn-1'),
(1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
(1, 99, None),
(2, 1, None),
(2, 10, 'Ticket/Attachment/7/10/screenshot.png'),
(2, 13, 'Ticket/Display.html?id=2#txn-11'),
(2, 14, 'Ticket/Display.html?id=2#txn-11'), # statement.txt
(3, None, 'Ticket/Display.html?id=3'),
(9, None, None),
]
@pytest.fixture
def rt():
client = testutil.RTClient()
return rtutil.RT(client)
@pytest.fixture
def new_client():
class RTClient(testutil.RTClient):
TICKET_DATA = {'1': [], '2': []}
return RTClient()
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
if expected is not None:
expected = DEFAULT_RT_URL + expected
assert rt.url(ticket_id, attachment_id) == expected
@pytest.mark.parametrize('attachment_id', [
3,
None,
])
def test_url_caches(new_client, attachment_id):
new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k'))
if attachment_id is None:
fragment = ''
else:
fragment = '#txn-3'
expected = '{}Ticket/Display.html?id=1{}'.format(DEFAULT_RT_URL, fragment)
rt = rtutil.RT(new_client)
assert rt.url(1, attachment_id) == expected
new_client.TICKET_DATA.clear()
assert rt.url(1, attachment_id) == expected
@pytest.mark.parametrize('mimetype,extension', [
('application/pdf', 'pdf'),
('image/png', 'png'),
('message/rfc822', 'eml'),
('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
new_client.TICKET_DATA['1'].append(('9', '(Unnamed)', mimetype, '50.5k'))
rt = rtutil.RT(new_client)
expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
assert rt.url(1, 9) == expected
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
expected = False if expected is None else True
assert rt.exists(ticket_id, attachment_id) is expected
def test_exists_caches(new_client):
new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k'))
rt = rtutil.RT(new_client)
assert rt.exists(1, 3)
assert rt.exists(2)
assert not rt.exists(1, 9)
assert not rt.exists(9)
new_client.TICKET_DATA.clear()
assert rt.exists(1, 3)
assert rt.exists(2)
assert not rt.exists(1, 9)
assert not rt.exists(9)

View file

@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
import itertools
import beancount.core.amount as bc_amount
import beancount.core.data as bc_data
@ -114,9 +115,54 @@ class TestConfig:
return self.repo_path
class _TicketBuilder:
MESSAGE_ATTACHMENTS = [
('(Unnamed)', 'multipart/alternative', '0b'),
('(Unnamed)', 'text/plain', '1.2k'),
('(Unnamed)', 'text/html', '1.4k'),
]
MISC_ATTACHMENTS = [
('Forwarded Message.eml', 'message/rfc822', '3.1k'),
('photo.jpg', 'image/jpeg', '65.2k'),
('document.pdf', 'application/pdf', '326k'),
('screenshot.png', 'image/png', '1.9m'),
('statement.txt', 'text/plain', '652b'),
]
def __init__(self):
self.id_seq = itertools.count(1)
self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
def new_attch(self, attch):
return (str(next(self.id_seq)), *attch)
def new_msg_with_attachments(self, attachments_count=1):
for attch in self.MESSAGE_ATTACHMENTS:
yield self.new_attch(attch)
for _ in range(attachments_count):
yield self.new_attch(next(self.misc_attchs))
def new_messages(self, messages_count, attachments_count=None):
for n in range(messages_count):
if attachments_count is None:
att_count = messages_count - n
else:
att_count = attachments_count
yield from self.new_msg_with_attachments(att_count)
class RTClient:
_builder = _TicketBuilder()
DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
TICKET_DATA = {
'1': list(_builder.new_messages(1, 3)),
'2': list(_builder.new_messages(2, 1)),
'3': list(_builder.new_messages(3, 0)),
}
del _builder
def __init__(self,
url,
url=DEFAULT_URL,
default_login=None,
default_password=None,
proxy=None,
@ -145,3 +191,44 @@ class RTClient:
self.login_result = bool(login and password and not password.startswith('bad'))
self.last_login = (login, password, self.login_result)
return self.login_result
def get_attachments(self, ticket_id):
try:
return list(self.TICKET_DATA[str(ticket_id)])
except KeyError:
return None
def get_attachment(self, ticket_id, attachment_id):
try:
att_seq = iter(self.TICKET_DATA[str(ticket_id)])
except KeyError:
None
att_id = str(attachment_id)
multipart_id = None
for attch in att_seq:
if attch[0] == att_id:
break
elif attch[2].startswith('multipart/'):
multipart_id = attch[0]
else:
return None
tx_id = multipart_id or att_id
if attch[1] == '(Unnamed)':
filename = ''
else:
filename = attch[1]
return {
'id': att_id,
'ContentType': attch[2],
'Filename': filename,
'Transaction': tx_id,
}
def get_ticket(self, ticket_id):
ticket_id_s = str(ticket_id)
if ticket_id_s not in self.TICKET_DATA:
return None
return {
'id': 'ticket/{}'.format(ticket_id_s),
'numerical_id': ticket_id_s,
}