cliutil: Add generalized SearchTerm class.

This makes the same filtering easily available to other reporting tools for
consistency.
This commit is contained in:
Brett Smith 2020-06-06 10:29:44 -04:00
parent 0431d15d68
commit cd1b28ae3e
4 changed files with 261 additions and 107 deletions

View file

@ -23,6 +23,7 @@ import logging
import operator import operator
import os import os
import pkg_resources import pkg_resources
import re
import signal import signal
import sys import sys
import traceback import traceback
@ -30,9 +31,15 @@ import types
from pathlib import Path from pathlib import Path
from . import data
from . import filters
from . import rtutil
from typing import ( from typing import (
Any, Any,
Callable,
Iterable, Iterable,
NamedTuple,
NoReturn, NoReturn,
Optional, Optional,
Sequence, Sequence,
@ -40,6 +47,9 @@ from typing import (
Type, Type,
Union, Union,
) )
from .beancount_types import (
MetaKey,
)
VERSION = pkg_resources.require(PKGNAME)[0].version VERSION = pkg_resources.require(PKGNAME)[0].version
@ -114,6 +124,85 @@ class LogLevel(enum.IntEnum):
for level in sorted(cls, key=operator.attrgetter('value')): for level in sorted(cls, key=operator.attrgetter('value')):
yield level.name.lower() yield level.name.lower()
class SearchTerm(NamedTuple):
"""NamedTuple representing a user's metadata filter
SearchTerm knows how to parse and store posting metadata filters provided
by the user in `key=value` format. Reporting tools can use this to filter
postings that match the user's criteria, to report on subsets of the books.
Typical usage looks like::
argument_parser.add_argument(
'search_terms',
type=SearchTerm.arg_parser(),
,
)
args = argument_parser.parse_args()
for query in args.search_terms:
postings = query.filter_postings(postings)
"""
meta_key: MetaKey
pattern: str
@classmethod
def arg_parser(cls,
default_key: Optional[str]=None,
ticket_default_key: Optional[str]=None,
) -> Callable[[str], 'SearchTerm']:
"""Build a SearchTerm parser
This method returns a function that can parse strings in ``key=value``
format and return a corresponding SearchTerm.
If you specify a default key, then strings that just specify a ``value``
will be parsed as if they said ``default_key=value``. Otherwise,
parsing strings without a metadata key will raise a ValueError.
If you specify a default key ticket links, then values in the format
``number``, ``rt:number``, or ``rt://ticket/number`` will be parsed as
if they said ``ticket_default_key=value``.
"""
if ticket_default_key is None:
ticket_default_key = default_key
def parse_search_term(arg: str) -> 'SearchTerm':
key: Optional[str] = None
if re.match(r'^[a-z][-\w]*=', arg):
key, _, raw_link = arg.partition('=')
else:
raw_link = arg
rt_ids = rtutil.RT.parse(raw_link)
if rt_ids is None:
rt_ids = rtutil.RT.parse('rt:' + raw_link)
if rt_ids is None:
if key is None:
key = default_key
pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
else:
ticket_id, attachment_id = rt_ids
if key is None:
if attachment_id is None:
key = ticket_default_key
else:
key = default_key
pattern = rtutil.RT.metadata_regexp(
ticket_id,
attachment_id,
first_link_only=key == 'rt-id' and attachment_id is None,
)
if key is None:
raise ValueError(f"invalid search term {arg!r}: no metadata key")
return cls(key, pattern)
return parse_search_term
def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]:
return filters.filter_meta_match(
postings, self.meta_key, re.compile(self.pattern),
)
def add_loglevel_argument(parser: argparse.ArgumentParser, def add_loglevel_argument(parser: argparse.ArgumentParser,
default: LogLevel=LogLevel.INFO) -> argparse.Action: default: LogLevel=LogLevel.INFO) -> argparse.Action:
return parser.add_argument( return parser.add_argument(

View file

@ -622,46 +622,13 @@ class ReturnFlag(enum.IntFlag):
REPORT_ERRORS = 4 REPORT_ERRORS = 4
NOTHING_TO_REPORT = 8 NOTHING_TO_REPORT = 8
class SearchTerm(NamedTuple):
meta_key: MetaKey
pattern: str
@classmethod
def parse(cls, s: str) -> 'SearchTerm':
key_match = re.match(r'^[a-z][-\w]*=', s)
key: Optional[str]
if key_match:
key, _, raw_link = s.partition('=')
else:
key = None
raw_link = s
rt_ids = rtutil.RT.parse(raw_link)
if rt_ids is None:
rt_ids = rtutil.RT.parse('rt:' + raw_link)
if rt_ids is None:
if key is None:
key = 'invoice'
pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
else:
ticket_id, attachment_id = rt_ids
if key is None:
key = 'rt-id' if attachment_id is None else 'invoice'
pattern = rtutil.RT.metadata_regexp(
ticket_id,
attachment_id,
first_link_only=key == 'rt-id' and attachment_id is None,
)
return cls(key, pattern)
def filter_search(postings: Iterable[data.Posting], def filter_search(postings: Iterable[data.Posting],
search_terms: Iterable[SearchTerm], search_terms: Iterable[cliutil.SearchTerm],
) -> Iterable[data.Posting]: ) -> Iterable[data.Posting]:
accounts = tuple(AccrualAccount.account_names()) accounts = tuple(AccrualAccount.account_names())
postings = (post for post in postings if post.account.is_under(*accounts)) postings = (post for post in postings if post.account.is_under(*accounts))
for meta_key, pattern in search_terms: for query in search_terms:
postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern)) postings = query.filter_postings(postings)
return postings return postings
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace: def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
@ -696,7 +663,9 @@ filename for other reports.
""") """)
cliutil.add_loglevel_argument(parser) cliutil.add_loglevel_argument(parser)
parser.add_argument( parser.add_argument(
'search', 'search_terms',
metavar='FILTER',
type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
nargs=argparse.ZERO_OR_MORE, nargs=argparse.ZERO_OR_MORE,
help="""Report on accruals that match this criteria. The format is help="""Report on accruals that match this criteria. The format is
NAME=TERM. TERM is a link or word that must exist in a posting's NAME NAME=TERM. TERM is a link or word that must exist in a posting's NAME
@ -705,7 +674,6 @@ metadata to match. A single ticket number is a shortcut for
`TIK/ATT` format, is a shortcut for `invoice=LINK`. `TIK/ATT` format, is a shortcut for `invoice=LINK`.
""") """)
args = parser.parse_args(arglist) args = parser.parse_args(arglist)
args.search_terms = [SearchTerm.parse(s) for s in args.search]
if args.report_type is None and not args.search_terms: if args.report_type is None and not args.search_terms:
args.report_type = ReportType.AGING args.report_type = ReportType.AGING
return args return args

View file

@ -0,0 +1,164 @@
"""test_cliutil_searchterm - Unit tests for cliutil.SearchTerm"""
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import re
import pytest
from . import testutil
from conservancy_beancount import cliutil
from conservancy_beancount import data
TICKET_LINKS = [
'123',
'rt:123',
'rt://ticket/123',
]
ATTACHMENT_LINKS = [
'123/456',
'rt:123/456',
'rt://ticket/123/attachments/456',
]
REPOSITORY_LINKS = [
'789.pdf',
'Documents/789.pdf',
]
RT_LINKS = TICKET_LINKS + ATTACHMENT_LINKS
ALL_LINKS = RT_LINKS + REPOSITORY_LINKS
INVALID_METADATA_KEYS = [
# Must start with a-z
';key',
' key',
'ákey',
'Key',
# Must only contain alphanumerics, -, and _
'key;',
'key ',
'a.key',
]
@pytest.fixture(scope='module')
def defaults_parser():
return cliutil.SearchTerm.arg_parser('document', 'rt-id')
@pytest.fixture(scope='module')
def one_default_parser():
return cliutil.SearchTerm.arg_parser('default')
@pytest.fixture(scope='module')
def no_default_parser():
return cliutil.SearchTerm.arg_parser()
def check_link_regexp(regexp, match_s, first_link_only=False):
assert regexp
assert re.search(regexp, match_s)
assert re.search(regexp, match_s + ' postlink')
assert re.search(regexp, match_s + '0') is None
assert re.search(regexp, '1' + match_s) is None
end_match = re.search(regexp, 'prelink ' + match_s)
if first_link_only:
assert end_match is None
else:
assert end_match
@pytest.mark.parametrize('arg', TICKET_LINKS)
def test_search_term_parse_ticket(defaults_parser, arg):
key, regexp = defaults_parser(arg)
assert key == 'rt-id'
check_link_regexp(regexp, 'rt:123', first_link_only=True)
check_link_regexp(regexp, 'rt://ticket/123', first_link_only=True)
@pytest.mark.parametrize('arg', ATTACHMENT_LINKS)
def test_search_term_parse_attachment(defaults_parser, arg):
key, regexp = defaults_parser(arg)
assert key == 'document'
check_link_regexp(regexp, 'rt:123/456')
check_link_regexp(regexp, 'rt://ticket/123/attachments/456')
@pytest.mark.parametrize('key,query', testutil.combine_values(
['approval', 'contract', 'invoice'],
RT_LINKS,
))
def test_search_term_parse_metadata_rt_shortcut(defaults_parser, key, query):
actual_key, regexp = defaults_parser(f'{key}={query}')
assert actual_key == key
if query.endswith('/456'):
check_link_regexp(regexp, 'rt:123/456')
check_link_regexp(regexp, 'rt://ticket/123/attachments/456')
else:
check_link_regexp(regexp, 'rt:123')
check_link_regexp(regexp, 'rt://ticket/123')
@pytest.mark.parametrize('search_prefix', [
'',
'approval=',
'contract=',
'invoice=',
])
def test_search_term_parse_repo_link(defaults_parser, search_prefix):
document = '1234.pdf'
actual_key, regexp = defaults_parser(f'{search_prefix}{document}')
if search_prefix:
expect_key = search_prefix.rstrip('=')
else:
expect_key = 'document'
assert actual_key == expect_key
check_link_regexp(regexp, document)
@pytest.mark.parametrize('search,unmatched', [
('1234.pdf', '1234_pdf'),
])
def test_search_term_parse_regexp_escaping(defaults_parser, search, unmatched):
_, regexp = defaults_parser(search)
assert re.search(regexp, unmatched) is None
@pytest.mark.parametrize('key', INVALID_METADATA_KEYS)
def test_non_metadata_key(one_default_parser, key):
document = f'{key}=890'
key, pattern = one_default_parser(document)
assert key == 'default'
check_link_regexp(pattern, document)
@pytest.mark.parametrize('arg', ALL_LINKS)
def test_default_parser(one_default_parser, arg):
key, _ = one_default_parser(arg)
assert key == 'default'
@pytest.mark.parametrize('arg', ALL_LINKS + [
f'{nonkey}={link}' for nonkey, link in testutil.combine_values(
INVALID_METADATA_KEYS, ALL_LINKS,
)
])
def test_no_key(no_default_parser, arg):
with pytest.raises(ValueError):
key, pattern = no_default_parser(arg)
@pytest.mark.parametrize('key', ['zero', 'one', 'two'])
def test_filter_postings(key):
txn = testutil.Transaction(postings=[
('Income:Other', 3, {'one': '1', 'two': '2'}),
('Income:Other', 2, {'two': '2'}),
('Income:Other', 1, {'one': '1'}),
])
search = cliutil.SearchTerm(key, '.')
actual = list(search.filter_postings(data.Posting.from_txn(txn)))
assert len(actual) == 0 if key == 'zero' else 2
assert all(post.meta.get(key) for post in actual)

View file

@ -37,6 +37,7 @@ from typing import NamedTuple, Optional, Sequence
from beancount.core import data as bc_data from beancount.core import data as bc_data
from beancount import loader as bc_loader from beancount import loader as bc_loader
from conservancy_beancount import cliutil
from conservancy_beancount import data from conservancy_beancount import data
from conservancy_beancount import rtutil from conservancy_beancount import rtutil
from conservancy_beancount.reports import accrual from conservancy_beancount.reports import accrual
@ -144,18 +145,6 @@ class RTClient(testutil.RTClient):
def accrual_postings(): def accrual_postings():
return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS)) return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS))
def check_link_regexp(regexp, match_s, first_link_only=False):
assert regexp
assert re.search(regexp, match_s)
assert re.search(regexp, match_s + ' postlink')
assert re.search(regexp, match_s + '0') is None
assert re.search(regexp, '1' + match_s) is None
end_match = re.search(regexp, 'prelink ' + match_s)
if first_link_only:
assert end_match is None
else:
assert end_match
def accruals_by_meta(postings, value, key='invoice', wrap_type=iter): def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
return wrap_type( return wrap_type(
post for post in postings post for post in postings
@ -223,63 +212,6 @@ def check_aging_ods(ods_file,
check_aging_sheet(sheets[0], recv_rows, date, -60) check_aging_sheet(sheets[0], recv_rows, date, -60)
check_aging_sheet(sheets[1], pay_rows, date, -30) check_aging_sheet(sheets[1], pay_rows, date, -30)
@pytest.mark.parametrize('link_fmt', [
'{}',
'rt:{}',
'rt://ticket/{}',
])
def test_search_term_parse_rt_shortcuts(link_fmt):
key, regexp = accrual.SearchTerm.parse(link_fmt.format(220))
assert key == 'rt-id'
check_link_regexp(regexp, 'rt:220', first_link_only=True)
check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True)
@pytest.mark.parametrize('link_fmt', [
'{}/{}',
'rt:{}/{}',
'rt://ticket/{}/attachments/{}',
])
def test_search_term_parse_invoice_shortcuts(link_fmt):
key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660))
assert key == 'invoice'
check_link_regexp(regexp, 'rt:330/660')
check_link_regexp(regexp, 'rt://ticket/330/attachments/660')
@pytest.mark.parametrize('key', [
'approval',
'contract',
'invoice',
])
def test_search_term_parse_metadata_rt_shortcut(key):
actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420')
assert actual_key == key
check_link_regexp(regexp, 'rt:440/420')
check_link_regexp(regexp, 'rt://ticket/440/attachments/420')
@pytest.mark.parametrize('key', [
None,
'approval',
'contract',
'invoice',
])
def test_search_term_parse_repo_link(key):
document = '1234.pdf'
if key is None:
key = 'invoice'
search = document
else:
search = f'{key}={document}'
actual_key, regexp = accrual.SearchTerm.parse(search)
assert actual_key == key
check_link_regexp(regexp, document)
@pytest.mark.parametrize('search,unmatched', [
('1234.pdf', '1234_pdf'),
])
def test_search_term_parse_regexp_escaping(search, unmatched):
_, regexp = accrual.SearchTerm.parse(search)
assert re.search(regexp, unmatched) is None
@pytest.mark.parametrize('search_terms,expect_count,check_func', [ @pytest.mark.parametrize('search_terms,expect_count,check_func', [
([], ACCRUALS_COUNT, lambda post: post.account.is_under( ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
'Assets:Receivable:', 'Liabilities:Payable:', 'Assets:Receivable:', 'Liabilities:Payable:',
@ -292,6 +224,7 @@ def test_search_term_parse_regexp_escaping(search, unmatched):
([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False), ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
]) ])
def test_filter_search(accrual_postings, search_terms, expect_count, check_func): def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
search_terms = [cliutil.SearchTerm._make(query) for query in search_terms]
actual = list(accrual.filter_search(accrual_postings, search_terms)) actual = list(accrual.filter_search(accrual_postings, search_terms))
if expect_count < ACCRUALS_COUNT: if expect_count < ACCRUALS_COUNT:
assert ACCRUALS_COUNT > len(actual) >= expect_count assert ACCRUALS_COUNT > len(actual) >= expect_count