231 lines
7.8 KiB
Python
231 lines
7.8 KiB
Python
"""User configuration for Conservancy bookkeeping tools"""
|
|
# Copyright © 2020 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import configparser
|
|
import datetime
|
|
import decimal
|
|
import functools
|
|
import os
|
|
import re
|
|
import urllib.parse as urlparse
|
|
|
|
import git # type:ignore[import]
|
|
import requests.auth
|
|
import rt
|
|
|
|
from pathlib import Path
|
|
from typing import (
|
|
Mapping,
|
|
NamedTuple,
|
|
Optional,
|
|
Tuple,
|
|
Type,
|
|
)
|
|
|
|
from . import books
|
|
from . import rtutil
|
|
|
|
class RTCredentials(NamedTuple):
    """Settings needed to connect to a Request Tracker server.

    Every field defaults to None, meaning "not configured".
    """
    server: Optional[str] = None
    user: Optional[str] = None
    passwd: Optional[str] = None
    auth: Optional[str] = None

    @classmethod
    def from_env(cls) -> 'RTCredentials':
        """Build credentials from the process environment.

        Each field is read from the variable named ``RT`` followed by the
        upper-cased field name (RTSERVER, RTUSER, RTPASSWD, RTAUTH). A field
        whose variable is unset keeps its default.
        """
        settings = {
            name: os.environ.get('RT{}'.format(name.upper()), fallback)
            for name, fallback in cls._field_defaults.items()
        }
        return cls(**settings)

    @classmethod
    def from_rtrc(cls) -> 'RTCredentials':
        """Build credentials from the ``.rtrc`` file in the home directory.

        Each line is split on its first run of whitespace into a key and a
        value. Lines that do not split into two parts, and keys that are not
        field names, are ignored. If the file cannot be opened or read
        (OSError), all-default credentials are returned.
        """
        settings = dict(cls._field_defaults)
        rc_file_path = Path.home() / '.rtrc'
        try:
            with rc_file_path.open() as rc_file:
                for raw_line in rc_file:
                    parts = raw_line.split(None, 1)
                    if len(parts) != 2:
                        continue
                    name, setting = parts
                    if name in settings:
                        # Only the line terminator is removed; any other
                        # trailing whitespace stays part of the value.
                        settings[name] = setting.rstrip('\n')
        except OSError:
            return cls()
        return cls(**settings)
|
|
|
|
|
|
class Config:
    """User configuration read from an INI file and the environment.

    File settings live in the ``[Beancount]`` section of ``file_config``.
    Environment variables supply the XDG directories, the RT credentials,
    and a fallback for the repository directory.
    """

    # Paths, relative to the user's home directory, used when the matching
    # environment variable is unset or does not hold an absolute path.
    _ENVIRON_DEFAULT_PATHS = {
        'XDG_CACHE_HOME': Path('.cache'),
        'XDG_CONFIG_HOME': Path('.config'),
    }

    def __init__(self) -> None:
        """Create a configuration with an empty ``[Beancount]`` section."""
        self.file_config = configparser.ConfigParser()
        # Pre-create the section so file_config['Beancount'] lookups in the
        # accessors below work before any file has been loaded.
        self.file_config.read_string("[Beancount]\n")

    def load_file(self, config_path: Optional[Path]=None) -> None:
        """Merge settings from an INI file into this configuration.

        When ``config_path`` is None, the path from ``config_file_path()``
        is used. Errors from opening or parsing the file propagate.
        """
        if config_path is None:
            config_path = self.config_file_path()
        with config_path.open() as config_file:
            self.file_config.read_file(config_file)

    def load_string(self, config_str: str) -> None:
        """Merge settings from INI-formatted text into this configuration."""
        self.file_config.read_string(config_str)

    def _abspath(self, source: Mapping[str, str], key: str) -> Optional[Path]:
        """Return ``source[key]`` as a Path if it is absolute, else None.

        None is also returned when the key is missing. ``~`` is expanded for
        every source except ``os.environ``.
        """
        try:
            retval = Path(source[key])
        except (KeyError, ValueError):
            return None
        if source is not os.environ:
            retval = retval.expanduser()
        return retval if retval.is_absolute() else None

    def _dir_or_none(self, path: Path) -> Optional[Path]:
        """Ensure ``path`` is a directory; return it, or None on OSError.

        Only the final component is created; missing parents are an error.
        """
        try:
            path.mkdir(exist_ok=True)
        except OSError:
            return None
        else:
            return path

    def _path_from_environ(self, key: str, default: Optional[Path]=None) -> Path:
        """Return the absolute path stored in environment variable ``key``.

        If the variable is unset or not absolute, return ``default`` when it
        is given, otherwise the home directory joined with the entry for
        ``key`` in ``_ENVIRON_DEFAULT_PATHS``.
        """
        retval = self._abspath(os.environ, key)
        if retval is None:
            retval = default or (Path.home() / self._ENVIRON_DEFAULT_PATHS[key])
        return retval

    def books_loader(self) -> Optional[books.Loader]:
        """Return a books.Loader for the books directory, or None if unset."""
        books_path = self.books_path()
        if books_path is None:
            return None
        else:
            return books.Loader(books_path, self.fiscal_year_begin())

    def books_path(self) -> Optional[Path]:
        """Return the absolute "books dir" setting, or None if unusable."""
        return self._abspath(self.file_config['Beancount'], 'books dir')

    def books_repo(self) -> Optional[git.Repo]:
        """Return a git.Repo object for the books directory

        Returns None if the books directory is not a valid Git repository.
        """
        try:
            return git.Repo(self.file_config['Beancount']['books dir'])
        except (KeyError, git.exc.GitError):
            return None

    def cache_dir_path(self, name: str='conservancy_beancount') -> Optional[Path]:
        """Return the cache directory ``name`` under XDG_CACHE_HOME.

        Both the cache root and the named subdirectory are created if
        needed. Returns None if either cannot be created.
        """
        cache_root = self._path_from_environ('XDG_CACHE_HOME')
        return (
            self._dir_or_none(cache_root)
            and self._dir_or_none(cache_root / name)
        )

    def config_file_path(self, name: str='conservancy_beancount') -> Path:
        """Return the path ``<XDG_CONFIG_HOME>/<name>/config.ini``."""
        config_root = self._path_from_environ('XDG_CONFIG_HOME')
        return Path(config_root, name, 'config.ini')

    def fiscal_year_begin(self) -> books.FiscalYear:
        """Return the configured start of the fiscal year.

        The "fiscal year begin" setting is a month number, optionally
        followed by a day number after one of ``-``, ``.``, ``/`` or a
        space. The day defaults to 1 and the whole setting to ``3 1``.

        Raises ValueError if the setting does not match that format or
        does not name a real calendar date.
        """
        s = self.file_config.get('Beancount', 'fiscal year begin', fallback='3 1')
        match = re.match(r'([01]?[0-9])(?:\s*[-./ ]\s*([0-3]?[0-9]))?$', s.strip())
        if match is None:
            raise ValueError(f"fiscal year begin {s!r} has unknown format")
        try:
            month = int(match.group(1))
            day = int(match.group(2) or 1)
            # To check date validity we use an arbitrary year that's
            # 1. definitely using the modern calendar
            # 2. far enough in the past to not have books (pre-Unix epoch)
            # 3. not a leap year
            datetime.date(1959, month, day)
        except ValueError as e:
            raise ValueError(
                f"fiscal year begin {s!r} is invalid date: {e.args[0]}"
            ) from e
        else:
            return books.FiscalYear(month, day)

    def payment_threshold(self) -> decimal.Decimal:
        """Return the "payment threshold" setting as a Decimal.

        Falls back to 10 when the setting is missing or is not a valid
        decimal number.
        """
        try:
            return decimal.Decimal(self.file_config['Beancount']['payment threshold'])
        # Decimal() reports unparseable text with decimal.InvalidOperation,
        # which is an ArithmeticError rather than a ValueError, so it must
        # be named explicitly for the fallback to apply.
        except (KeyError, ValueError, decimal.InvalidOperation):
            return decimal.Decimal(10)

    def repository_path(self) -> Optional[Path]:
        """Return the absolute repository directory, or None if unset.

        The "repository dir" file setting is tried first, then the
        CONSERVANCY_REPOSITORY environment variable.
        """
        retval = self._abspath(self.file_config['Beancount'], 'repository dir')
        if retval is None:
            retval = self._abspath(os.environ, 'CONSERVANCY_REPOSITORY')
        return retval

    def rt_credentials(self) -> RTCredentials:
        """Return RT credentials merged from all sources.

        For each field the first truthy value wins, in this order: the
        environment, the user's ``.rtrc`` file, then built-in defaults
        (only ``auth='rt'``). Empty strings count as unset.
        """
        all_creds = zip(
            RTCredentials.from_env(),
            RTCredentials.from_rtrc(),
            RTCredentials(auth='rt'),
        )
        return RTCredentials._make(v0 or v1 or v2 for v0, v1, v2 in all_creds)

    def rt_client(self,
                  credentials: Optional[RTCredentials]=None,
                  client: Type[rt.Rt]=rt.Rt,
    ) -> Optional[rt.Rt]:
        """Return a logged-in RT client, or None if one cannot be made.

        ``credentials`` defaults to ``rt_credentials()``. ``client`` is the
        class instantiated to talk to the server. None is returned when no
        server is configured or when ``login()`` returns a false value.
        """
        if credentials is None:
            credentials = self.rt_credentials()
        if credentials.server is None:
            return None
        # Point the client at the REST 1.0 endpoint beneath the server URL,
        # avoiding a doubled slash when the configured path ends with one.
        urlparts = urlparse.urlparse(credentials.server)
        rest_path = urlparts.path.rstrip('/') + '/REST/1.0/'
        url = urlparse.urlunparse(urlparts._replace(path=rest_path))
        if credentials.auth == 'basic':
            auth = requests.auth.HTTPBasicAuth(credentials.user, credentials.passwd)
            retval = client(url, http_auth=auth)
        else:
            retval = client(url, credentials.user, credentials.passwd)
        if retval.login():
            return retval
        else:
            return None

    # NOTE(review): lru_cache on a method includes self in the cache key and
    # holds a reference to it for as long as the entry stays cached.
    @functools.lru_cache(4)
    def _rt_wrapper(self, credentials: RTCredentials, client: Type[rt.Rt]) -> Optional[rtutil.RT]:
        """Build (and memoize) an rtutil.RT wrapper for these credentials.

        Returns None when ``rt_client()`` returns None. When a cache
        directory is available, the wrapper is given a link cache database
        stored in a per-user, per-server file there.
        """
        wrapper_client = self.rt_client(credentials, client)
        if wrapper_client is None:
            return None
        cache_dir_path = self.cache_dir_path()
        if cache_dir_path is None:
            cache_db = None
        else:
            # Quote every reserved character, including '/', so the server
            # URL can be used as part of a single file name.
            cache_name = '{}@{}.sqlite3'.format(
                credentials.user,
                urlparse.quote(str(credentials.server), ''),
            )
            cache_path = cache_dir_path / cache_name
            try:
                cache_path.touch(0o600)
            except OSError:
                # RTLinkCache.setup() will handle the problem.
                pass
            cache_db = rtutil.RTLinkCache.setup(cache_path)
        return rtutil.RT(wrapper_client, cache_db)

    def rt_wrapper(self,
                   credentials: Optional[RTCredentials]=None,
                   client: Type[rt.Rt]=rt.Rt,
    ) -> Optional[rtutil.RT]:
        """Return an rtutil.RT wrapper, or None if no client can be made.

        ``credentials`` defaults to ``rt_credentials()``. Results are
        memoized per (credentials, client) pair by ``_rt_wrapper()``.
        """
        if credentials is None:
            credentials = self.rt_credentials()
        return self._rt_wrapper(credentials, client)
|