
This makes it easier to iterate on a query because you don't have to restart the program and reload the books if something goes sideways.
969 lines
35 KiB
Python
969 lines
35 KiB
Python
"""query.py - Report arbitrary queries with advanced loading and formatting

This tool extends Beancount's bean-query with the following:

* ``META_DOCS("meta-key")`` function: Given the name of metadata with
  documentation in it, returns a list of the documentation links.

* ``STR_META("meta-key")`` function: Looks up metadata like ANY_META, then
  returns the result as a nicely formatted string, to make sorting easier or
  just provide prettier reports.

* ``SET()`` function: Returns only unique arguments over a GROUP BY.

* ODS output format, with proper formatting of currency and documentation links.

* Loads books from your configuration file, with options available to specify
  which year(s) to load.

* Can load rewrite rules just like other reports. Your queries will show the
  transformed entries.

* Improved error reporting.

Run it like bean-query, except instead of specifying a books filename, use
date options like ``--begin``, ``--end``, ``--fy``, and ``--cy`` to specify
the year(s) you want to load. Run ``query-report --help`` for details about
those.

Start an interactive shell::

    query-report [year options]

Write a spreadsheet with results for one query::

    query-report [year options] [-O OUTPUT.ods] <query string>

    query-report [year options] [-O OUTPUT.ods] < QUERY_FILE.bql

query-report also accepts all the same options as bean-query, like ``--format``
and ``--numberify``.
"""
|
|
# Copyright © 2021 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import argparse
|
|
import contextlib
|
|
import datetime
|
|
import enum
|
|
import functools
|
|
import itertools
|
|
import logging
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
|
|
from typing import (
|
|
cast,
|
|
Any,
|
|
Callable,
|
|
ClassVar,
|
|
Dict,
|
|
Hashable,
|
|
Iterable,
|
|
Iterator,
|
|
List,
|
|
Mapping,
|
|
MutableMapping,
|
|
NamedTuple,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
TextIO,
|
|
Tuple,
|
|
Type,
|
|
Union,
|
|
)
|
|
from ..beancount_types import (
|
|
MetaKey,
|
|
MetaValue,
|
|
OptionsMap,
|
|
Posting,
|
|
Transaction,
|
|
)
|
|
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from beancount.core.amount import _Amount as BeancountAmount
|
|
from beancount.core.inventory import Inventory
|
|
from beancount.core.position import _Position as Position
|
|
|
|
import beancount.query.numberify as bc_query_numberify
|
|
import beancount.query.query_compile as bc_query_compile
|
|
import beancount.query.query_env as bc_query_env
|
|
import beancount.query.query_execute as bc_query_execute
|
|
import beancount.query.query_parser as bc_query_parser
|
|
import beancount.query.query_render as bc_query_render
|
|
import beancount.query.shell as bc_query_shell
|
|
import odf.table # type:ignore[import]
|
|
import rt
|
|
|
|
from . import core
|
|
from . import rewrite
|
|
from .. import books
|
|
from .. import cliutil
|
|
from .. import config as configmod
|
|
from .. import data
|
|
from .. import rtutil
|
|
|
|
# Program name handed to argparse and to the entry-point factory.
PROGNAME = 'query-report'
logger = logging.getLogger('conservancy_beancount.reports.query')

# A callable that turns one result value into a spreadsheet cell.
CellFunc = Callable[[Any], odf.table.TableCell]
# Mapping of column signatures to the classes that evaluate them.
EnvironmentColumns = Dict[
    # Conceptually the key type is:
    #   Union[str, Tuple[str, Type, ...]]
    # That can't be written down for two reasons. First, an Ellipsis isn't
    # allowed in a Tuple that way, so there's no compact spelling. Second,
    # Beancount doesn't declare it either, and mypy infers Sequence[object].
    # So we simply use what mypy infers.
    Sequence[object],
    Type[bc_query_compile.EvalColumn],
]
# Mapping of function signatures to the classes that evaluate them.
EnvironmentFunctions = Dict[Sequence[object], Type[bc_query_compile.EvalFunction]]
# (column name, column type) pairs describing a result set.
RowTypes = Sequence[Tuple[str, Type]]
Rows = Sequence[NamedTuple]
# A ticket as stored in the RT cache; None is an allowed value.
RTResult = Optional[Mapping[Any, Any]]
# Per-query storage that aggregators index into with their handle.
Store = List[Any]
# Parsed expression node types that QueryODS._meta_target() inspects.
QueryExpression = Union[
    bc_query_parser.Column,
    bc_query_parser.Constant,
    bc_query_parser.Function,
    bc_query_parser.UnaryOp,
]
# Parsed top-level statement types accepted by the shell's select handling.
QueryStatement = Union[
    bc_query_parser.Balances,
    bc_query_parser.Journal,
    bc_query_parser.Select,
]
|
|
|
|
# Typing-only description of the RowContext objects Beancount passes to
# EvalFunction.__call__(). The attributes are populated across
# create_row_context and execute_query; nothing is assigned here.
class PostingContext:
    posting: Posting
    entry: Transaction
    balance: Inventory
    options_map: OptionsMap
    account_types: Mapping
    open_close_map: Mapping
    commodity_map: Mapping
    price_map: Mapping
    # Attached dynamically by execute_query
    store: Store
|
|
|
|
|
|
def ContextMeta(context: PostingContext) -> data.PostingMeta:
    """Return a detached, read-only PostingMeta for the context's posting"""
    # sys.maxsize is passed as the posting index: a constant is cheap, and it
    # helps keep the object read-only, because any attempt to manipulate the
    # transaction through that index would raise an IndexError.
    posting_meta = data.PostingMeta(context.entry, sys.maxsize, context.posting)
    return posting_meta.detached()
|
|
|
|
|
|
class DBColumn(bc_query_compile.EvalColumn):
    """Base class for columns looked up in a SQLite database by entity

    Subclasses set ``_db_query`` (a one-parameter SQL statement that receives
    the posting's ``entity`` metadata) and optionally ``_dtype``. When
    ``_dtype`` is a ``set`` subclass every returned row contributes one
    value; otherwise only the first row is used.
    """
    # Only present on classes built by with_db(); its absence means there is
    # no database, and __init__ refuses to instantiate the column.
    _db_cursor: ClassVar[sqlite3.Cursor]
    _db_query: ClassVar[str]
    _dtype: ClassVar[Type] = set
    # Selected in __init_subclass__ according to _dtype.
    _return: ClassVar[Callable[['DBColumn'], object]]
    __intypes__ = [Posting]

    @classmethod
    def with_db(cls, connection: sqlite3.Connection) -> Type['DBColumn']:
        """Return a same-named subclass bound to a cursor on ``connection``"""
        namespace = {'_db_cursor': connection.cursor()}
        return type(cls.__name__, (cls,), namespace)

    def __init_subclass__(cls) -> None:
        # Decide once per subclass how query results become the return value.
        wants_set = issubclass(cls._dtype, set)
        cls._return = cls._return_set if wants_set else cls._return_scalar

    def __init__(self, colname: Optional[str]=None) -> None:
        """Initialize the column, or raise RuntimeError without a database"""
        if hasattr(self, '_db_cursor'):
            super().__init__(self._dtype)
            return
        if colname is None:
            # Derive the BQL column name from the class name: DBEmail -> db_email
            colname = type(self).__name__.lower().replace('db', 'db_', 1)
        raise RuntimeError(f"no entity database loaded - {colname} not available")

    def _entity(self, meta: data.PostingMeta) -> str:
        """Return the ``entity`` metadata, or ``'\\0'`` if it isn't a string"""
        found = meta.get('entity')
        if isinstance(found, str):
            return found
        return '\0'

    def _return_scalar(self) -> object:
        """Convert the first result row, or build an empty ``_dtype``"""
        first_row = self._db_cursor.fetchone()
        if first_row is None:
            return self._dtype()
        return self._dtype(first_row[0])

    def _return_set(self) -> object:
        """Collect the single column of every result row into ``_dtype``"""
        return self._dtype(value for (value,) in self._db_cursor)

    def __call__(self, context: PostingContext) -> object:
        """Run ``_db_query`` for the posting's entity and return the result"""
        lookup_key = self._entity(ContextMeta(context))
        self._db_cursor.execute(self._db_query, (lookup_key,))
        return self._return()
|
|
|
|
|
|
class DBEmail(DBColumn):
    """Column of the email addresses recorded for a posting's entity

    Uses the inherited ``set`` dtype, so every matching address is returned.
    """
    _db_query = """
SELECT email.email_address
FROM donor
JOIN donor_email_address_mapping map ON donor.id = map.donor_id
JOIN email_address email ON map.email_address_id = email.id
WHERE donor.ledger_entity_id = ?
ORDER BY email.date_encountered DESC
"""
|
|
|
|
|
|
class DBId(DBColumn):
    """Column of the numeric donor id recorded for a posting's entity

    Scalar column: the first matching row is converted with ``int``.
    """
    _db_query = "SELECT id FROM donor WHERE ledger_entity_id = ?"
    _dtype = int
|
|
|
|
|
|
class DBName(DBColumn):
    """Column of the display name recorded for a posting's entity

    Scalar column: the first matching row is converted with ``str``.
    """
    _db_query = "SELECT display_name FROM donor WHERE ledger_entity_id = ?"
    _dtype = str
|
|
|
|
|
|
class DBPostal(DBColumn):
    """Column of the postal addresses recorded for a posting's entity

    Uses the inherited ``set`` dtype, so every matching address is returned.
    """
    _db_query = """
SELECT postal.formatted_address
FROM donor
JOIN donor_postal_address_mapping map ON donor.id = map.donor_id
JOIN postal_address postal ON map.postal_address_id = postal.id
WHERE donor.ledger_entity_id = ?
ORDER BY postal.date_encountered DESC
"""
|
|
|
|
|
|
class MetaDocs(bc_query_env.AnyMeta):
    """Return the set of document links found in a metadata value."""
    def __init__(self, operands: List[bc_query_compile.EvalNode]) -> None:
        # AnyMeta's own __init__ is skipped on purpose so that we can declare
        # our return type (the second argument) ourselves. Keep it in sync
        # with the annotated return type of __call__.
        super(bc_query_env.AnyMeta, self).__init__(operands, set)

    def __call__(self, context: PostingContext) -> Set[str]:
        """Split a string metadata value on whitespace into a set of links

        Any value that is not a string (including missing metadata) produces
        an empty set.
        """
        raw_value = super().__call__(context)
        if isinstance(raw_value, str):
            return set(raw_value.split())
        return set()
|
|
|
|
|
|
class RTField(NamedTuple):
    """Describe how to read one field out of an RT ticket mapping"""
    # Key to look up in the ticket mapping.
    key: str
    # Converter applied to a set value; None returns the raw value.
    parse: Optional[Callable[[str], object]]
    # Raw value that RT uses to mean "nothing here" for this field.
    unset_value: Optional[str] = None

    def load(self, rt_ticket: RTResult) -> object:
        """Return this field's parsed value from ``rt_ticket``

        Returns None when there is no ticket, when the field is missing or
        falsy, or when it equals ``unset_value``.
        """
        if not rt_ticket:
            return None
        raw = rt_ticket.get(self.key)
        if not raw or raw == self.unset_value:
            return None
        return raw if self.parse is None else self.parse(raw)
|
|
|
|
|
|
class RTTicket(bc_query_compile.EvalFunction):
    """Look up a field from RT ticket(s) mentioned in metadata documentation

    BQL arguments are the RT field name, the metadata key holding the links,
    and an optional maximum number of tickets to consult.
    """
    __intypes__ = [str, str, int]
    # Accepted spellings of a custom field name: CF_Name and CF.{Name}
    _CF_REGEXPS = [
        re.compile(r'^CF_([-\w]+)$', re.IGNORECASE),
        re.compile(r'^CF\.\{([-\w]+)\}$', re.IGNORECASE),
    ]
    # Fields returned as-is.
    FIELDS = {key: RTField(key, None) for key in [
        'AdminCc',
        'Cc',
        'Creator',
        'Owner',
        'Queue',
        'Status',
        'Subject',
        'Requestors',
    ]}
    # Integer fields; '0' is treated as unset.
    FIELDS.update({key: RTField(key, int, '0') for key in [
        'numerical_id',
        'FinalPriority',
        'InitialPriority',
        'Priority',
        'TimeEstimated',
        'TimeLeft',
        'TimeWorked',
    ]})
    # Timestamp fields; 'Not set' is treated as unset.
    FIELDS.update({key: RTField(key, rtutil.RTDateTime, 'Not set') for key in [
        'Created',
        'Due',
        'LastUpdated',
        'Resolved',
        'Started',
        'Starts',
        'Told',
    ]})
    # Also accept every name in all-lowercase, plus a few aliases.
    FIELDS.update({name.lower(): field for name, field in FIELDS.items()})
    FIELDS['id'] = FIELDS['numerical_id']
    FIELDS['AdminCC'] = FIELDS['AdminCc']
    FIELDS['CC'] = FIELDS['Cc']
    # Only set on classes built by with_client().
    RT_CLIENT: ClassVar[rt.Rt]
    # _CACHES holds one ticket cache per RT instance that has been passed
    # through RTTicket.with_client(), keyed by the caller's cache_key.
    _CACHES: ClassVar[Dict[Hashable, MutableMapping[str, RTResult]]] = {}
    # _rt_cache is the cache that belongs to this class's RT_CLIENT.
    _rt_cache: ClassVar[MutableMapping[str, RTResult]] = {}

    @classmethod
    def with_client(cls, client: rt.Rt, cache_key: Hashable) -> Type['RTTicket']:
        """Return a same-named subclass bound to ``client`` and its cache"""
        namespace = {
            'RT_CLIENT': client,
            '_rt_cache': cls._CACHES.setdefault(cache_key, {}),
        }
        return type(cls.__name__, (cls,), namespace)

    def __init__(self, operands: List[bc_query_compile.EvalNode]) -> None:
        """Validate constant arguments and default the ticket limit

        Raises RuntimeError when no RT client is bound to the class, and
        ValueError when a constant field name or metadata key is invalid.
        """
        if not hasattr(self, 'RT_CLIENT'):
            raise RuntimeError("no RT client available - cannot use rt_ticket()")
        rt_operand, meta_operand, *limit_operands = operands
        # Both keys are evaluated on every call because they can be dynamic
        # expressions. Usually they're constants, though, and then we can
        # catch typos now and report them before execution even begins.
        validators = (
            (rt_operand, self._rt_key),
            (meta_operand, self._meta_key),
        )
        for operand, validate in validators:
            if isinstance(operand, bc_query_compile.EvalConstant):
                validate(operand.value)
        if not limit_operands:
            # No limit given: append one so __call__ always gets three args.
            operands.append(bc_query_compile.EvalConstant(sys.maxsize))
        super().__init__(operands, set)

    def _rt_key(self, key: str) -> RTField:
        """Return the RTField for ``key``, registering custom fields on demand

        Raises ValueError if ``key`` is neither a known field nor a custom
        field spelling.
        """
        try:
            return self.FIELDS[key]
        except KeyError:
            for pattern in self._CF_REGEXPS:
                found = pattern.fullmatch(key)
                if found is None:
                    continue
                custom = RTField(f'CF.{{{found.group(1)}}}', None)
                # Remember both the canonical and the requested spelling.
                self.FIELDS[custom.key] = custom
                self.FIELDS[key] = custom
                return custom
            raise ValueError(f"unknown RT ticket field {key!r}") from None

    def _meta_key(self, key: str) -> str:
        """Return ``key`` if it names link metadata, else raise ValueError"""
        if key not in data.LINK_METADATA:
            raise ValueError(f"metadata key {key!r} does not contain documentation links")
        return key

    def __call__(self, context: PostingContext) -> Set[object]:
        """Return the set of field values from the posting's linked tickets"""
        rt_key: str
        meta_key: str
        limit: int
        rt_key, meta_key, limit = self.eval_args(context)
        rt_field = self._rt_key(rt_key)
        meta_key = self._meta_key(meta_key)
        if limit < 1:
            return set()

        # Gather up to `limit` distinct ticket ids from the metadata links.
        ticket_ids: Set[str] = set()
        for link in ContextMeta(context).report_links(meta_key):
            parsed = rtutil.RT.parse(link)
            if parsed is None:
                continue
            ticket_ids.add(parsed[0])
            if len(ticket_ids) >= limit:
                break

        values: Set[object] = set()
        for ticket_id in ticket_ids:
            # Fetch each ticket at most once per RT instance.
            if ticket_id not in self._rt_cache:
                self._rt_cache[ticket_id] = self.RT_CLIENT.get_ticket(ticket_id)
            loaded = rt_field.load(self._rt_cache[ticket_id])
            if loaded is None:
                continue
            if isinstance(loaded, list):
                values.update(loaded)
            else:
                values.add(loaded)
        return values
|
|
|
|
|
|
class StrMeta(bc_query_env.AnyMeta):
    """Look up metadata the way AnyMeta does, but always return a string."""
    def __init__(self, operands: List[bc_query_compile.EvalNode]) -> None:
        # Skip AnyMeta.__init__ so we can declare str as our return type.
        super(bc_query_env.AnyMeta, self).__init__(operands, str)

    def __call__(self, context: PostingContext) -> str:
        """Return ``str()`` of the metadata value, or ``''`` when it is None"""
        raw_value = super().__call__(context)
        return '' if raw_value is None else str(raw_value)
|
|
|
|
|
|
class AggregateSet(bc_query_compile.EvalAggregator):
    """Aggregate argument values into a set, dropping duplicates."""
    __intypes__ = [object]

    def __init__(self, operands: List[bc_query_compile.EvalNode]) -> None:
        super().__init__(operands, set)

    def allocate(self, allocator: bc_query_execute.Allocator) -> None:
        """Reserve a slot in result storage and remember its handle."""
        self.handle = allocator.allocate()

    def initialize(self, store: Store) -> None:
        """Reset our storage slot at the start of a new aggregation."""
        # self.dtype is the return type given as the second argument to
        # __init__ above, which is also __call__'s annotated return type.
        store[self.handle] = self.dtype()

    def update(self, store: Store, context: PostingContext) -> None:
        """Fold this row's argument value into the stored set.

        Strings and tuples are added as single members. Any other iterable
        contributes each of its items.
        """
        (value,) = self.eval_args(context)
        accumulated = store[self.handle]
        if isinstance(value, (str, tuple)) or not isinstance(value, Iterable):
            accumulated.add(value)
        else:
            accumulated.update(value)

    def __call__(self, context: PostingContext) -> set:
        """Return the aggregated set from the context's store."""
        return context.store[self.handle]  # type:ignore[no-any-return]
|
|
|
|
|
|
class _EnvironmentMixin:
    """Mixin that builds a query environment class from a configuration"""
    # Location of the entity database, relative to the repository path.
    db_path = Path('Financial', 'Ledger', 'supporters.db')
    columns: EnvironmentColumns
    functions: EnvironmentFunctions

    @classmethod
    def with_config(cls, config: configmod.Config) -> Type['_EnvironmentMixin']:
        """Return a same-named subclass with db_* columns and rt_ticket()

        The columns and the function are always registered. When the
        database can't be opened, or no RT client is available, the unbound
        classes are registered instead; those raise RuntimeError when a
        query tries to use them.
        """
        columns = cls.columns.copy()
        repo_path = config.repository_path()
        db_conn: Optional[sqlite3.Connection] = None
        if repo_path is not None:
            try:
                db_conn = sqlite3.connect(os.fspath(repo_path / cls.db_path))
            except (OSError, sqlite3.Error):
                # Same outcome as having no repository: unbound columns.
                db_conn = None
        db_columns = (
            ('db_email', DBEmail),
            ('db_id', DBId),
            ('db_name', DBName),
            ('db_postal', DBPostal),
        )
        for colname, column_cls in db_columns:
            if db_conn is None:
                columns[colname] = column_cls
            else:
                columns[colname] = column_cls.with_db(db_conn)

        rt_credentials = config.rt_credentials()
        rt_client = config.rt_client(rt_credentials)
        rt_ticket = (
            RTTicket
            if rt_client is None else
            RTTicket.with_client(rt_client, rt_credentials.idstr())
        )
        functions = cls.functions.copy()
        # Register both arities: with and without the ticket limit.
        functions[('rt_ticket', str, str)] = rt_ticket
        functions[('rt_ticket', str, str, int)] = rt_ticket
        namespace = {
            'columns': columns,
            'functions': functions,
        }
        return type(cls.__name__, (cls,), namespace)
|
|
|
|
|
|
class FilterPostingsEnvironment(bc_query_env.FilterPostingsEnvironment, _EnvironmentMixin):
    """Beancount's posting-filter environment plus meta_docs() and str_meta()"""
    columns: EnvironmentColumns  # type:ignore[assignment]
    # Work on a copy so Beancount's own function table isn't modified.
    functions: EnvironmentFunctions = bc_query_env.FilterPostingsEnvironment.functions.copy()  # type:ignore[assignment]
    functions.update({
        'meta_docs': MetaDocs,
        'str_meta': StrMeta,
    })
|
|
|
|
|
|
class TargetsEnvironment(bc_query_env.TargetsEnvironment, _EnvironmentMixin):
    """Beancount's targets environment plus our functions and set()"""
    columns: EnvironmentColumns  # type:ignore[assignment]
    # Start from a copy of our filter functions, then layer Beancount's
    # aggregators and our own set() aggregator on top.
    functions: EnvironmentFunctions = FilterPostingsEnvironment.functions.copy()  # type:ignore[assignment]
    functions.update(bc_query_env.AGGREGATOR_FUNCTIONS)
    functions.update({'set': AggregateSet})
|
|
|
|
|
|
class BooksLoader:
    """Zero-argument callable that loads (and optionally rewrites) books

    BQLShell expects its load function to have exactly this interface.
    """
    def __init__(
            self,
            books_loader: Optional[books.Loader],
            start_date: Optional[datetime.date]=None,
            stop_date: Optional[datetime.date]=None,
            rewrite_rules: Sequence[rewrite.RewriteRuleset]=(),
    ) -> None:
        self.books_loader = books_loader
        self.start_date = start_date
        self.stop_date = stop_date
        self.rewrite_rules = rewrite_rules

    def __call__(self) -> books.LoadResult:
        """Load the books and apply every rewrite ruleset to each entry"""
        logger.debug("BooksLoader called")
        result = books.Loader.dispatch(self.books_loader, self.start_date, self.stop_date)
        logger.debug("books loaded from Beancount")
        if not self.rewrite_rules:
            return result
        for index, entry in enumerate(result.entries):
            # entry may not be a Transaction; that surfaces as an
            # AttributeError below, which leaves the entry untouched.
            # The type ignores exist because the underlying Beancount type
            # can't be type-checked.
            postings = data.Posting.from_txn(entry)  # type:ignore[arg-type]
            for ruleset in self.rewrite_rules:
                postings = ruleset.rewrite(postings)
            with contextlib.suppress(AttributeError):
                result.entries[index] = entry._replace(postings=list(postings))  # type:ignore[call-arg]
        logger.debug("rewrite rules applied")
        return result
|
|
|
|
|
|
class BQLShell(bc_query_shell.BQLShell):
    """bean-query shell using this module's environments and renderers"""
    def __init__(
            self,
            config: configmod.Config,
            is_interactive: bool,
            loadfun: Callable[[], books.LoadResult],
            outfile: TextIO,
            default_format: str='text',
            do_numberify: bool=False,
    ) -> None:
        super().__init__(is_interactive, loadfun, outfile, default_format, do_numberify)
        self.env_postings = FilterPostingsEnvironment.with_config(config)()
        self.env_targets = TargetsEnvironment.with_config(config)()
        self.ods = QueryODS(config.rt_wrapper())
        # Text of the most recent query, used to annotate ODS output.
        self.last_line_parsed = ''

    def run_parser(
            self,
            line: str,
            default_close_date: Optional[datetime.datetime]=None,
    ) -> None:
        """Remember the query text, then parse it as usual"""
        self.last_line_parsed = line
        super().run_parser(line, default_close_date)

    def _select(self, statement: QueryStatement) -> None:
        """Compile, execute, and render one statement

        Problems are reported through the logger, not raised. When a method
        named ``_hint_<ExceptionName>`` exists, it is called to log advice.
        """
        output_format: str = self.vars['format']
        render_func = getattr(self, f'_render_{output_format}', None)
        if render_func is None:
            logger.error("unknown output format %r", output_format)
            return

        try:
            logger.debug("compiling query")
            compiled_query = bc_query_compile.compile(
                statement, self.env_targets, self.env_postings, self.env_entries,
            )
            logger.debug("executing query")
            row_types, rows = bc_query_execute.execute_query(
                compiled_query, self.entries, self.options_map,
            )
            if self.vars['numberify']:
                logger.debug("numberifying query")
                row_types, rows = bc_query_numberify.numberify_results(
                    row_types, rows, self.options_map['dcontext'].build(),
                )
        except Exception as error:
            # Only include the traceback when debug logging is enabled.
            logger.error(str(error), exc_info=logger.isEnabledFor(logging.DEBUG))
            hint_func = getattr(self, f'_hint_{type(error).__name__}', None)
            if hint_func is not None:
                hint_func(error, statement)
            return

        # An ODS sheet is written even when it has no result rows.
        if rows or output_format == 'ods':
            logger.debug("rendering query as %s", output_format)
            render_func(statement, row_types, rows)
        else:
            print("(empty)", file=self.outfile)

    @functools.wraps(bc_query_shell.BQLShell.on_Select, ('__doc__',))
    def on_Select(self, statement: QueryStatement) -> None:
        try:
            self._select(statement)
        except KeyboardInterrupt:
            # Interactive sessions survive an interrupted query.
            if not self.is_interactive:
                raise
            logger.info("interrupted")

    def _hint_TypeError(self, error: TypeError, statement: QueryStatement) -> None:
        """Log advice for TypeErrors with a recognizable message"""
        if not error.args:
            return
        errmsg = str(error.args[0])
        if ' not supported between instances ' in errmsg:
            logger.info(
                "HINT: Are you using ORDER BY or comparisons with metadata "
                "that isn't consistently set?\n "
                "Try looking up that metadata with str_meta() instead to "
                "ensure your comparisons use a consistent data type.",
            )
        elif errmsg.startswith('unhashable type: '):
            logger.info(
                "HINT: bean-query does not support selecting columns or "
                "functions that return multiple items as non-aggregate data in "
                "GROUP BY queries.\n "
                "If you want to aggregate that data, run it through set().",
            )

    def _render_csv(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
        """Write the results to the output file as CSV"""
        dcontext = self.options_map['dcontext']
        bc_query_render.render_csv(
            row_types,
            rows,
            dcontext,
            self.outfile,
            self.vars['expand'],
        )

    def _render_ods(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
        """Add the results to the spreadsheet as a new sheet"""
        self.ods.write_query(statement, row_types, rows, self.last_line_parsed)
        sheet_name = self.ods.sheet.getAttribute('name')
        logger.info("%s rows of results saved in sheet %s", len(rows), sheet_name)

    def _render_text(self, statement: QueryStatement, row_types: RowTypes, rows: Rows) -> None:
        """Write the results as a text table, paged when interactive"""
        with contextlib.ExitStack() as stack:
            output = (
                stack.enter_context(self.get_pager())
                if self.is_interactive else
                self.outfile
            )
            bc_query_render.render_text(
                row_types,
                rows,
                self.options_map['dcontext'],
                output,
                self.vars['expand'],
                self.vars['boxed'],
                self.vars['spaced'],
            )
|
|
|
|
|
|
class QueryODS(core.BaseODS[NamedTuple, None]):
    """Spreadsheet writer that puts each query's results on its own sheet"""
    META_FNAMES = frozenset([
        # Environment function names that look up posting metadata which
        # may hold documentation links
        'any_meta',
        'entry_meta',
        'meta',
        'meta_docs',
        'str_meta',
    ])

    def is_empty(self) -> bool:
        """Return True until the first sheet is one named "Query ..." """
        first_name = self.document.spreadsheet.firstChild.getAttribute('name')
        return not first_name.startswith('Query ')

    def section_key(self, row: NamedTuple) -> None:
        return None

    def _generic_cell(self, value: Any) -> odf.table.TableCell:
        """Build a multiline cell for collections, else a string cell"""
        if isinstance(value, (str, tuple)) or not isinstance(value, Iterable):
            text = '' if value is None else str(value)
            return self.string_cell(text)
        return self.multiline_cell(value)

    def _inventory_cell(self, value: Inventory) -> odf.table.TableCell:
        balance = core.Balance(pos.units for pos in value)
        return self.balance_cell(balance)

    def _link_string_cell(self, value: str) -> odf.table.TableCell:
        links = value.split()
        return self.meta_links_cell(links)

    def _metadata_cell(self, value: MetaValue) -> odf.table.TableCell:
        # Metadata values vary row by row, so choose by each value's type.
        cell_func = self._cell_type(type(value))
        return cell_func(value)

    def _position_cell(self, value: Position) -> odf.table.TableCell:
        return self.currency_cell(value.units)

    def _cell_type(self, row_type: Type) -> CellFunc:
        """Return a function to create a cell, for non-metadata row types."""
        # Checked in order; the first matching entry wins.
        dispatch: Sequence[Tuple[Any, CellFunc]] = (
            (Inventory, self._inventory_cell),
            (Position, self._position_cell),
            (BeancountAmount, self.currency_cell),
            ((int, float, Decimal), self.float_cell),
            (datetime.date, self.date_cell),
            (str, self.string_cell),
        )
        for candidates, cell_func in dispatch:
            if issubclass(row_type, candidates):
                return cell_func
        return self._generic_cell

    def _link_cell_type(self, row_type: Type) -> CellFunc:
        """Return a function to create a cell from metadata with documentation links."""
        if issubclass(row_type, str):
            return self._link_string_cell
        # Tuples are rendered whole and are never treated as link lists.
        if issubclass(row_type, Iterable) and not issubclass(row_type, tuple):
            return self.meta_links_cell
        return self._generic_cell

    def _meta_target(self, target: QueryExpression) -> Optional[MetaKey]:
        """Return the metadata key looked up by this target, if any

        Takes a parsed target (i.e., one of the things we're SELECTing) and
        walks it recursively. The result is the constant key passed to the
        first metadata lookup function found, or None if there isn't one.
        """
        if isinstance(target, bc_query_parser.UnaryOp):
            return self._meta_target(target.operand)
        if not isinstance(target, bc_query_parser.Function):
            return None
        try:
            first = target.operands[0]
        except IndexError:
            return None
        if (target.fname in self.META_FNAMES
            and isinstance(first, bc_query_parser.Constant)):
            return first.value  # type:ignore[no-any-return]
        # Otherwise the lookup may be nested in one of the arguments.
        for operand in target.operands:
            found = self._meta_target(operand)
            if found is not None:
                return found
        return None

    def _cell_types(self, statement: QueryStatement, row_types: RowTypes) -> Iterator[CellFunc]:
        """Return functions to create table cells from result rows

        Given a parsed query and the types of the returned rows, yields one
        cell-building function per column, in column order. Which function is
        yielded depends on the column's type and on whether the column comes
        from metadata, so each column gets the best available formatting.
        """
        if (isinstance(statement, bc_query_parser.Select)
            and isinstance(statement.targets, Sequence)):
            targets = [t.expression for t in statement.targets]
        else:
            # Stand-in targets that clearly don't load any metadata.
            targets = [bc_query_parser.Column(name) for name, _ in row_types]
        for target, (_, row_type) in zip(targets, row_types):
            meta_key = self._meta_target(target)
            if meta_key is None:
                cell_func = self._cell_type(row_type)
            elif meta_key in data.LINK_METADATA:
                cell_func = self._link_cell_type(row_type)
            else:
                cell_func = self._metadata_cell
            yield cell_func

    def write_query(
            self,
            statement: QueryStatement,
            row_types: RowTypes,
            rows: Rows,
            query_string: Optional[str]=None,
    ) -> None:
        """Write one query's results to a new sheet named "Query N"

        The header row shows the column names. When ``query_string`` is
        given, it is attached to the first header cell as an annotation.
        """
        sheets = self.document.spreadsheet.childNodes
        query_count = 1 if self.is_empty() else len(sheets) + 1
        # self.use_sheet() is deliberately avoided: building the sheet fully
        # before adding it to the document makes the query safer to interrupt.
        self.sheet = odf.table.Table(name=f"Query {query_count}")
        for _, row_type in row_types:
            if issubclass(row_type, datetime.date):
                col_width = 1.0
            elif issubclass(row_type, (BeancountAmount, Inventory, Position)):
                col_width = 1.5
            else:
                col_width = 2.0
            column = odf.table.TableColumn(stylename=self.column_style(col_width))
            self.sheet.addElement(column)
        header_cells = [
            self.string_cell(data.Metadata.human_name(name), stylename=self.style_bold)
            for name, _ in row_types
        ]
        self.add_row(*header_cells)
        self.lock_first_row()
        if query_string:
            self.add_annotation(query_string, parent=self.sheet.lastChild.firstChild)
        cell_funcs = list(self._cell_types(statement, row_types))
        for row in rows:
            cells = [cell_func(value) for cell_func, value in zip(cell_funcs, row)]
            self.add_row(*cells)
        if query_count == 1:
            # The first query takes the place of the document's last sheet.
            self.document.spreadsheet.childNodes[-1] = self.sheet
        else:
            self.document.spreadsheet.appendChild(self.sheet)
|
|
|
|
|
|
class ReportFormat(enum.Enum):
    """Output formats that can be selected on the command line"""
    TEXT = 'text'
    # Same value as TEXT, which makes TXT an alias and not a new member.
    TXT = 'text'
    CSV = 'csv'
    ODS = 'ods'
|
|
|
|
|
|
class SetCYDates(argparse.Action):
    """argparse action that sets start_date and stop_date one year apart"""
    def __call__(self,
                 parser: argparse.ArgumentParser,
                 namespace: argparse.Namespace,
                 values: Union[Sequence[Any], str, None]=None,
                 option_string: Optional[str]=None,
    ) -> None:
        parsed = cliutil.year_or_date_arg(str(values))
        # A bare year means the year beginning on January 1.
        start = datetime.date(parsed, 1, 1) if isinstance(parsed, int) else parsed
        namespace.start_date = start
        namespace.stop_date = cliutil.diff_year(start, 1)
|
|
|
|
|
|
class SetFYDates(argparse.Action):
    """argparse action that sets start_date and stop_date for one fiscal year"""
    def __call__(self,
                 parser: argparse.ArgumentParser,
                 namespace: argparse.Namespace,
                 values: Union[Sequence[Any], str, None]=None,
                 option_string: Optional[str]=None,
    ) -> None:
        parsed = cliutil.year_or_date_arg(str(values))
        # The configuration isn't loaded at this point, so the fiscal year's
        # boundaries are unknown. That's fine: we only have to set enough
        # that a BooksLoader given these arguments loads the right fiscal
        # year.
        if isinstance(parsed, int):
            stop = parsed
        else:
            stop = parsed + datetime.timedelta(days=1)
        namespace.start_date = parsed
        namespace.stop_date = stop
|
|
|
|
|
|
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse command line arguments for query-report

    ``arglist`` is the list of argument strings; None makes argparse read
    ``sys.argv``. The returned namespace has ``start_date``, ``stop_date``,
    ``report_type``, ``numberify``, ``output_file``, and ``query`` (a list of
    words), plus whatever the shared cliutil helpers add.
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(parser)
    cliutil.add_loglevel_argument(parser)
    parser.add_argument(
        '--begin', '--start', '-b',
        dest='start_date',
        metavar='YEAR',
        type=cliutil.year_or_date_arg,
        help="""Begin loading entries from this fiscal year. You can specify a
full date, and %(prog)s will use the fiscal year for that date.
""")
    parser.add_argument(
        '--end', '--stop', '-e',
        dest='stop_date',
        metavar='YEAR',
        type=cliutil.year_or_date_arg,
        help="""End loading entries at this fiscal year. You can specify a
full date, and %(prog)s will use the fiscal year for that date.
""")
    parser.add_argument(
        '--calendar-year', '--cy',
        action=SetCYDates,
        metavar='YEAR',
        help="""Shortcut to set --begin and --end to load a single calendar year.
You can specify a full date, or just a year to start from January 1.
""")
    parser.add_argument(
        '--fiscal-year', '--fy',
        action=SetFYDates,
        metavar='YEAR',
        help="""Shortcut to set --begin and --end to load a single fiscal year.
You can specify a full date, and %(prog)s will use the fiscal year for that date.
""")
    cliutil.add_rewrite_rules_argument(parser)
    format_arg = cliutil.EnumArgument(ReportFormat)
    parser.add_argument(
        '--report-type', '--format', '-t', '-f',
        metavar='TYPE',
        type=format_arg.enum_type,
        # This must be an f-string: without the prefix, --help printed the
        # placeholder text literally instead of the list of choices.
        help=f"""Format of report to generate. Choices are
{format_arg.choices_str()}. Default is guessed from your output filename
extension. If that fails, default is 'text' for interactive output, and 'ods'
otherwise.
""")
    parser.add_argument(
        '--numberify', '-m',
        action='store_true',
        help="""Separate currency amounts into numeric columns by currency
""")
    parser.add_argument(
        '--output-file', '-O', '-o',
        metavar='PATH',
        type=Path,
        help="""Write the report to this file, or stdout when PATH is `-`.
The default is stdout for text and CSV reports, and a generated filename for
ODS reports.
""")
    parser.add_argument(
        'query',
        nargs=argparse.ZERO_OR_MORE,
        default=[],
        help="""Query to run non-interactively. If none is provided, and
standard input is not a terminal, reads the query from stdin instead.
""")
    return parser.parse_args(arglist)
|
|
|
|
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run query-report and return its exit code

    The query comes from the command line, or from standard input when that
    isn't a terminal. With a query, it runs once; without one, an interactive
    shell starts. Any ODS results are saved before returning.
    """
    args = parse_arguments(arglist)
    cliutil.set_loglevel(logger, args.loglevel)
    if config is None:
        config = configmod.Config()
        config.load_file()

    query = ' '.join(args.query).strip()
    if not (query or sys.stdin.isatty()):
        query = sys.stdin.read().strip()
    if args.report_type is None:
        try:
            # Guess the format from the output filename's extension.
            extension = args.output_file.suffix[1:]
            args.report_type = ReportFormat[extension.upper()]
        except (AttributeError, KeyError):
            # No output file, or an extension that isn't a known format.
            args.report_type = ReportFormat.ODS if query else ReportFormat.TEXT

    rulesets = [rewrite.RewriteRuleset.from_yaml(path) for path in args.rewrite_rules]
    load_func = BooksLoader(
        config.books_loader(),
        args.start_date,
        args.stop_date,
        rulesets,
    )
    is_interactive = not query
    shell = BQLShell(
        config,
        is_interactive,
        load_func,
        stdout,
        args.report_type.value,
        args.numberify,
    )
    shell.on_Reload()
    if is_interactive:
        shell.cmdloop()
    else:
        shell.onecmd(query)

    if not shell.ods.is_empty():
        shell.ods.set_common_properties(config.books_repo())
        if args.output_file is None:
            # Generate a timestamped filename, in the repository if there
            # is one and in the current directory otherwise.
            out_dir_path = config.repository_path() or Path()
            timestamp = datetime.datetime.now().isoformat(timespec='seconds')
            args.output_file = out_dir_path / f'QueryResults_{timestamp}.ods'
            logger.info("Writing spreadsheet to %s", args.output_file)
        ods_file = cliutil.bytes_output(args.output_file, stdout)
        shell.ods.save_file(ods_file)

    return cliutil.ExitCode.OK
|
|
|
|
# Console-script entry point built by cliutil for this module and program.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    # sys.exit() replaces the exit() builtin: exit() is injected by the
    # `site` module for interactive use and is missing under `python -S`
    # and in some frozen or embedded interpreters.
    sys.exit(entry_point())
|