conservancy_beancount/conservancy_beancount/rtutil.py

132 lines
4.4 KiB
Python
Raw Normal View History

"""RT client utilities"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import functools
import mimetypes
import re
import urllib.parse as urlparse
import rt
from typing import (
Optional,
Tuple,
Union,
)
RTId = Union[int, str]
class RT:
"""RT utility wrapper class
Given an RT client object, this class provides common functionality for
working with RT links in Beancount metadata:
* Parse links
* Verify that they refer to extant objects in RT
* Convert metadata links to RT web links
* Cache results, to reduce network requests
"""
PARSE_REGEXPS = [
re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
]
def __init__(self, rt_client: rt.Rt) -> None:
self.rt = rt_client
urlparts = urlparse.urlparse(rt_client.url)
try:
index = urlparts.path.index('/REST/')
except ValueError:
base_path = urlparts.path.rstrip('/') + '/'
else:
base_path = urlparts.path[:index + 1]
self.url_base = urlparts._replace(path=base_path)
def _extend_url(self,
path_tail: str,
fragment: Optional[str]=None,
**query: str,
) -> str:
if fragment is None:
fragment = self.url_base.fragment
else:
fragment = urlparse.quote(fragment)
if query:
query_s = urlparse.urlencode(query)
else:
query_s = self.url_base.query
urlparts = self.url_base._replace(
path=self.url_base.path + urlparse.quote(path_tail),
query=query_s,
fragment=fragment,
)
return urlparse.urlunparse(urlparts)
def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
if txn_id is None:
fragment = None
else:
fragment = 'txn-{}'.format(txn_id)
return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
@functools.lru_cache()
def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
attachment = self.rt.get_attachment(ticket_id, attachment_id)
if attachment is None:
return None
mimetype = attachment.get('ContentType', '')
if mimetype.startswith('text/'):
return self._ticket_url(ticket_id, attachment['Transaction'])
else:
filename = attachment.get('Filename', '')
if not filename:
filename = 'RT{} attachment {}{}'.format(
ticket_id,
attachment_id,
mimetypes.guess_extension(mimetype) or '.bin',
)
path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
attachment,
filename,
)
return self._extend_url(path_tail)
def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
return self.url(ticket_id, attachment_id) is not None
@classmethod
def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
for regexp in cls.PARSE_REGEXPS:
match = regexp.match(s)
if match is not None:
ticket_id, attachment_id = match.groups()
return (ticket_id, attachment_id)
return None
@functools.lru_cache()
def ticket_url(self, ticket_id: RTId) -> Optional[str]:
if self.rt.get_ticket(ticket_id) is None:
return None
return self._ticket_url(ticket_id)
def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
if attachment_id is None:
return self.ticket_url(ticket_id)
else:
return self.attachment_url(ticket_id, attachment_id)