ledger: Only display accounts requested with --account.

Now that we're accepting classifications, it's possible to specify account
options that select some but not all accounts at the same level of the
hierarchy. This commit tracks requested account names separately from sheet
names to do that correctly.
This commit is contained in:
Brett Smith 2020-07-20 13:13:22 -04:00
parent aaa26e9e61
commit 52e7f3a221
3 changed files with 120 additions and 42 deletions

View file

@ -115,19 +115,54 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
def __init__(self, def __init__(self,
start_date: datetime.date, start_date: datetime.date,
stop_date: datetime.date, stop_date: datetime.date,
sheet_names: Optional[Sequence[str]]=None, accounts: Optional[Sequence[str]]=None,
rt_wrapper: Optional[rtutil.RT]=None, rt_wrapper: Optional[rtutil.RT]=None,
sheet_size: Optional[int]=None, sheet_size: Optional[int]=None,
) -> None: ) -> None:
if sheet_names is None:
sheet_names = list(self.ACCOUNT_COLUMNS)
if sheet_size is None: if sheet_size is None:
sheet_size = self.SHEET_SIZE sheet_size = self.SHEET_SIZE
super().__init__(rt_wrapper) super().__init__(rt_wrapper)
self.date_range = ranges.DateRange(start_date, stop_date) self.date_range = ranges.DateRange(start_date, stop_date)
self.required_sheet_names = sheet_names
self.sheet_size = sheet_size self.sheet_size = sheet_size
if accounts is None:
self.accounts = set(data.Account.iter_accounts())
self.required_sheet_names = list(self.ACCOUNT_COLUMNS)
else:
self.accounts = set()
self.required_sheet_names = []
for acct_spec in accounts:
subaccounts = frozenset(data.Account.iter_accounts_by_hierarchy(acct_spec))
if subaccounts:
self.accounts.update(subaccounts)
self._require_sheet(acct_spec)
else:
account_roots_map = collections.defaultdict(list)
for account in data.Account.iter_accounts_by_classification(acct_spec):
self.accounts.add(account)
account_roots_map[account.root_part()].append(account)
if not account_roots_map:
raise ValueError("unknown account name or classification", acct_spec)
for root_part, accounts in account_roots_map.items():
start_count = min(account.count_parts() for account in accounts)
for count in range(start_count, 1, -1):
target = accounts[0].root_part(count)
if all(acct.root_part(count) == target for acct in accounts):
self._require_sheet(target)
break
else:
self._require_sheet(root_part)
def _require_sheet(self, new_sheet: str) -> None:
for index, sheet in enumerate(self.required_sheet_names):
if new_sheet == sheet:
break
elif new_sheet.startswith(sheet):
self.required_sheet_names.insert(index, new_sheet)
break
else:
self.required_sheet_names.append(new_sheet)
def init_styles(self) -> None: def init_styles(self) -> None:
super().init_styles() super().init_styles()
self.amount_column = self.column_style(1.2) self.amount_column = self.column_style(1.2)
@ -349,16 +384,17 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
self.account_groups = dict(related_cls.group_by_account( self.account_groups = dict(related_cls.group_by_account(
post for post in rows if post.meta.date < self.date_range.stop post for post in rows if post.meta.date < self.date_range.stop
)) ))
for empty_acct in self.accounts.difference(self.account_groups):
self.account_groups[empty_acct] = related_cls()
self.write_balance_sheet() self.write_balance_sheet()
tally_by_account_iter = ( tally_by_account_iter = (
(account, len(related)) (account, len(self.account_groups[account]))
for account, related in self.account_groups.items() for account in self.accounts
) )
tally_by_account = { tally_by_account = {
# 3 for the rows generated by start_section+end_section # 3 for the rows generated by start_section+end_section
account: count + 3 account: count + 3
for account, count in tally_by_account_iter for account, count in tally_by_account_iter
if count or account.keeps_balance()
} }
sheet_names = self.plan_sheets( sheet_names = self.plan_sheets(
tally_by_account, self.required_sheet_names, self.sheet_size, tally_by_account, self.required_sheet_names, self.sheet_size,
@ -373,7 +409,7 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
postings = self.account_groups[account] postings = self.account_groups[account]
if postings: if postings:
super().write(postings) super().write(postings)
elif account.keeps_balance() and account.is_open_on_date(self.date_range.start): elif account.is_open_on_date(self.date_range.start):
self.start_section(account) self.start_section(account)
self.end_section(account) self.end_section(account)
for index in range(using_sheet_index + 1, len(sheet_names)): for index in range(using_sheet_index + 1, len(sheet_names)):
@ -449,9 +485,9 @@ metadata to match. A single ticket number is a shortcut for
'Income', 'Income',
'Expenses', 'Expenses',
'Assets:Receivable', 'Assets:Receivable',
'Liabilities:Payable',
'Assets:Prepaid', 'Assets:Prepaid',
'Liabilities:UnearnedIncome', 'Liabilities:UnearnedIncome',
'Liabilities:Payable',
] ]
else: else:
args.accounts = list(LedgerODS.ACCOUNT_COLUMNS) args.accounts = list(LedgerODS.ACCOUNT_COLUMNS)
@ -498,33 +534,26 @@ def main(arglist: Optional[Sequence[str]]=None,
returncode |= ReturnFlag.LOAD_ERRORS returncode |= ReturnFlag.LOAD_ERRORS
data.Account.load_from_books(entries, options) data.Account.load_from_books(entries, options)
accounts: Set[data.Account] = set() postings = data.Posting.from_entries(entries)
sheet_names: Dict[str, None] = collections.OrderedDict()
for acct_arg in args.accounts:
for account in data.Account.iter_accounts(acct_arg):
accounts.add(account)
if not account.is_under(*sheet_names):
new_sheet = account.is_under(*LedgerODS.ACCOUNT_COLUMNS)
assert new_sheet is not None
sheet_names[new_sheet] = None
postings = (post for post in data.Posting.from_entries(entries)
if post.account in accounts)
for search_term in args.search_terms: for search_term in args.search_terms:
postings = search_term.filter_postings(postings) postings = search_term.filter_postings(postings)
rt_wrapper = config.rt_wrapper() rt_wrapper = config.rt_wrapper()
if rt_wrapper is None: if rt_wrapper is None:
logger.warning("could not initialize RT client; spreadsheet links will be broken") logger.warning("could not initialize RT client; spreadsheet links will be broken")
report = LedgerODS( try:
args.start_date, report = LedgerODS(
args.stop_date, args.start_date,
list(sheet_names), args.stop_date,
rt_wrapper, args.accounts,
args.sheet_size, rt_wrapper,
) args.sheet_size,
)
except ValueError as error:
logger.error("%s: %r", *error.args)
return 2
report.write(postings) report.write(postings)
if not report.account_groups: if not any(report.account_groups.values()):
logger.warning("no matching postings found to report") logger.warning("no matching postings found to report")
returncode |= ReturnFlag.NOTHING_TO_REPORT returncode |= ReturnFlag.NOTHING_TO_REPORT

View file

@ -1,6 +1,10 @@
2018-01-01 open Equity:OpeningBalance 2018-01-01 open Equity:OpeningBalance
2018-01-01 open Assets:Checking 2018-01-01 open Assets:Checking
classification: "Cash" classification: "Cash"
2018-01-01 open Assets:PayPal
classification: "Cash"
2018-01-01 open Assets:Prepaid
classification: "Prepaid expenses"
2018-01-01 open Assets:Receivable:Accounts 2018-01-01 open Assets:Receivable:Accounts
classification: "Accounts receivable" classification: "Accounts receivable"
2018-01-01 open Expenses:Other 2018-01-01 open Expenses:Other
@ -11,6 +15,8 @@
classification: "Accounts payable" classification: "Accounts payable"
2018-01-01 open Liabilities:Payable:Accounts 2018-01-01 open Liabilities:Payable:Accounts
classification: "Accounts payable" classification: "Accounts payable"
2018-01-01 open Liabilities:UnearnedIncome
classification: "Unearned income"
2018-02-28 * "Opening balance" 2018-02-28 * "Opening balance"
Equity:OpeningBalance -10,000 USD Equity:OpeningBalance -10,000 USD

View file

@ -46,10 +46,15 @@ DEFAULT_REPORT_SHEETS = [
'Equity', 'Equity',
'Assets:Receivable', 'Assets:Receivable',
'Liabilities:Payable', 'Liabilities:Payable',
'Assets:PayPal',
'Assets', 'Assets',
'Liabilities', 'Liabilities',
] ]
PROJECT_REPORT_SHEETS = DEFAULT_REPORT_SHEETS[:6] PROJECT_REPORT_SHEETS = DEFAULT_REPORT_SHEETS[:5] + [
'Assets:Prepaid',
'Liabilities:UnearnedIncome',
'Liabilities:Payable',
]
del PROJECT_REPORT_SHEETS[3] del PROJECT_REPORT_SHEETS[3]
OVERSIZE_RE = re.compile( OVERSIZE_RE = re.compile(
r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$' r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
@ -107,7 +112,7 @@ class ExpectedPostings(core.RelatedPostings):
else: else:
return return
closing_bal = norm_func(expect_posts.balance_at_cost()) closing_bal = norm_func(expect_posts.balance_at_cost())
if account.is_under('Assets', 'Equity', 'Liabilities'): if account.is_under('Assets', 'Liabilities'):
opening_row = testutil.ODSCell.from_row(next(rows)) opening_row = testutil.ODSCell.from_row(next(rows))
assert opening_row[0].value == start_date assert opening_row[0].value == start_date
assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0') assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
@ -125,7 +130,8 @@ class ExpectedPostings(core.RelatedPostings):
assert next(cells).value == norm_func(expected.at_cost().number) assert next(cells).value == norm_func(expected.at_cost().number)
closing_row = testutil.ODSCell.from_row(next(rows)) closing_row = testutil.ODSCell.from_row(next(rows))
assert closing_row[0].value == end_date assert closing_row[0].value == end_date
assert closing_row[4].text == closing_bal.format(None, empty='$0.00', sep='\0') empty = '$0.00' if expect_posts else '0'
assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
def get_sheet_names(ods): def get_sheet_names(ods):
@ -238,22 +244,26 @@ def test_plan_sheets_full_split_required(caplog):
(STOP_DATE, STOP_DATE.replace(month=12)), (STOP_DATE, STOP_DATE.replace(month=12)),
]) ])
def test_date_range_report(ledger_entries, start_date, stop_date): def test_date_range_report(ledger_entries, start_date, stop_date):
postings = list(data.Posting.from_entries(ledger_entries)) postings = list(data.Posting.from_entries(iter(ledger_entries)))
report = ledger.LedgerODS(start_date, stop_date) with clean_account_meta():
report.write(iter(postings)) data.Account.load_openings_and_closings(iter(ledger_entries))
report = ledger.LedgerODS(start_date, stop_date)
report.write(iter(postings))
for _, expected in ExpectedPostings.group_by_account(postings): for _, expected in ExpectedPostings.group_by_account(postings):
expected.check_report(report.document, start_date, stop_date) expected.check_report(report.document, start_date, stop_date)
@pytest.mark.parametrize('sheet_names', [ @pytest.mark.parametrize('accounts', [
('Income', 'Expenses'), ('Income', 'Expenses'),
('Assets:Receivable', 'Liabilities:Payable'), ('Assets:Receivable', 'Liabilities:Payable'),
]) ])
def test_account_names_report(ledger_entries, sheet_names): def test_account_names_report(ledger_entries, accounts):
postings = list(data.Posting.from_entries(ledger_entries)) postings = list(data.Posting.from_entries(iter(ledger_entries)))
report = ledger.LedgerODS(START_DATE, STOP_DATE, sheet_names=sheet_names) with clean_account_meta():
report.write(iter(postings)) data.Account.load_openings_and_closings(iter(ledger_entries))
report = ledger.LedgerODS(START_DATE, STOP_DATE, accounts=accounts)
report.write(iter(postings))
for key, expected in ExpectedPostings.group_by_account(postings): for key, expected in ExpectedPostings.group_by_account(postings):
should_find = key.startswith(sheet_names) should_find = key.startswith(accounts)
try: try:
expected.check_report(report.document, START_DATE, STOP_DATE) expected.check_report(report.document, START_DATE, STOP_DATE)
except NotFound: except NotFound:
@ -280,6 +290,7 @@ def test_main(ledger_entries):
'-b', START_DATE.isoformat(), '-b', START_DATE.isoformat(),
'-e', STOP_DATE.isoformat(), '-e', STOP_DATE.isoformat(),
]) ])
output.seek(0)
assert not errors.getvalue() assert not errors.getvalue()
assert retcode == 0 assert retcode == 0
ods = odf.opendocument.load(output) ods = odf.opendocument.load(output)
@ -304,7 +315,10 @@ def test_main_account_limit(ledger_entries, acct_arg):
assert get_sheet_names(ods) == ['Balance', 'Liabilities'] assert get_sheet_names(ods) == ['Balance', 'Liabilities']
postings = data.Posting.from_entries(ledger_entries) postings = data.Posting.from_entries(ledger_entries)
for account, expected in ExpectedPostings.group_by_account(postings): for account, expected in ExpectedPostings.group_by_account(postings):
should_find = account.startswith('Liabilities') if account == 'Liabilities:UnearnedIncome':
should_find = acct_arg == 'Liabilities'
else:
should_find = account.startswith('Liabilities')
try: try:
expected.check_report(ods, START_DATE, STOP_DATE) expected.check_report(ods, START_DATE, STOP_DATE)
except NotFound: except NotFound:
@ -312,6 +326,26 @@ def test_main_account_limit(ledger_entries, acct_arg):
else: else:
assert should_find assert should_find
def test_main_account_classification_splits_hierarchy(ledger_entries):
retcode, output, errors = run_main([
'-a', 'Cash',
'-b', START_DATE.isoformat(),
'-e', STOP_DATE.isoformat(),
])
assert not errors.getvalue()
assert retcode == 0
ods = odf.opendocument.load(output)
assert get_sheet_names(ods) == ['Balance', 'Assets']
postings = data.Posting.from_entries(ledger_entries)
for account, expected in ExpectedPostings.group_by_account(postings):
should_find = (account == 'Assets:Checking' or account == 'Assets:PayPal')
try:
expected.check_report(ods, START_DATE, STOP_DATE)
except NotFound:
assert not should_find, f"{account} not found in report"
else:
assert should_find, f"{account} in report but should be excluded"
@pytest.mark.parametrize('project,start_date,stop_date', [ @pytest.mark.parametrize('project,start_date,stop_date', [
('eighteen', START_DATE, MID_DATE.replace(day=30)), ('eighteen', START_DATE, MID_DATE.replace(day=30)),
('nineteen', MID_DATE, STOP_DATE), ('nineteen', MID_DATE, STOP_DATE),
@ -334,6 +368,15 @@ def test_main_project_report(ledger_entries, project, start_date, stop_date):
for _, expected in ExpectedPostings.group_by_account(related): for _, expected in ExpectedPostings.group_by_account(related):
expected.check_report(ods, start_date, stop_date) expected.check_report(ods, start_date, stop_date)
@pytest.mark.parametrize('arg', [
'Assets:NoneSuchBank',
'Funny money',
])
def test_main_invalid_account(caplog, arg):
retcode, output, errors = run_main(['-a', arg])
assert retcode == 2
assert any(log.message.endswith(f': {arg!r}') for log in caplog.records)
def test_main_no_postings(caplog): def test_main_no_postings(caplog):
retcode, output, errors = run_main(['NonexistentProject']) retcode, output, errors = run_main(['NonexistentProject'])
assert retcode == 24 assert retcode == 24