442 lines
16 KiB
Python
442 lines
16 KiB
Python
"""RT client utilities"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import functools
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import urllib.parse as urlparse
|
|
|
|
import rt
|
|
|
|
from pathlib import Path
|
|
|
|
from . import data
|
|
from beancount.core import data as bc_data
|
|
|
|
from typing import (
|
|
overload,
|
|
Callable,
|
|
Iterable,
|
|
Iterator,
|
|
MutableMapping,
|
|
Optional,
|
|
Set,
|
|
Tuple,
|
|
Union,
|
|
)
|
|
from .beancount_types import (
|
|
Transaction,
|
|
)
|
|
|
|
# An RT object id (ticket or attachment), given either as an int or as its
# string form.
RTId = Union[int, str]

# Cache key: (ticket id, attachment id). The attachment id is None when the
# key refers to the ticket itself.
TicketAttachmentIds = Tuple[
    str,
    Optional[str],
]

# Any dict-like store mapping a cache key to a web URL, or to None when the
# object was looked up and has no URL.
_LinkCache = MutableMapping[
    TicketAttachmentIds,
    Optional[str],
]

# Signature shared by the URL lookup methods wrapped by RT._cache_method.
_URLLookup = Callable[..., Optional[str]]
|
|
|
|
class RTLinkCache(_LinkCache):
    """Cache RT links to disk

    This class provides a dict-like interface to a cache of RT links.
    Once an object is in RT, a link to it should never change.
    The only exception is when objects get shredded, and those objects
    shouldn't be referenced in books anyway.

    This implementation is backed by a sqlite database. You can call::

        db = RTLinkCache.setup(path)

    This method will try to open a sqlite database at the given path,
    and set up necessary tables, etc.
    If it succeeds, it returns a database connection you can use to
    initialize the cache.
    If it fails, it returns None, and the caller should use some other
    dict-like object (like a normal dict) for caching.
    You can give the result to the RT utility class either way,
    and it will do the right thing for itself::

        rt = RT(rt_client, db)

    Keys are ``(ticket_id, attachment_id)`` tuples, where ``attachment_id``
    is None for the ticket itself. Values are URL strings, or None to record
    that a lookup found nothing. None values are only remembered in memory
    for the lifetime of this object; they are never written to the database.
    """

    CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS RTLinkCache(
 ticket_id TEXT NOT NULL,
 attachment_id TEXT,
 url TEXT NOT NULL,
 PRIMARY KEY (ticket_id, attachment_id)
)"""
    logger = logging.getLogger('conservancy_beancount.rtutil.RTLinkCache')

    @classmethod
    def setup(cls, cache_path: Path) -> Optional[sqlite3.Connection]:
        """Open (or create) the cache database at ``cache_path``

        Returns a connection suitable for passing to the constructor, or
        None when sqlite can provide no benefit:

        * If the database can be read and written, return a connection to it.
        * If it can be read but not written and it has data, return a
          connection to an in-memory copy seeded with that data.
        * Otherwise (unreadable, not a database, or empty and unwritable),
          return None.

        Any connection that is not returned is closed before returning.
        """
        db: Optional[sqlite3.Connection] = None
        try:
            db = sqlite3.connect(os.fspath(cache_path), isolation_level=None)
            cursor = db.cursor()
            cursor.execute(cls.CREATE_TABLE_SQL)
            cursor.execute('SELECT url FROM RTLinkCache LIMIT 1')
            have_data = cursor.fetchone() is not None
        except sqlite3.DatabaseError:
            # If we couldn't get this far, sqlite provides no benefit.
            # DatabaseError (the parent of OperationalError) also covers a
            # file that exists but is corrupt or not a sqlite database.
            cls.logger.debug("setup: error loading %s", cache_path, exc_info=True)
            if db is not None:
                db.close()
            return None

        try:
            # There shouldn't be any records where url is NULL, so running this
            # DELETE pulls double duty for us: it tells us whether or not we
            # can write to the database and it enforces database integrity.
            cursor.execute('DELETE FROM RTLinkCache WHERE url IS NULL')
        except sqlite3.DatabaseError:
            cls.logger.debug("setup: error writing %s", cache_path, exc_info=True)
            can_write = False
        else:
            can_write = True

        if can_write:
            cls.logger.debug("setup: caching at %s", cache_path)
        else:
            # Whatever happens next, we're done with the on-disk connection.
            cursor.close()
            db.close()
            if not have_data:
                # If there's nothing to read and no way to write, sqlite
                # provides no benefit.
                cls.logger.debug("setup: not using %s: nothing to read or write", cache_path)
                return None
            # Set up an in-memory database that we can write to, seeded with
            # the data available to read.
            db = None
            try:
                db = sqlite3.connect(':memory:', isolation_level=None)
                cursor = db.cursor()
                # It would be better to use
                #   '{}?mode=ro'.format(cache_path.as_uri())
                # as the argument here, but that doesn't work on SUSE 15,
                # possibly because its sqlite doesn't recognize query
                # arguments (added to upstream sqlite in late 2016).
                cursor.execute('ATTACH DATABASE ? AS readsource',
                               (os.fspath(cache_path),))
                cursor.execute(cls.CREATE_TABLE_SQL)
                cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache')
                cursor.execute('DETACH DATABASE readsource')
            except sqlite3.DatabaseError:
                # We're back to the case of having nothing to read and no way
                # to write.
                cls.logger.debug("setup: error loading %s into memory", cache_path, exc_info=True)
                if db is not None:
                    db.close()
                return None
            else:
                cls.logger.debug("setup: loaded %s into memory", cache_path)

        cursor.close()
        db.commit()
        return db

    def __init__(self, cache_db: sqlite3.Connection) -> None:
        """Wrap a connection returned by ``setup()``"""
        self._db = cache_db
        # Keys known to have no URL. These live only in memory because the
        # table's url column is NOT NULL.
        self._nourls: Set[TicketAttachmentIds] = set()

    def __iter__(self) -> Iterator[TicketAttachmentIds]:
        """Iterate over every cached key: database rows, then no-URL keys"""
        yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache')
        yield from self._nourls

    def __len__(self) -> int:
        """Return the number of cached keys, including no-URL keys"""
        cursor = self._db.execute('SELECT COUNT(*) FROM RTLinkCache')
        count: int = cursor.fetchone()[0]
        return count + len(self._nourls)

    def __getitem__(self, key: TicketAttachmentIds) -> Optional[str]:
        """Return the cached URL for ``key``, or None if it has no URL

        Raises KeyError if ``key`` has never been cached.
        """
        if key in self._nourls:
            return None
        # `IS` instead of `=` so that a None attachment_id matches a NULL
        # column value.
        cursor = self._db.execute(
            'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
            key,
        )
        row = cursor.fetchone()
        if row is None:
            raise KeyError(key)
        else:
            retval: str = row[0]
            return retval

    def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None:
        """Record ``value`` for ``key``, replacing any previous entry

        A URL is written to the database. None is only remembered in memory.
        Either way, the previous entry for ``key`` is dropped first so that
        lookups, iteration, and length all agree on the latest assignment.
        """
        ticket_id, attachment_id = key
        # Remove any existing row by hand: sqlite treats NULLs as distinct in
        # a composite primary key, so a conflict clause alone would not catch
        # a duplicate row whose attachment_id is NULL.
        self._db.execute(
            'DELETE FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
            (ticket_id, attachment_id),
        )
        if value is None:
            self._nourls.add(key)
        else:
            self._nourls.discard(key)
            # OR REPLACE covers a row for the same non-NULL key that appeared
            # after the DELETE above (e.g., written through another
            # connection to the same file).
            self._db.execute(
                'INSERT OR REPLACE INTO RTLinkCache VALUES(?, ?, ?)',
                (ticket_id, attachment_id, value),
            )

    def __delitem__(self, key: TicketAttachmentIds) -> None:
        """Deleting cache entries is not supported"""
        raise NotImplementedError("RTLinkCache.__delitem__")
|
|
|
|
|
|
class RT:
    """RT utility wrapper class

    Given an RT client object, this class provides common functionality for
    working with RT links in Beancount metadata:

    * Parse links
    * Verify that they refer to extant objects in RT
    * Convert metadata links to RT web links
    * Cache results, to reduce network requests.
      You can set up an RTLinkCache to cache links to disk over multiple runs.
      Refer to RTLinkCache's docstring for details and instructions.
    """

    # Recognized metadata link formats. Each pattern captures the ticket id
    # and, optionally, the attachment id.
    PARSE_REGEXPS = [
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
    ]

    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
        """Wrap ``rt_client``, caching lookups in ``cache_db`` if given

        ``cache_db`` is the result of ``RTLinkCache.setup()``. When it is
        None, results are cached in a plain dict instead.
        """
        urlparts = urlparse.urlparse(rt_client.url)
        # The web base path is everything up to and including the slash that
        # starts the last '/REST/' component of the client URL. If there is
        # no such component, use the whole path with exactly one trailing
        # slash.
        try:
            index = urlparts.path.rindex('/REST/')
        except ValueError:
            base_path = urlparts.path.rstrip('/') + '/'
        else:
            base_path = urlparts.path[:index + 1]
        self.url_base = urlparts._replace(path=base_path)
        self.rt = rt_client
        self._cache: _LinkCache
        if cache_db is None:
            self._cache = {}
        else:
            self._cache = RTLinkCache(cache_db)

    # mypy complains that the first argument isn't self, but this isn't meant
    # to be a method, it's just an internal decorator.
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
        @functools.wraps(func)
        def caching_wrapper(self: 'RT',
                            ticket_id: RTId,
                            attachment_id: Optional[RTId]=None,
        ) -> Optional[str]:
            # Cache keys are always strings (or None for "no attachment"),
            # whatever type the caller passed.
            cache_key = (str(ticket_id),
                         None if attachment_id is None else str(attachment_id))
            url: Optional[str]
            try:
                url = self._cache[cache_key]
            except KeyError:
                if attachment_id is None:
                    url = func(self, ticket_id)
                else:
                    url = func(self, ticket_id, attachment_id)
                # None results are cached too, so a missing object is only
                # looked up once.
                self._cache[cache_key] = url
            return url
        return caching_wrapper

    def _extend_url(self,
                    path_tail: str,
                    fragment: Optional[str]=None,
                    **query: str,
    ) -> str:
        """Build a web URL under the RT base URL

        ``path_tail`` is URL-quoted and appended to the base path.
        ``fragment`` is URL-quoted and used as the fragment; when None, the
        base URL's fragment is kept. Keyword arguments are encoded as the
        query string; when there are none, the base URL's query is kept.
        """
        if fragment is None:
            fragment = self.url_base.fragment
        else:
            fragment = urlparse.quote(fragment)
        if query:
            query_s = urlparse.urlencode(query)
        else:
            query_s = self.url_base.query
        urlparts = self.url_base._replace(
            path=self.url_base.path + urlparse.quote(path_tail),
            query=query_s,
            fragment=fragment,
        )
        return urlparse.urlunparse(urlparts)

    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
        """Return the display URL for a ticket

        If ``txn_id`` is given, the URL's fragment points at that
        transaction. This method does not check that the ticket exists.
        """
        if txn_id is None:
            fragment = None
        else:
            fragment = 'txn-{}'.format(txn_id)
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))

    @_cache_method
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
        """Return the web URL for an attachment, or None if RT returns none

        Attachments with a ``text/`` content type link to their transaction
        on the ticket display page. Everything else links to the attachment
        itself. Results are cached.
        """
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
        if attachment is None:
            return None
        mimetype = attachment.get('ContentType', '')
        if mimetype.startswith('text/'):
            return self._ticket_url(ticket_id, attachment['Transaction'])
        else:
            filename = attachment.get('Filename', '')
            if not filename:
                # Make up a filename so the URL still ends with one.
                filename = 'RT{} attachment {}{}'.format(
                    ticket_id,
                    attachment_id,
                    mimetypes.guess_extension(mimetype) or '.bin',
                )
            path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
                attachment,
                filename,
            )
            return self._extend_url(path_tail)

    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
        """Return True if ``url()`` finds a URL for the given ids"""
        return self.url(ticket_id, attachment_id) is not None

    def iter_urls(self,
                  links: Iterable[str],
                  rt_fmt: str='{}',
                  nonrt_fmt: str='{}',
                  missing_fmt: str='{}',
    ) -> Iterator[str]:
        """Iterate over metadata links, replacing RT references with web URLs

        This method iterates over metadata link strings (e.g., from
        Metadata.get_links()) and transforms them for web presentation.

        If the string is a valid RT reference, the corresponding web URL
        will be formatted with ``rt_fmt``.

        If the string is a well-formed RT reference but the object doesn't
        exist, it will be formatted with ``missing_fmt``.

        All other link strings will be formatted with ``nonrt_fmt``.
        """
        for link in links:
            parsed = self.parse(link)
            if parsed is None:
                yield nonrt_fmt.format(link)
            else:
                ticket_id, attachment_id = parsed
                url = self.url(ticket_id, attachment_id)
                if url is None:
                    yield missing_fmt.format(link)
                else:
                    yield rt_fmt.format(url)

    @classmethod
    def metadata_regexp(cls,
                        ticket_id: RTId,
                        attachment_id: Optional[RTId]=None,
                        *,
                        first_link_only: bool=False
    ) -> str:
        """Return a pattern to find RT links in metadata

        Given a ticket ID and optional attachment ID, this method returns a
        regular expression pattern that will find matching RT links in a
        metadata value string, written in any format.

        If the keyword-only argument first_link_only is true, the pattern will
        only match the first link in a metadata string. Otherwise the pattern
        matches any link in the string (the default).

        The ids are interpolated into the pattern as given.
        """
        if first_link_only:
            prolog = r'^\s*'
        else:
            prolog = r'(?:^|\s)'
        if attachment_id is None:
            attachment = ''
        else:
            attachment = r'/(?:attachments?/)?{}'.format(attachment_id)
        ticket = r'rt:(?://ticket/)?{}'.format(ticket_id)
        epilog = r'/?(?:$|\s)'
        return f'{prolog}{ticket}{attachment}{epilog}'

    @classmethod
    def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
        """Parse a metadata link string into (ticket_id, attachment_id)

        The attachment id is None when the link refers to a ticket only.
        Returns None if ``s`` matches none of PARSE_REGEXPS.
        """
        for regexp in cls.PARSE_REGEXPS:
            match = regexp.match(s)
            if match is not None:
                ticket_id, attachment_id = match.groups()
                return (ticket_id, attachment_id)
        return None

    @_cache_method
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
        """Return the web URL for a ticket, or None if RT returns no ticket

        Results are cached.
        """
        if self.rt.get_ticket(ticket_id) is None:
            return None
        return self._ticket_url(ticket_id)

    @overload
    def _meta_with_urls(self,
                        meta: None,
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> None: ...

    @overload
    def _meta_with_urls(self,
                        meta: bc_data.Meta,
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> bc_data.Meta: ...

    def _meta_with_urls(self,
                        meta: Optional[bc_data.Meta],
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> Optional[bc_data.Meta]:
        """Return a copy of ``meta`` with link metadata rewritten as URLs

        Each key in data.LINK_METADATA that yields links is replaced with the
        space-joined output of iter_urls(). Other keys are copied unchanged.
        Returns None when ``meta`` is None.
        """
        if meta is None:
            return None
        link_meta = data.Metadata(meta)
        retval = meta.copy()
        for key in data.LINK_METADATA:
            try:
                links = link_meta.get_links(key)
            except TypeError:
                # Treat a TypeError from get_links() as "no links" and leave
                # the original value in place.
                links = ()
            if links:
                retval[key] = ' '.join(self.iter_urls(
                    links, rt_fmt, nonrt_fmt, missing_fmt,
                ))
        return retval

    def txn_with_urls(self, txn: Transaction,
                      rt_fmt: str='<{}>',
                      nonrt_fmt: str='{}',
                      missing_fmt: str='{}',
    ) -> Transaction:
        """Copy a transaction with RT references replaced with web URLs

        Given a Beancount Transaction, this method returns a Transaction
        that's identical, except any references to RT in the metadata for
        the Transaction and its Postings are replaced with web URLs.
        This is useful for reporting tools that want to format the
        transaction with URLs that are recognizable by other tools.

        The format string arguments have the same meaning as RT.iter_urls().
        See that docstring for details.
        """
        # mypy doesn't recognize that postings is a valid argument, probably a
        # bug in the NamedTuple→Directive→Transaction hierarchy.
        return txn._replace(  # type:ignore[call-arg]
            meta=self._meta_with_urls(txn.meta, rt_fmt, nonrt_fmt, missing_fmt),
            postings=[post._replace(meta=self._meta_with_urls(
                post.meta, rt_fmt, nonrt_fmt, missing_fmt,
            )) for post in txn.postings],
        )

    @classmethod
    def unparse(cls, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> str:
        """Return a metadata link string for the given ticket+attachment id"""
        if attachment_id is None:
            return f'rt:{ticket_id}'
        else:
            return f'rt:{ticket_id}/{attachment_id}'

    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
        """Return the web URL for a ticket or attachment, or None if missing

        Dispatches to ticket_url() when ``attachment_id`` is None, and to
        attachment_url() otherwise.
        """
        if attachment_id is None:
            return self.ticket_url(ticket_id)
        else:
            return self.attachment_url(ticket_id, attachment_id)
|