From cd1b28ae3eb15abbd5c51c501c1194180df02e69 Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Sat, 6 Jun 2020 10:29:44 -0400 Subject: [PATCH] cliutil: Add generalized SearchTerm class. This makes the same filtering easily available to other reporting tools for consistency. --- conservancy_beancount/cliutil.py | 89 ++++++++++++ conservancy_beancount/reports/accrual.py | 44 +----- tests/test_cliutil_searchterm.py | 164 +++++++++++++++++++++++ tests/test_reports_accrual.py | 71 +--------- 4 files changed, 261 insertions(+), 107 deletions(-) create mode 100644 tests/test_cliutil_searchterm.py diff --git a/conservancy_beancount/cliutil.py b/conservancy_beancount/cliutil.py index 5a6fde3..e12652b 100644 --- a/conservancy_beancount/cliutil.py +++ b/conservancy_beancount/cliutil.py @@ -23,6 +23,7 @@ import logging import operator import os import pkg_resources +import re import signal import sys import traceback @@ -30,9 +31,15 @@ import types from pathlib import Path +from . import data +from . import filters +from . import rtutil + from typing import ( Any, + Callable, Iterable, + NamedTuple, NoReturn, Optional, Sequence, @@ -40,6 +47,9 @@ from typing import ( Type, Union, ) +from .beancount_types import ( + MetaKey, +) VERSION = pkg_resources.require(PKGNAME)[0].version @@ -114,6 +124,85 @@ class LogLevel(enum.IntEnum): for level in sorted(cls, key=operator.attrgetter('value')): yield level.name.lower() + +class SearchTerm(NamedTuple): + """NamedTuple representing a user's metadata filter + + SearchTerm knows how to parse and store posting metadata filters provided + by the user in `key=value` format. Reporting tools can use this to filter + postings that match the user's criteria, to report on subsets of the books. + + Typical usage looks like:: + + argument_parser.add_argument( + 'search_terms', + type=SearchTerm.arg_parser(), + …, + ) + + args = argument_parser.parse_args(…) + for query in args.search_terms: + postings = query.filter_postings(postings) + """ + meta_key: MetaKey + pattern: str + + @classmethod + def arg_parser(cls, + default_key: Optional[str]=None, + ticket_default_key: Optional[str]=None, + ) -> Callable[[str], 'SearchTerm']: + """Build a SearchTerm parser + + This method returns a function that can parse strings in ``key=value`` + format and return a corresponding SearchTerm. + + If you specify a default key, then strings that just specify a ``value`` + will be parsed as if they said ``default_key=value``. Otherwise, + parsing strings without a metadata key will raise a ValueError. + + If you specify a default key ticket links, then values in the format + ``number``, ``rt:number``, or ``rt://ticket/number`` will be parsed as + if they said ``ticket_default_key=value``. + """ + if ticket_default_key is None: + ticket_default_key = default_key + def parse_search_term(arg: str) -> 'SearchTerm': + key: Optional[str] = None + if re.match(r'^[a-z][-\w]*=', arg): + key, _, raw_link = arg.partition('=') + else: + raw_link = arg + rt_ids = rtutil.RT.parse(raw_link) + if rt_ids is None: + rt_ids = rtutil.RT.parse('rt:' + raw_link) + if rt_ids is None: + if key is None: + key = default_key + pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link)) + else: + ticket_id, attachment_id = rt_ids + if key is None: + if attachment_id is None: + key = ticket_default_key + else: + key = default_key + pattern = rtutil.RT.metadata_regexp( + ticket_id, + attachment_id, + first_link_only=key == 'rt-id' and attachment_id is None, + ) + if key is None: + raise ValueError(f"invalid search term {arg!r}: no metadata key") + return cls(key, pattern) + return parse_search_term + + def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]: + return filters.filter_meta_match( + postings, self.meta_key, re.compile(self.pattern), + ) + + def add_loglevel_argument(parser: argparse.ArgumentParser, default: LogLevel=LogLevel.INFO) -> argparse.Action: return parser.add_argument( diff --git a/conservancy_beancount/reports/accrual.py b/conservancy_beancount/reports/accrual.py index ad2015a..67db8e8 100644 --- a/conservancy_beancount/reports/accrual.py +++ b/conservancy_beancount/reports/accrual.py @@ -622,46 +622,13 @@ class ReturnFlag(enum.IntFlag): REPORT_ERRORS = 4 NOTHING_TO_REPORT = 8 - -class SearchTerm(NamedTuple): - meta_key: MetaKey - pattern: str - - @classmethod - def parse(cls, s: str) -> 'SearchTerm': - key_match = re.match(r'^[a-z][-\w]*=', s) - key: Optional[str] - if key_match: - key, _, raw_link = s.partition('=') - else: - key = None - raw_link = s - rt_ids = rtutil.RT.parse(raw_link) - if rt_ids is None: - rt_ids = rtutil.RT.parse('rt:' + raw_link) - if rt_ids is None: - if key is None: - key = 'invoice' - pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link)) - else: - ticket_id, attachment_id = rt_ids - if key is None: - key = 'rt-id' if attachment_id is None else 'invoice' - pattern = rtutil.RT.metadata_regexp( - ticket_id, - attachment_id, - first_link_only=key == 'rt-id' and attachment_id is None, - ) - return cls(key, pattern) - - def filter_search(postings: Iterable[data.Posting], - search_terms: Iterable[SearchTerm], + search_terms: Iterable[cliutil.SearchTerm], ) -> Iterable[data.Posting]: accounts = tuple(AccrualAccount.account_names()) postings = (post for post in postings if post.account.is_under(*accounts)) - for meta_key, pattern in search_terms: - postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern)) + for query in search_terms: + postings = query.filter_postings(postings) return postings def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace: @@ -696,7 +663,9 @@ filename for other reports. """) cliutil.add_loglevel_argument(parser) parser.add_argument( - 'search', + 'search_terms', + metavar='FILTER', + type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'), nargs=argparse.ZERO_OR_MORE, help="""Report on accruals that match this criteria. The format is NAME=TERM. TERM is a link or word that must exist in a posting's NAME @@ -705,7 +674,6 @@ metadata to match. A single ticket number is a shortcut for `TIK/ATT` format, is a shortcut for `invoice=LINK`. """) args = parser.parse_args(arglist) - args.search_terms = [SearchTerm.parse(s) for s in args.search] if args.report_type is None and not args.search_terms: args.report_type = ReportType.AGING return args diff --git a/tests/test_cliutil_searchterm.py b/tests/test_cliutil_searchterm.py new file mode 100644 index 0000000..c0c81b0 --- /dev/null +++ b/tests/test_cliutil_searchterm.py @@ -0,0 +1,164 @@ +"""test_cliutil_searchterm - Unit tests for cliutil.SearchTerm""" +# Copyright © 2020 Brett Smith +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import re + +import pytest + +from . import testutil + +from conservancy_beancount import cliutil +from conservancy_beancount import data + +TICKET_LINKS = [ + '123', + 'rt:123', + 'rt://ticket/123', +] + +ATTACHMENT_LINKS = [ + '123/456', + 'rt:123/456', + 'rt://ticket/123/attachments/456', +] + +REPOSITORY_LINKS = [ + '789.pdf', + 'Documents/789.pdf', +] + +RT_LINKS = TICKET_LINKS + ATTACHMENT_LINKS +ALL_LINKS = RT_LINKS + REPOSITORY_LINKS + +INVALID_METADATA_KEYS = [ + # Must start with a-z + ';key', + ' key', + 'ákey', + 'Key', + # Must only contain alphanumerics, -, and _ + 'key;', + 'key ', + 'a.key', +] + +@pytest.fixture(scope='module') +def defaults_parser(): + return cliutil.SearchTerm.arg_parser('document', 'rt-id') + +@pytest.fixture(scope='module') +def one_default_parser(): + return cliutil.SearchTerm.arg_parser('default') + +@pytest.fixture(scope='module') +def no_default_parser(): + return cliutil.SearchTerm.arg_parser() + +def check_link_regexp(regexp, match_s, first_link_only=False): + assert regexp + assert re.search(regexp, match_s) + assert re.search(regexp, match_s + ' postlink') + assert re.search(regexp, match_s + '0') is None + assert re.search(regexp, '1' + match_s) is None + end_match = re.search(regexp, 'prelink ' + match_s) + if first_link_only: + assert end_match is None + else: + assert end_match + +@pytest.mark.parametrize('arg', TICKET_LINKS) +def test_search_term_parse_ticket(defaults_parser, arg): + key, regexp = defaults_parser(arg) + assert key == 'rt-id' + check_link_regexp(regexp, 'rt:123', first_link_only=True) + check_link_regexp(regexp, 'rt://ticket/123', first_link_only=True) + +@pytest.mark.parametrize('arg', ATTACHMENT_LINKS) +def test_search_term_parse_attachment(defaults_parser, arg): + key, regexp = defaults_parser(arg) + assert key == 'document' + check_link_regexp(regexp, 'rt:123/456') + check_link_regexp(regexp, 'rt://ticket/123/attachments/456') + +@pytest.mark.parametrize('key,query', testutil.combine_values( + ['approval', 'contract', 'invoice'], + RT_LINKS, +)) +def test_search_term_parse_metadata_rt_shortcut(defaults_parser, key, query): + actual_key, regexp = defaults_parser(f'{key}={query}') + assert actual_key == key + if query.endswith('/456'): + check_link_regexp(regexp, 'rt:123/456') + check_link_regexp(regexp, 'rt://ticket/123/attachments/456') + else: + check_link_regexp(regexp, 'rt:123') + check_link_regexp(regexp, 'rt://ticket/123') + +@pytest.mark.parametrize('search_prefix', [ + '', + 'approval=', + 'contract=', + 'invoice=', +]) +def test_search_term_parse_repo_link(defaults_parser, search_prefix): + document = '1234.pdf' + actual_key, regexp = defaults_parser(f'{search_prefix}{document}') + if search_prefix: + expect_key = search_prefix.rstrip('=') + else: + expect_key = 'document' + assert actual_key == expect_key + check_link_regexp(regexp, document) + +@pytest.mark.parametrize('search,unmatched', [ + ('1234.pdf', '1234_pdf'), +]) +def test_search_term_parse_regexp_escaping(defaults_parser, search, unmatched): + _, regexp = defaults_parser(search) + assert re.search(regexp, unmatched) is None + +@pytest.mark.parametrize('key', INVALID_METADATA_KEYS) +def test_non_metadata_key(one_default_parser, key): + document = f'{key}=890' + key, pattern = one_default_parser(document) + assert key == 'default' + check_link_regexp(pattern, document) + +@pytest.mark.parametrize('arg', ALL_LINKS) +def test_default_parser(one_default_parser, arg): + key, _ = one_default_parser(arg) + assert key == 'default' + +@pytest.mark.parametrize('arg', ALL_LINKS + [ + f'{nonkey}={link}' for nonkey, link in testutil.combine_values( + INVALID_METADATA_KEYS, ALL_LINKS, + ) +]) +def test_no_key(no_default_parser, arg): + with pytest.raises(ValueError): + key, pattern = no_default_parser(arg) + +@pytest.mark.parametrize('key', ['zero', 'one', 'two']) +def test_filter_postings(key): + txn = testutil.Transaction(postings=[ + ('Income:Other', 3, {'one': '1', 'two': '2'}), + ('Income:Other', 2, {'two': '2'}), + ('Income:Other', 1, {'one': '1'}), + ]) + search = cliutil.SearchTerm(key, '.') + actual = list(search.filter_postings(data.Posting.from_txn(txn))) + assert len(actual) == 0 if key == 'zero' else 2 + assert all(post.meta.get(key) for post in actual) diff --git a/tests/test_reports_accrual.py b/tests/test_reports_accrual.py index 9b14372..230545c 100644 --- a/tests/test_reports_accrual.py +++ b/tests/test_reports_accrual.py @@ -37,6 +37,7 @@ from typing import NamedTuple, Optional, Sequence from beancount.core import data as bc_data from beancount import loader as bc_loader +from conservancy_beancount import cliutil from conservancy_beancount import data from conservancy_beancount import rtutil from conservancy_beancount.reports import accrual @@ -144,18 +145,6 @@ class RTClient(testutil.RTClient): def accrual_postings(): return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS)) -def check_link_regexp(regexp, match_s, first_link_only=False): - assert regexp - assert re.search(regexp, match_s) - assert re.search(regexp, match_s + ' postlink') - assert re.search(regexp, match_s + '0') is None - assert re.search(regexp, '1' + match_s) is None - end_match = re.search(regexp, 'prelink ' + match_s) - if first_link_only: - assert end_match is None - else: - assert end_match - def accruals_by_meta(postings, value, key='invoice', wrap_type=iter): return wrap_type( post for post in postings @@ -223,63 +212,6 @@ def check_aging_ods(ods_file, check_aging_sheet(sheets[0], recv_rows, date, -60) check_aging_sheet(sheets[1], pay_rows, date, -30) -@pytest.mark.parametrize('link_fmt', [ - '{}', - 'rt:{}', - 'rt://ticket/{}', -]) -def test_search_term_parse_rt_shortcuts(link_fmt): - key, regexp = accrual.SearchTerm.parse(link_fmt.format(220)) - assert key == 'rt-id' - check_link_regexp(regexp, 'rt:220', first_link_only=True) - check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True) - -@pytest.mark.parametrize('link_fmt', [ - '{}/{}', - 'rt:{}/{}', - 'rt://ticket/{}/attachments/{}', -]) -def test_search_term_parse_invoice_shortcuts(link_fmt): - key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660)) - assert key == 'invoice' - check_link_regexp(regexp, 'rt:330/660') - check_link_regexp(regexp, 'rt://ticket/330/attachments/660') - -@pytest.mark.parametrize('key', [ - 'approval', - 'contract', - 'invoice', -]) -def test_search_term_parse_metadata_rt_shortcut(key): - actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420') - assert actual_key == key - check_link_regexp(regexp, 'rt:440/420') - check_link_regexp(regexp, 'rt://ticket/440/attachments/420') - -@pytest.mark.parametrize('key', [ - None, - 'approval', - 'contract', - 'invoice', -]) -def test_search_term_parse_repo_link(key): - document = '1234.pdf' - if key is None: - key = 'invoice' - search = document - else: - search = f'{key}={document}' - actual_key, regexp = accrual.SearchTerm.parse(search) - assert actual_key == key - check_link_regexp(regexp, document) - -@pytest.mark.parametrize('search,unmatched', [ - ('1234.pdf', '1234_pdf'), -]) -def test_search_term_parse_regexp_escaping(search, unmatched): - _, regexp = accrual.SearchTerm.parse(search) - assert re.search(regexp, unmatched) is None - @pytest.mark.parametrize('search_terms,expect_count,check_func', [ ([], ACCRUALS_COUNT, lambda post: post.account.is_under( 'Assets:Receivable:', 'Liabilities:Payable:', @@ -292,6 +224,7 @@ def test_search_term_parse_regexp_escaping(search, unmatched): ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False), ]) def test_filter_search(accrual_postings, search_terms, expect_count, check_func): + search_terms = [cliutil.SearchTerm._make(query) for query in search_terms] actual = list(accrual.filter_search(accrual_postings, search_terms)) if expect_count < ACCRUALS_COUNT: assert ACCRUALS_COUNT > len(actual) >= expect_count