reports: Add RelatedPostings.group_by_first_meta_link() method.
This commit is contained in:
		
							parent
							
								
									f52ad4fbc1
								
							
						
					
					
						commit
						52fc0d1b5f
					
				
					 3 changed files with 86 additions and 7 deletions
				
			
		| 
						 | 
				
			
			@ -694,7 +694,7 @@ def main(arglist: Optional[Sequence[str]]=None,
 | 
			
		|||
 | 
			
		||||
    returncode = 0
 | 
			
		||||
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
 | 
			
		||||
    groups: PostGroups = dict(AccrualPostings.group_by_meta(postings, 'invoice'))
 | 
			
		||||
    groups: PostGroups = dict(AccrualPostings.group_by_first_meta_link(postings, 'invoice'))
 | 
			
		||||
    for error in load_errors:
 | 
			
		||||
        bc_printer.print_error(error, file=stderr)
 | 
			
		||||
        returncode |= ReturnFlag.LOAD_ERRORS
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -46,7 +46,6 @@ from typing import (
 | 
			
		|||
    Any,
 | 
			
		||||
    BinaryIO,
 | 
			
		||||
    Callable,
 | 
			
		||||
    DefaultDict,
 | 
			
		||||
    Dict,
 | 
			
		||||
    Generic,
 | 
			
		||||
    Iterable,
 | 
			
		||||
| 
						 | 
				
			
			@ -255,6 +254,17 @@ class RelatedPostings(Sequence[data.Posting]):
 | 
			
		|||
        else:
 | 
			
		||||
            self._postings = list(source)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def _group_by(cls: Type[RelatedType],
 | 
			
		||||
                  postings: Iterable[data.Posting],
 | 
			
		||||
                  key: Callable[[data.Posting], T],
 | 
			
		||||
    ) -> Iterator[Tuple[T, RelatedType]]:
 | 
			
		||||
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 | 
			
		||||
        for post in postings:
 | 
			
		||||
            mapping[key(post)].append(post)
 | 
			
		||||
        for value, posts in mapping.items():
 | 
			
		||||
            yield value, cls(posts, _can_own=True)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def group_by_meta(cls: Type[RelatedType],
 | 
			
		||||
                      postings: Iterable[data.Posting],
 | 
			
		||||
| 
						 | 
				
			
			@ -268,11 +278,27 @@ class RelatedPostings(Sequence[data.Posting]):
 | 
			
		|||
        The values are RelatedPostings instances that contain all the postings
 | 
			
		||||
        that had that same metadata value.
 | 
			
		||||
        """
 | 
			
		||||
        mapping: DefaultDict[Optional[MetaValue], List[data.Posting]] = collections.defaultdict(list)
 | 
			
		||||
        for post in postings:
 | 
			
		||||
            mapping[post.meta.get(key, default)].append(post)
 | 
			
		||||
        for value, posts in mapping.items():
 | 
			
		||||
            yield value, cls(posts, _can_own=True)
 | 
			
		||||
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 | 
			
		||||
            return post.meta.get(key, default)
 | 
			
		||||
        return cls._group_by(postings, key_func)
 | 
			
		||||
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def group_by_first_meta_link(
 | 
			
		||||
            cls: Type[RelatedType],
 | 
			
		||||
            postings: Iterable[data.Posting],
 | 
			
		||||
            key: MetaKey,
 | 
			
		||||
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 | 
			
		||||
        """Relate postings by the first link in metadata
 | 
			
		||||
 | 
			
		||||
        This method takes an iterable of postings and returns a mapping.
 | 
			
		||||
        The keys of the mapping are the values of
 | 
			
		||||
        post.meta.first_link(key, None).
 | 
			
		||||
        The values are RelatedPostings instances that contain all the postings
 | 
			
		||||
        that had that same first metadata link.
 | 
			
		||||
        """
 | 
			
		||||
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 | 
			
		||||
            return post.meta.first_link(key, None)
 | 
			
		||||
        return cls._group_by(postings, key_func)
 | 
			
		||||
 | 
			
		||||
    def __repr__(self) -> str:
 | 
			
		||||
        return f'<{type(self).__name__} {self._postings!r}>'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -63,6 +63,27 @@ def two_accruals_three_payments():
 | 
			
		|||
        (-550, 'EUR'),
 | 
			
		||||
    ))
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def link_swap_posts():
 | 
			
		||||
    retval = []
 | 
			
		||||
    meta = {
 | 
			
		||||
        'rt-id': 'rt:12 rt:16',
 | 
			
		||||
        '_post_type': data.Posting,
 | 
			
		||||
        '_meta_type': data.Metadata,
 | 
			
		||||
    }
 | 
			
		||||
    for n in range(1, 3):
 | 
			
		||||
        n = Decimal(n)
 | 
			
		||||
        retval.append(testutil.Posting(
 | 
			
		||||
            'Assets:Receivable:Accounts', n * 10, metanum=n, **meta,
 | 
			
		||||
        ))
 | 
			
		||||
    meta['rt-id'] = 'rt:16 rt:12'
 | 
			
		||||
    for n in range(1, 3):
 | 
			
		||||
        n = Decimal(n)
 | 
			
		||||
        retval.append(testutil.Posting(
 | 
			
		||||
            'Liabilities:Payable:Accounts', n * -10, metanum=n, **meta,
 | 
			
		||||
        ))
 | 
			
		||||
    return retval
 | 
			
		||||
 | 
			
		||||
def test_initialize_with_list(credit_card_cycle):
 | 
			
		||||
    related = core.RelatedPostings(credit_card_cycle[0].postings)
 | 
			
		||||
    assert len(related) == 2
 | 
			
		||||
| 
						 | 
				
			
			@ -313,3 +334,35 @@ def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 | 
			
		|||
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 | 
			
		||||
    assert set(actual) == {post.units.number for post in postings}
 | 
			
		||||
    assert len(actual) == len(postings)
 | 
			
		||||
 | 
			
		||||
def test_group_by_first_meta_link_zero():
 | 
			
		||||
    assert not list(core.RelatedPostings.group_by_first_meta_link([], 'foo'))
 | 
			
		||||
 | 
			
		||||
def test_group_by_first_meta_link_no_key(link_swap_posts):
 | 
			
		||||
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 | 
			
		||||
        iter(link_swap_posts), 'Nonexistent',
 | 
			
		||||
    ))
 | 
			
		||||
    assert len(actual) == 1
 | 
			
		||||
    assert list(actual[None]) == link_swap_posts
 | 
			
		||||
 | 
			
		||||
def test_group_by_first_meta_link_bad_type(link_swap_posts):
 | 
			
		||||
    assert all(post.meta.get('metanum') for post in link_swap_posts), \
 | 
			
		||||
        "did not find metadata required by test"
 | 
			
		||||
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 | 
			
		||||
        iter(link_swap_posts), 'metanum',
 | 
			
		||||
    ))
 | 
			
		||||
    assert len(actual) == 1
 | 
			
		||||
    assert list(actual[None]) == link_swap_posts
 | 
			
		||||
 | 
			
		||||
def test_group_by_first_meta_link(link_swap_posts):
 | 
			
		||||
    actual_all = dict(core.RelatedPostings.group_by_first_meta_link(
 | 
			
		||||
        iter(link_swap_posts), 'rt-id',
 | 
			
		||||
    ))
 | 
			
		||||
    assert len(actual_all) == 2
 | 
			
		||||
    for key, expect_account in [
 | 
			
		||||
            ('rt:12', 'Assets:Receivable:Accounts'),
 | 
			
		||||
            ('rt:16', 'Liabilities:Payable:Accounts'),
 | 
			
		||||
    ]:
 | 
			
		||||
        actual = actual_all.get(key, '')
 | 
			
		||||
        assert len(actual) == 2
 | 
			
		||||
        assert all(post.account == expect_account for post in actual)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue