From d920c5842a69d802958cb0583fddb86e900a46c1 Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Wed, 3 Jun 2020 18:54:49 -0400 Subject: [PATCH] reports: Start BaseSpreadsheet class. --- conservancy_beancount/reports/core.py | 73 +++++++++++++++++++++++++ tests/test_reports_spreadsheet.py | 79 +++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 tests/test_reports_spreadsheet.py diff --git a/conservancy_beancount/reports/core.py b/conservancy_beancount/reports/core.py index c26c43c..f0f3723 100644 --- a/conservancy_beancount/reports/core.py +++ b/conservancy_beancount/reports/core.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import abc import collections import operator @@ -31,6 +32,7 @@ from typing import ( Callable, DefaultDict, Dict, + Generic, Iterable, Iterator, List, @@ -51,6 +53,8 @@ from ..beancount_types import ( DecimalCompat = data.DecimalCompat BalanceType = TypeVar('BalanceType', bound='Balance') RelatedType = TypeVar('RelatedType', bound='RelatedPostings') +RT = TypeVar('RT', bound=Sequence) +ST = TypeVar('ST') class Balance(Mapping[str, data.Amount]): """A collection of amounts mapped by currency @@ -275,3 +279,72 @@ class RelatedPostings(Sequence[data.Posting]): default: Optional[MetaValue]=None, ) -> Set[Optional[MetaValue]]: return {post.meta.get(key, default) for post in self} + + +class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta): + """Abstract base class to help write spreadsheets + + This class provides the very core logic to write an arbitrary set of data + rows to arbitrary output. It calls hooks when it starts writing the + spreadsheet, starts a new "section" of rows, ends a section, and ends the + spreadsheet. + + RT is the type of the input data rows. ST is the type of the section + identifier that you create from each row. If you don't want to use the + section logic at all, set ST to None and define section_key to return None. + """ + + @abc.abstractmethod + def section_key(self, row: RT) -> ST: + """Return the section a row belongs to + + Given a data row, this method should return some identifier for the + "section" the row belongs to. The write method uses this to + determine when to call start_section and end_section. + + If your spreadsheet doesn't need sections, define this to return None. + """ + ... + + @abc.abstractmethod + def write_row(self, row: RT) -> None: + """Write a data row to the output spreadsheet + + This method is called once for each data row in the input. + """ + ... + + # The next four methods are all called by the write method when the name + # says. You may override them to output headers or sums, record + # state, etc. The default implementations are all noops. + + def start_spreadsheet(self) -> None: + pass + + def start_section(self, key: ST) -> None: + pass + + def end_section(self, key: ST) -> None: + pass + + def end_spreadsheet(self) -> None: + pass + + def write(self, rows: Iterable[RT]) -> None: + prev_section: Optional[ST] = None + self.start_spreadsheet() + for row in rows: + section = self.section_key(row) + if section != prev_section: + if prev_section is not None: + self.end_section(prev_section) + self.start_section(section) + prev_section = section + self.write_row(row) + try: + should_end = section is not None + except NameError: + should_end = False + if should_end: + self.end_section(section) + self.end_spreadsheet() diff --git a/tests/test_reports_spreadsheet.py b/tests/test_reports_spreadsheet.py new file mode 100644 index 0000000..a6e4be0 --- /dev/null +++ b/tests/test_reports_spreadsheet.py @@ -0,0 +1,79 @@ +"""test_reports_spreadsheet - Unit tests for spreadsheet classes""" +# Copyright © 2020 Brett Smith +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import pytest + +from . import testutil + +from conservancy_beancount.reports import core + +class BaseTester(core.BaseSpreadsheet[tuple, str]): + def __init__(self): + self.start_call = None + self.end_call = None + self.started_sections = [] + self.ended_sections = [] + self.written_rows = [] + + def section_key(self, row): + return row[0] + + def start_spreadsheet(self): + self.start_call = self.started_sections.copy() + + def start_section(self, key): + self.started_sections.append(key) + + def end_section(self, key): + self.ended_sections.append(key) + + def end_spreadsheet(self): + self.end_call = self.ended_sections.copy() + + def write_row(self, key): + self.written_rows.append(key) + + +@pytest.fixture +def spreadsheet(): + return BaseTester() + +def test_spreadsheet(spreadsheet): + rows = [(ch, ii) for ii, ch in enumerate('aabbcc', 1)] + spreadsheet.write(iter(rows)) + assert spreadsheet.written_rows == rows + assert spreadsheet.ended_sections == spreadsheet.started_sections + assert spreadsheet.started_sections == list('abc') + assert spreadsheet.start_call == [] + assert spreadsheet.end_call == spreadsheet.ended_sections + +def test_empty_spreadsheet(spreadsheet): + empty_list = [] + spreadsheet.write(iter(empty_list)) + assert spreadsheet.start_call == empty_list + assert spreadsheet.end_call == empty_list + assert spreadsheet.started_sections == empty_list + assert spreadsheet.ended_sections == empty_list + assert spreadsheet.written_rows == empty_list + +def test_one_section_spreadsheet(spreadsheet): + rows = [('A', n) for n in range(1, 4)] + spreadsheet.write(iter(rows)) + assert spreadsheet.written_rows == rows + assert spreadsheet.ended_sections == spreadsheet.started_sections + assert spreadsheet.started_sections == list('A') + assert spreadsheet.start_call == [] + assert spreadsheet.end_call == spreadsheet.ended_sections