"""RT client utilities""" # Copyright © 2020 Brett Smith # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import functools import mimetypes import os import re import sqlite3 import urllib.parse as urlparse import rt from pathlib import Path from . import data from beancount.core import data as bc_data from typing import ( overload, Callable, Iterable, Iterator, MutableMapping, Optional, Set, Tuple, Union, ) from .beancount_types import ( Transaction, ) RTId = Union[int, str] TicketAttachmentIds = Tuple[str, Optional[str]] _LinkCache = MutableMapping[TicketAttachmentIds, Optional[str]] _URLLookup = Callable[..., Optional[str]] class RTLinkCache(_LinkCache): """Cache RT links to disk This class provides a dict-like interface to a cache of RT links. Once an object is in RT, a link to it should never change. The only exception is when objects get shredded, and those objects shouldn't be referenced in books anyway. This implementation is backed by a sqlite database. You can call:: db = RTLinkCache.setup(path) This method will try to open a sqlite database at the given path, and set up necessary tables, etc. If it succeeds, it returns a database connection you can use to initialize the cache. If it fails, it returns None, and the caller should use some other dict-like object (like a normal dict) for caching. You can give the result to the RT utility class either way, and it will do the right thing for itself:: rt = RT(rt_client, db) """ CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS RTLinkCache( ticket_id TEXT NOT NULL, attachment_id TEXT, url TEXT NOT NULL, PRIMARY KEY (ticket_id, attachment_id) )""" @classmethod def setup(cls, cache_path: Path) -> Optional[sqlite3.Connection]: try: db = sqlite3.connect(os.fspath(cache_path), isolation_level=None) cursor = db.cursor() cursor.execute(cls.CREATE_TABLE_SQL) cursor.execute('SELECT url FROM RTLinkCache LIMIT 1') have_data = cursor.fetchone() is not None except sqlite3.OperationalError: # If we couldn't get this far, sqlite provides no benefit. return None try: # There shouldn't be any records where url is NULL, so running this # DELETE pulls double duty for us: it tells us whether or not we # can write to the database and it enforces database integrity. cursor.execute('DELETE FROM RTLinkCache WHERE url IS NULL') except sqlite3.OperationalError: can_write = False else: can_write = True if not (can_write or have_data): # If there's nothing to read and no way to write, sqlite provides # no benefit. return None elif not can_write: # Set up an in-memory database that we can write to, seeded with # the data available to read. try: cursor.close() db.close() db = sqlite3.connect(':memory:', isolation_level=None) cursor = db.cursor() cursor.execute('ATTACH DATABASE ? AS readsource', ('{}?mode=ro'.format(cache_path.as_uri()),)) cursor.execute(cls.CREATE_TABLE_SQL) cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache') cursor.execute('DETACH DATABASE readsource') except sqlite3.OperationalError as error: # We're back to the case of having nothing to read and no way # to write. return None cursor.close() db.commit() return db def __init__(self, cache_db: sqlite3.Connection) -> None: self._db = cache_db self._nourls: Set[TicketAttachmentIds] = set() def __iter__(self) -> Iterator[TicketAttachmentIds]: yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache') yield from self._nourls def __len__(self) -> int: cursor = self._db.execute('SELECT COUNT(*) FROM RTLinkCache') count: int = cursor.fetchone()[0] return count + len(self._nourls) def __getitem__(self, key: TicketAttachmentIds) -> Optional[str]: if key in self._nourls: return None cursor = self._db.execute( 'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?', key, ) row = cursor.fetchone() if row is None: raise KeyError(key) else: retval: str = row[0] return retval def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None: if value is None: self._nourls.add(key) else: ticket_id, attachment_id = key self._db.execute( 'INSERT INTO RTLinkCache VALUES(?, ?, ?)', (ticket_id, attachment_id, value), ) def __delitem__(self, key: TicketAttachmentIds) -> None: raise NotImplementedError("RTLinkCache.__delitem__") class RT: """RT utility wrapper class Given an RT client object, this class provides common functionality for working with RT links in Beancount metadata: * Parse links * Verify that they refer to extant objects in RT * Convert metadata links to RT web links * Cache results, to reduce network requests. You can set up an RTLinkCache to cache links to disks over multiple runs. Refer to RTLinkCache's docstring for details and instructions. """ PARSE_REGEXPS = [ re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'), re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'), ] def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None: urlparts = urlparse.urlparse(rt_client.url) try: index = urlparts.path.rindex('/REST/') except ValueError: base_path = urlparts.path.rstrip('/') + '/' else: base_path = urlparts.path[:index + 1] self.url_base = urlparts._replace(path=base_path) self.rt = rt_client self._cache: _LinkCache if cache_db is None: self._cache = {} else: self._cache = RTLinkCache(cache_db) # mypy complains that the first argument isn't self, but this isn't meant # to be a method, it's just an internal decrator. def _cache_method(func: _URLLookup) -> _URLLookup: # type:ignore[misc] @functools.wraps(func) def caching_wrapper(self: 'RT', ticket_id: RTId, attachment_id: Optional[RTId]=None, ) -> Optional[str]: cache_key = (str(ticket_id), None if attachment_id is None else str(attachment_id)) url: Optional[str] try: url = self._cache[cache_key] except KeyError: if attachment_id is None: url = func(self, ticket_id) else: url = func(self, ticket_id, attachment_id) self._cache[cache_key] = url return url return caching_wrapper def _extend_url(self, path_tail: str, fragment: Optional[str]=None, **query: str, ) -> str: if fragment is None: fragment = self.url_base.fragment else: fragment = urlparse.quote(fragment) if query: query_s = urlparse.urlencode(query) else: query_s = self.url_base.query urlparts = self.url_base._replace( path=self.url_base.path + urlparse.quote(path_tail), query=query_s, fragment=fragment, ) return urlparse.urlunparse(urlparts) def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str: if txn_id is None: fragment = None else: fragment = 'txn-{}'.format(txn_id) return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id)) @_cache_method def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]: attachment = self.rt.get_attachment(ticket_id, attachment_id) if attachment is None: return None mimetype = attachment.get('ContentType', '') if mimetype.startswith('text/'): return self._ticket_url(ticket_id, attachment['Transaction']) else: filename = attachment.get('Filename', '') if not filename: filename = 'RT{} attachment {}{}'.format( ticket_id, attachment_id, mimetypes.guess_extension(mimetype) or '.bin', ) path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format( attachment, filename, ) return self._extend_url(path_tail) def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool: return self.url(ticket_id, attachment_id) is not None def iter_urls(self, links: Iterable[str], rt_fmt: str='{}', nonrt_fmt: str='{}', missing_fmt: str='{}', ) -> Iterator[str]: """Iterate over metadata links, replacing RT references with web URLs This method iterates over metadata link strings (e.g., from Metadata.get_links()) and transforms them for web presentation. If the string is a valid RT reference, the corresponding web URL will be formatted with ``rt_fmt``. If the string is a well-formed RT reference but the object doesn't exist, it will be formatted with ``missing_fmt``. All other link strings will be formatted with ``nonrt_fmt``. """ for link in links: parsed = self.parse(link) if parsed is None: yield nonrt_fmt.format(link) else: ticket_id, attachment_id = parsed url = self.url(ticket_id, attachment_id) if url is None: yield missing_fmt.format(link) else: yield rt_fmt.format(url) @classmethod def metadata_regexp(self, ticket_id: RTId, attachment_id: Optional[RTId]=None, *, first_link_only: bool=False ) -> str: """Return a pattern to find RT links in metadata Given a ticket ID and optional attachment ID, this method returns a regular expression pattern that will find matching RT links in a metadata value string, written in any format. If the keyword-only argument first_link_only is true, the pattern will only match the first link in a metadata string. Otherwise the pattern matches any link in the string (the default). """ if first_link_only: prolog = r'^\s*' else: prolog = r'(?:^|\s)' if attachment_id is None: attachment = '' else: attachment = r'/(?:attachments?/)?{}'.format(attachment_id) ticket = r'rt:(?://ticket/)?{}'.format(ticket_id) epilog = r'/?(?:$|\s)' return f'{prolog}{ticket}{attachment}{epilog}' @classmethod def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]: for regexp in cls.PARSE_REGEXPS: match = regexp.match(s) if match is not None: ticket_id, attachment_id = match.groups() return (ticket_id, attachment_id) return None @_cache_method def ticket_url(self, ticket_id: RTId) -> Optional[str]: if self.rt.get_ticket(ticket_id) is None: return None return self._ticket_url(ticket_id) @overload def _meta_with_urls(self, meta: None, rt_fmt: str, nonrt_fmt: str, missing_fmt: str, ) -> None: ... @overload def _meta_with_urls(self, meta: bc_data.Meta, rt_fmt: str, nonrt_fmt: str, missing_fmt: str, ) -> bc_data.Meta: ... def _meta_with_urls(self, meta: Optional[bc_data.Meta], rt_fmt: str, nonrt_fmt: str, missing_fmt: str, ) -> Optional[bc_data.Meta]: if meta is None: return None link_meta = data.Metadata(meta) retval = meta.copy() for key in data.LINK_METADATA: try: links = link_meta.get_links(key) except TypeError: links = () if links: retval[key] = ' '.join(self.iter_urls( links, rt_fmt, nonrt_fmt, missing_fmt, )) return retval def txn_with_urls(self, txn: Transaction, rt_fmt: str='<{}>', nonrt_fmt: str='{}', missing_fmt: str='{}', ) -> Transaction: """Copy a transaction with RT references replaced with web URLs Given a Beancount Transaction, this method returns a Transaction that's identical, except any references to RT in the metadata for the Transaction and its Postings are replaced with web URLs. This is useful for reporting tools that want to format the transaction with URLs that are recognizable by other tools. The format string arguments have the same meaning as RT.iter_urls(). See that docstring for details. """ # mypy doesn't recognize that postings is a valid argument, probably a # bug in the NamedTuple→Directive→Transaction hierarchy. return txn._replace( # type:ignore[call-arg] meta=self._meta_with_urls(txn.meta, rt_fmt, nonrt_fmt, missing_fmt), postings=[post._replace(meta=self._meta_with_urls( post.meta, rt_fmt, nonrt_fmt, missing_fmt, )) for post in txn.postings], ) def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]: if attachment_id is None: return self.ticket_url(ticket_id) else: return self.attachment_url(ticket_id, attachment_id)