
This makes it easier to build formulas based on classification. Hiding it keeps the presentation neat for human readers.
868 lines
33 KiB
Python
868 lines
33 KiB
Python
"""ledger.py - General ledger report from Beancount
|
|
|
|
This tool produces a spreadsheet that shows postings in Beancount, organized
|
|
by account.
|
|
|
|
Specify the date range you want to report with the ``--begin`` and ``--end``
|
|
options.
|
|
|
|
Select the accounts you want to report with the ``--account`` option. You can
|
|
specify this option multiple times. The report will include at least one sheet
|
|
for each account you specify. Subaccounts will be reported on that sheet as
|
|
well.
|
|
|
|
Select the postings you want to report by passing metadata search terms in
|
|
``name=value`` format.
|
|
|
|
Run ``ledger-report --help`` for abbreviations and other options.
|
|
|
|
Examples
|
|
--------
|
|
|
|
Report all activity related to a given project::
|
|
|
|
ledger-report project=NAME
|
|
|
|
Get all Assets postings for a given month to help with reconciliation::
|
|
|
|
ledger-report -a Assets -b 2018-05-01 -e 2018-06-01
|
|
"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import argparse
|
|
import collections
|
|
import datetime
|
|
import enum
|
|
import itertools
|
|
import operator
|
|
import logging
|
|
import sys
|
|
|
|
from typing import (
|
|
Any,
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
Iterator,
|
|
List,
|
|
Mapping,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
TextIO,
|
|
Tuple,
|
|
Union,
|
|
)
|
|
from ..beancount_types import (
|
|
Transaction,
|
|
)
|
|
|
|
from pathlib import Path
|
|
|
|
import odf.table # type:ignore[import]
|
|
|
|
from beancount.core import data as bc_data
|
|
from beancount.parser import printer as bc_printer
|
|
|
|
from . import core
|
|
from . import rewrite
|
|
from .. import books
|
|
from .. import cliutil
|
|
from .. import config as configmod
|
|
from .. import data
|
|
from .. import ranges
|
|
from .. import rtutil
|
|
|
|
PostTally = List[Tuple[int, data.Account]]
|
|
|
|
PROGNAME = 'ledger-report'
|
|
logger = logging.getLogger('conservancy_beancount.reports.ledger')
|
|
|
|
class LedgerODS(core.BaseODS[data.Posting, None]):
|
|
CORE_COLUMNS: Sequence[str] = [
|
|
'Date',
|
|
data.Metadata.human_name('entity'),
|
|
'Description',
|
|
'Original Amount',
|
|
'Booked Amount',
|
|
]
|
|
ACCOUNT_COLUMNS: Dict[str, Sequence[str]] = collections.OrderedDict([
|
|
('Income', ['project', 'rt-id', 'receipt', 'income-type', 'memo']),
|
|
('Expenses', ['project', 'rt-id', 'receipt', 'approval', 'expense-type']),
|
|
('Equity', ['project', 'rt-id']),
|
|
('Assets:Receivable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
|
|
('Liabilities:Payable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
|
|
('Assets:PayPal', ['rt-id', 'paypal-id', 'receipt', 'approval']),
|
|
('Assets', ['rt-id', 'receipt', 'approval', 'bank-statement']),
|
|
('Liabilities', ['rt-id', 'receipt', 'approval', 'bank-statement']),
|
|
])
|
|
CLASSIFICATION_COLUMN = "Account Classification"
|
|
# Excel 2003 was limited to 65,536 rows per worksheet.
|
|
# While we can probably count on all our users supporting more modern
|
|
# formats (Excel 2007 supports over 1 million rows per worksheet),
|
|
# keeping the default limit conservative seems good to avoid running into
|
|
# other limits (like the number of hyperlinks per worksheet), plus just
|
|
# better for human organization and readability.
|
|
SHEET_SIZE = 65500
|
|
|
|
def __init__(self,
|
|
start_date: datetime.date,
|
|
stop_date: datetime.date,
|
|
accounts: Optional[Sequence[str]]=None,
|
|
rt_wrapper: Optional[rtutil.RT]=None,
|
|
sheet_size: Optional[int]=None,
|
|
totals_with_entries: Optional[Sequence[str]]=None,
|
|
totals_without_entries: Optional[Sequence[str]]=None,
|
|
) -> None:
|
|
if sheet_size is None:
|
|
sheet_size = self.SHEET_SIZE
|
|
if totals_with_entries is None:
|
|
totals_with_entries = [s for s in self.ACCOUNT_COLUMNS if ':' not in s]
|
|
if totals_without_entries is None:
|
|
totals_without_entries = totals_with_entries
|
|
super().__init__(rt_wrapper)
|
|
self.date_range = ranges.DateRange(start_date, stop_date)
|
|
self.sheet_size = sheet_size
|
|
self.totals_with_entries = totals_with_entries
|
|
self.totals_without_entries = totals_without_entries
|
|
self.report_name = "Ledger"
|
|
|
|
if accounts is None:
|
|
self.accounts = set(data.Account.iter_accounts())
|
|
self.required_sheet_names = list(self.ACCOUNT_COLUMNS)
|
|
else:
|
|
self.accounts = set()
|
|
self.required_sheet_names = []
|
|
for acct_spec in accounts:
|
|
subaccounts = frozenset(data.Account.iter_accounts_by_hierarchy(acct_spec))
|
|
if subaccounts:
|
|
self.accounts.update(subaccounts)
|
|
self._require_sheet(acct_spec)
|
|
else:
|
|
account_roots_map = collections.defaultdict(list)
|
|
for account in data.Account.iter_accounts_by_classification(acct_spec):
|
|
self.accounts.add(account)
|
|
account_roots_map[account.root_part()].append(account)
|
|
if not account_roots_map:
|
|
raise ValueError("unknown account name or classification", acct_spec)
|
|
for root_part, accounts in account_roots_map.items():
|
|
start_count = min(account.count_parts() for account in accounts)
|
|
for count in range(start_count, 1, -1):
|
|
target = accounts[0].root_part(count)
|
|
if all(acct.root_part(count) == target for acct in accounts):
|
|
self._require_sheet(target)
|
|
break
|
|
else:
|
|
self._require_sheet(root_part)
|
|
|
|
def _require_sheet(self, new_sheet: str) -> None:
|
|
for index, sheet in enumerate(self.required_sheet_names):
|
|
if new_sheet == sheet:
|
|
break
|
|
elif new_sheet.startswith(sheet):
|
|
self.required_sheet_names.insert(index, new_sheet)
|
|
break
|
|
else:
|
|
self.required_sheet_names.append(new_sheet)
|
|
|
|
def init_styles(self) -> None:
|
|
super().init_styles()
|
|
self.amount_column = self.column_style(1.2)
|
|
self.default_column = self.column_style(1.5)
|
|
self.column_styles: Mapping[str, Union[str, odf.style.Style]] = {
|
|
'Account': self.column_style(2), # for TransactionODS
|
|
'Date': '',
|
|
'Description': self.column_style(2),
|
|
'Original Amount': self.amount_column,
|
|
'Booked Amount': self.amount_column,
|
|
data.Metadata.human_name('project'): self.amount_column,
|
|
data.Metadata.human_name('rt-id'): self.amount_column,
|
|
self.CLASSIFICATION_COLUMN: self.column_style(3),
|
|
}
|
|
|
|
@classmethod
|
|
def _group_tally(
|
|
cls,
|
|
tally_by_account: PostTally,
|
|
key: Callable[[data.Account], Optional[str]],
|
|
) -> Dict[str, PostTally]:
|
|
retval: Dict[str, PostTally] = collections.defaultdict(list)
|
|
for count, account in tally_by_account:
|
|
item_key = key(account)
|
|
if item_key is not None:
|
|
retval[item_key].append((count, account))
|
|
return retval
|
|
|
|
@classmethod
|
|
def _split_sheet(
|
|
cls,
|
|
tally_by_account: PostTally,
|
|
sheet_size: int,
|
|
sheet_name: str,
|
|
) -> Iterator[str]:
|
|
total = 0
|
|
for index, (count, account) in enumerate(tally_by_account):
|
|
total += count
|
|
if total > sheet_size:
|
|
break
|
|
else:
|
|
# All the accounts fit in this sheet.
|
|
yield sheet_name
|
|
return
|
|
if index == 0 and len(tally_by_account) == 1:
|
|
# With one account, we can't split any further, so warn and stop.
|
|
logger.warning(
|
|
"%s has %s rows, over size %s",
|
|
account, f'{count:,g}', f'{sheet_size:,g}',
|
|
)
|
|
yield sheet_name
|
|
return
|
|
group_func = operator.methodcaller('root_part', sheet_name.count(':') + 2)
|
|
maybe_split = cls._group_tally(tally_by_account[:index], group_func)
|
|
must_split = cls._group_tally(tally_by_account[index:], group_func)
|
|
for subkey, must_split_tally in sorted(must_split.items()):
|
|
split_names = cls._split_sheet(
|
|
maybe_split.get(subkey, []) + must_split_tally, sheet_size, subkey,
|
|
)
|
|
# We must be willing to split out at least as many sheets as there
|
|
# are accounts that didn't fit. Do that first.
|
|
yield from itertools.islice(split_names, len(must_split_tally))
|
|
# After that, we can be in one of two cases:
|
|
# 1. There is no next sheet. All the accounts, including the
|
|
# maybe_splits and must_splits, fit on planned subsheets.
|
|
# Update state to note we don't need a sheet for them anymore.
|
|
# 2. The next sheet is named `subkey`, and is planned to include
|
|
# all of our maybe_split accounts. However, we don't need to
|
|
# yield that sheet name, because those accounts already fit in
|
|
# the sheet we're planning, and it would be a needless split.
|
|
next_sheet_name = next(split_names, None)
|
|
if next_sheet_name is None:
|
|
maybe_split.pop(subkey, None)
|
|
else:
|
|
assert next_sheet_name == subkey
|
|
assert not any(split_names)
|
|
if maybe_split:
|
|
yield sheet_name
|
|
|
|
@classmethod
|
|
def plan_sheets(
|
|
cls,
|
|
tally_by_account: Mapping[data.Account, int],
|
|
base_sheets: Sequence[str],
|
|
sheet_size: int,
|
|
) -> Sequence[str]:
|
|
sorted_tally: PostTally = [
|
|
(count, account)
|
|
for account, count in tally_by_account.items()
|
|
]
|
|
sorted_tally.sort()
|
|
split_tally = cls._group_tally(
|
|
sorted_tally,
|
|
operator.methodcaller('is_under', *base_sheets),
|
|
)
|
|
return [
|
|
sheet_name
|
|
for key in base_sheets
|
|
for sheet_name in cls._split_sheet(split_tally[key], sheet_size, key)
|
|
]
|
|
|
|
# The write() override does its own sectioning by account, so this method
|
|
# goes unused.
|
|
def section_key(self, row: data.Posting) -> None:
|
|
return None
|
|
|
|
def metadata_columns_for(self, sheet_name: str) -> Sequence[str]:
|
|
columns_key = data.Account(sheet_name).is_under(*self.ACCOUNT_COLUMNS)
|
|
# columns_key must not be None because ACCOUNT_COLUMNS has an entry
|
|
# for all five root accounts.
|
|
assert columns_key is not None
|
|
return self.ACCOUNT_COLUMNS[columns_key]
|
|
|
|
def start_sheet(self, sheet_name: str) -> None:
|
|
self.use_sheet(sheet_name.replace(':', ' '))
|
|
self.metadata_columns = self.metadata_columns_for(sheet_name)
|
|
self.sheet_columns: Sequence[str] = [
|
|
*self.CORE_COLUMNS,
|
|
*(data.Metadata.human_name(meta_key) for meta_key in self.metadata_columns),
|
|
self.CLASSIFICATION_COLUMN,
|
|
]
|
|
for col_name in self.sheet_columns:
|
|
self.sheet.addElement(odf.table.TableColumn(
|
|
stylename=self.column_styles.get(col_name, self.default_column),
|
|
))
|
|
self.sheet.lastChild.setAttribute('visibility', 'collapse')
|
|
self.add_row(*(
|
|
self.string_cell(col_name, stylename=self.style_bold)
|
|
for col_name in self.sheet_columns
|
|
))
|
|
self.lock_first_row()
|
|
|
|
def _write_total_row(self,
|
|
date: datetime.date,
|
|
description: str,
|
|
balance: core.Balance,
|
|
bal_style: Union[None, str, odf.style.Style]=None,
|
|
) -> None:
|
|
cells: List[odf.table.TableCell] = []
|
|
for column in self.CORE_COLUMNS:
|
|
if column == 'Date':
|
|
cell = self.date_cell(date, stylename=self.merge_styles(
|
|
self.style_bold, self.style_date,
|
|
))
|
|
elif column == 'Description':
|
|
cell = self.string_cell(description, stylename=self.style_bold)
|
|
elif column == 'Booked Amount':
|
|
cell = self.balance_cell(self.norm_func(balance), stylename=bal_style)
|
|
else:
|
|
cell = odf.table.TableCell()
|
|
cells.append(cell)
|
|
self.add_row(*cells)
|
|
|
|
def _report_section_balance(self, key: data.Account, date_key: str) -> None:
|
|
related = self.account_groups[key]
|
|
if date_key == 'start':
|
|
if not key.keeps_balance():
|
|
return
|
|
date = self.date_range.start
|
|
balance = related.start_bal
|
|
description = "Opening Balance"
|
|
stylename: Optional[odf.style.Style] = None
|
|
else:
|
|
date = self.date_range.stop
|
|
stylename = self.style_bottomline
|
|
if key.keeps_balance():
|
|
balance = related.stop_bal
|
|
description = "Ending Balance"
|
|
else:
|
|
balance = related.period_bal
|
|
description = "Period Total"
|
|
self._write_total_row(date, description, balance, stylename)
|
|
|
|
def write_header(self, key: data.Account) -> None:
|
|
self.add_row()
|
|
core_count = len(self.CORE_COLUMNS)
|
|
try:
|
|
classification = key.meta['classification']
|
|
except KeyError:
|
|
classification_cell = odf.table.TableCell()
|
|
else:
|
|
classification_cell = self.string_cell(
|
|
classification,
|
|
numbercolumnsspanned=len(self.sheet_columns) - core_count,
|
|
)
|
|
self.add_row(
|
|
odf.table.TableCell(),
|
|
self.string_cell(
|
|
key,
|
|
stylename=self.style_bold,
|
|
numbercolumnsspanned=core_count - 1,
|
|
),
|
|
*(odf.table.TableCell() for _ in range(2, core_count)),
|
|
classification_cell,
|
|
)
|
|
|
|
def write_balance_sheet(self) -> None:
|
|
self.use_sheet("Balance")
|
|
self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(3)))
|
|
self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(1.5)))
|
|
self.add_row(
|
|
self.string_cell("Account", stylename=self.style_bold),
|
|
self.string_cell("Balance", stylename=self.style_bold),
|
|
)
|
|
self.lock_first_row()
|
|
self.add_row()
|
|
self.add_row(self.string_cell(
|
|
f"Ledger From {self.date_range.start.isoformat()}"
|
|
f" To {self.date_range.stop.isoformat()}",
|
|
stylename=self.merge_styles(self.style_centertext, self.style_bold),
|
|
numbercolumnsspanned=2,
|
|
))
|
|
self.add_row()
|
|
for account, balance in core.account_balances(self.account_groups):
|
|
if account is core.OPENING_BALANCE_NAME:
|
|
text = f"Balance as of {self.date_range.start.isoformat()}"
|
|
style = self.merge_styles(self.style_bold, self.style_endtext)
|
|
elif account is core.ENDING_BALANCE_NAME:
|
|
text = f"Balance as of {self.date_range.stop.isoformat()}"
|
|
style = self.merge_styles(
|
|
self.style_bottomline, self.style_bold, self.style_endtext,
|
|
)
|
|
else:
|
|
text = account
|
|
style = self.style_endtext
|
|
self.add_row(
|
|
self.string_cell(text, stylename=style),
|
|
self.balance_cell(-balance, stylename=style),
|
|
)
|
|
|
|
def _account_tally(self, account: data.Account) -> int:
|
|
return len(self.account_groups[account])
|
|
|
|
def write_entries(self, account: data.Account, rows: Iterable[data.Posting]) -> None:
|
|
classification = account.meta.get('classification', '')
|
|
for row in rows:
|
|
if row.cost is None:
|
|
amount_cell = odf.table.TableCell()
|
|
else:
|
|
amount_cell = self.currency_cell(self.norm_func(row.units))
|
|
self.add_row(
|
|
self.date_cell(row.meta.date),
|
|
self.string_cell(row.meta.get('entity') or ''),
|
|
self.string_cell(row.meta.txn.narration),
|
|
amount_cell,
|
|
self.currency_cell(self.norm_func(row.at_cost())),
|
|
*(self.meta_links_cell(row.meta.report_links(key))
|
|
if key in data.LINK_METADATA
|
|
else self.string_cell(row.meta.get(key, ''))
|
|
for key in self.metadata_columns),
|
|
self.string_cell(classification),
|
|
)
|
|
|
|
def write(self, rows: Iterable[data.Posting]) -> None:
|
|
related_cls = core.PeriodPostings.with_start_date(self.date_range.start)
|
|
self.account_groups = dict(related_cls.group_by_account(
|
|
post for post in rows if post.meta.date < self.date_range.stop
|
|
))
|
|
for empty_acct in self.accounts.difference(self.account_groups):
|
|
self.account_groups[empty_acct] = related_cls()
|
|
self.write_balance_sheet()
|
|
tally_by_account_iter = (
|
|
(account, self._account_tally(account))
|
|
for account in self.accounts
|
|
)
|
|
tally_by_account = {
|
|
# 3 for the rows generated by start_section+end_section
|
|
account: count + 3
|
|
for account, count in tally_by_account_iter
|
|
}
|
|
sheet_names = self.plan_sheets(
|
|
tally_by_account, self.required_sheet_names, self.sheet_size,
|
|
)
|
|
using_sheet_index = -1
|
|
for sheet_index, account in core.sort_and_filter_accounts(
|
|
tally_by_account, sheet_names,
|
|
):
|
|
if not account.is_open_on_date(self.date_range.start):
|
|
continue
|
|
while using_sheet_index < sheet_index:
|
|
using_sheet_index += 1
|
|
self.start_sheet(sheet_names[using_sheet_index])
|
|
self.norm_func = core.normalize_amount_func(account)
|
|
postings = self.account_groups[account]
|
|
if postings:
|
|
totals_set = self.totals_with_entries
|
|
else:
|
|
totals_set = self.totals_without_entries
|
|
want_totals = account.is_under(*totals_set) is not None
|
|
if postings or want_totals:
|
|
self.write_header(account)
|
|
if want_totals:
|
|
self._report_section_balance(account, 'start')
|
|
self.write_entries(account, postings)
|
|
if want_totals:
|
|
self._report_section_balance(account, 'stop')
|
|
for index in range(using_sheet_index + 1, len(sheet_names)):
|
|
self.start_sheet(sheet_names[index])
|
|
|
|
|
|
class TransactionFilter(enum.IntFlag):
|
|
ZERO = 1
|
|
CREDIT = 2
|
|
DEBIT = 4
|
|
ALL = ZERO | CREDIT | DEBIT
|
|
|
|
@classmethod
|
|
def from_arg(cls, s: str) -> 'TransactionFilter':
|
|
try:
|
|
return cls[s.upper()]
|
|
except KeyError:
|
|
raise ValueError(f"unknown transaction filter {s!r}")
|
|
|
|
@classmethod
|
|
def post_flag(cls, post: data.Posting) -> int:
|
|
norm_func = core.normalize_amount_func(post.account)
|
|
number = norm_func(post.units.number)
|
|
if not number:
|
|
return cls.ZERO
|
|
elif number > 0:
|
|
return cls.CREDIT
|
|
else:
|
|
return cls.DEBIT
|
|
|
|
|
|
class TransactionODS(LedgerODS):
|
|
CORE_COLUMNS: Sequence[str] = [
|
|
'Date',
|
|
'Description',
|
|
'Account',
|
|
data.Metadata.human_name('entity'),
|
|
'Original Amount',
|
|
'Booked Amount',
|
|
]
|
|
METADATA_COLUMNS: Sequence[str] = [
|
|
'project',
|
|
'rt-id',
|
|
'receipt',
|
|
'check',
|
|
'invoice',
|
|
'contract',
|
|
'approval',
|
|
'paypal-id',
|
|
'check-number',
|
|
'bank-statement',
|
|
]
|
|
|
|
def __init__(self,
|
|
start_date: datetime.date,
|
|
stop_date: datetime.date,
|
|
accounts: Optional[Sequence[str]]=None,
|
|
rt_wrapper: Optional[rtutil.RT]=None,
|
|
sheet_size: Optional[int]=None,
|
|
totals_with_entries: Optional[Sequence[str]]=None,
|
|
totals_without_entries: Optional[Sequence[str]]=None,
|
|
txn_filter: int=TransactionFilter.ALL,
|
|
) -> None:
|
|
super().__init__(
|
|
start_date,
|
|
stop_date,
|
|
accounts,
|
|
rt_wrapper,
|
|
sheet_size,
|
|
totals_with_entries,
|
|
totals_without_entries,
|
|
)
|
|
self.txn_filter = txn_filter
|
|
if self.txn_filter == TransactionFilter.CREDIT:
|
|
self.report_name = "Receipts"
|
|
elif self.txn_filter == TransactionFilter.DEBIT:
|
|
self.report_name = "Disbursements"
|
|
else:
|
|
self.report_name = "Transactions"
|
|
|
|
def _wanted_txns(self, postings: Iterable[data.Posting]) -> Iterator[Transaction]:
|
|
last_txn: Optional[Transaction] = None
|
|
for post in postings:
|
|
txn = post.meta.txn
|
|
if (txn is not last_txn
|
|
and TransactionFilter.post_flag(post) & self.txn_filter):
|
|
yield txn
|
|
last_txn = txn
|
|
|
|
def metadata_columns_for(self, sheet_name: str) -> Sequence[str]:
|
|
return self.METADATA_COLUMNS
|
|
|
|
def write_balance_sheet(self) -> None:
|
|
return
|
|
|
|
def _report_section_balance(self, key: data.Account, date_key: str) -> None:
|
|
if self.txn_filter == TransactionFilter.ALL:
|
|
super()._report_section_balance(key, date_key)
|
|
elif date_key == 'stop':
|
|
balance = core.Balance(
|
|
post.at_cost()
|
|
for txn in self._wanted_txns(self.account_groups[key])
|
|
for post in data.Posting.from_txn(txn)
|
|
if post.account == key
|
|
)
|
|
self._write_total_row(self.date_range.stop, "Period Activity", balance)
|
|
|
|
def _account_tally(self, account: data.Account) -> int:
|
|
return sum(len(txn.postings)
|
|
for txn in self._wanted_txns(self.account_groups[account]))
|
|
|
|
def write_entries(self, account: data.Account, rows: Iterable[data.Posting]) -> None:
|
|
for txn in self._wanted_txns(rows):
|
|
post_list = list(data.Posting.from_txn(txn))
|
|
post_list.sort(key=lambda post: (
|
|
0 if post.account == account else 1,
|
|
-abs(post.at_cost().number),
|
|
))
|
|
postings = iter(post_list)
|
|
post1 = next(postings)
|
|
if post1.cost is None:
|
|
amount_cell = odf.table.TableCell()
|
|
else:
|
|
amount_cell = self.currency_cell(self.norm_func(post1.units))
|
|
self.add_row(
|
|
self.date_cell(txn.date),
|
|
self.string_cell(txn.narration),
|
|
self.string_cell(post1.account),
|
|
self.string_cell(post1.meta.get('entity') or ''),
|
|
amount_cell,
|
|
self.currency_cell(self.norm_func(post1.at_cost())),
|
|
*(self.meta_links_cell(post1.meta.report_links(key))
|
|
if key in data.LINK_METADATA
|
|
else self.string_cell(post1.meta.get(key, ''))
|
|
for key in self.metadata_columns),
|
|
)
|
|
for post in postings:
|
|
meta_cells: List[odf.table.TableCell] = []
|
|
for meta_key in self.metadata_columns:
|
|
try:
|
|
dup = post.meta[meta_key] is txn.meta[meta_key]
|
|
except KeyError:
|
|
dup = False
|
|
if dup:
|
|
meta_cell = odf.table.TableCell()
|
|
elif meta_key in data.LINK_METADATA:
|
|
meta_cell = self.meta_links_cell(post.meta.report_links(meta_key))
|
|
else:
|
|
meta_cell = self.string_cell(post.meta.get(meta_key, ''))
|
|
meta_cells.append(meta_cell)
|
|
if post.cost is None:
|
|
amount_cell = odf.table.TableCell()
|
|
else:
|
|
amount_cell = self.currency_cell(self.norm_func(post.units))
|
|
self.add_row(
|
|
odf.table.TableCell(),
|
|
odf.table.TableCell(),
|
|
self.string_cell(post.account),
|
|
self.string_cell(post.meta.get('entity') or ''),
|
|
amount_cell,
|
|
self.currency_cell(self.norm_func(post.at_cost())),
|
|
*meta_cells,
|
|
)
|
|
|
|
|
|
class CashReportAction(argparse.Action):
|
|
def __call__(self,
|
|
parser: argparse.ArgumentParser,
|
|
namespace: argparse.Namespace,
|
|
values: Union[Sequence[Any], str, None]=None,
|
|
option_string: Optional[str]=None,
|
|
) -> None:
|
|
namespace.txn_filter = self.const
|
|
if namespace.accounts is None:
|
|
namespace.accounts = []
|
|
namespace.accounts.append('Assets:PayPal')
|
|
namespace.accounts.append('Cash')
|
|
if namespace.stop_date is None:
|
|
namespace.stop_date = datetime.date.today()
|
|
|
|
|
|
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(prog=PROGNAME)
|
|
cliutil.add_version_argument(parser)
|
|
parser.add_argument(
|
|
'--disbursements',
|
|
action=CashReportAction,
|
|
const=TransactionFilter.DEBIT,
|
|
nargs=0,
|
|
help="""Shortcut to set all the necessary options to generate a cash
|
|
disbursements report.
|
|
""")
|
|
parser.add_argument(
|
|
'--receipts',
|
|
action=CashReportAction,
|
|
const=TransactionFilter.CREDIT,
|
|
nargs=0,
|
|
help="""Shortcut to set all the necessary options to generate a cash
|
|
receipts report.
|
|
""")
|
|
parser.add_argument(
|
|
'--begin', '--start', '-b',
|
|
dest='start_date',
|
|
metavar='DATE',
|
|
type=cliutil.date_arg,
|
|
help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
|
|
The default is one year ago.
|
|
""")
|
|
parser.add_argument(
|
|
'--end', '--stop', '-e',
|
|
dest='stop_date',
|
|
metavar='DATE',
|
|
type=cliutil.date_arg,
|
|
help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
|
|
The default is a year after the start date, or 30 days from today if the start
|
|
date was also not specified.
|
|
""")
|
|
parser.add_argument(
|
|
'--transactions', '-t',
|
|
dest='txn_filter',
|
|
metavar='TYPE',
|
|
type=TransactionFilter.from_arg,
|
|
help="""Report whole transactions rather than individual postings.
|
|
The type argument selects which type of transactions to report. Choices are
|
|
credit, debit, or all.
|
|
""")
|
|
parser.add_argument(
|
|
'--account', '-a',
|
|
dest='accounts',
|
|
metavar='ACCOUNT',
|
|
action='append',
|
|
help="""Show this account in the report. You can specify this option
|
|
multiple times. You can specify a part of the account hierarchy, or an account
|
|
classification from metadata. If not specified, the default set adapts to your
|
|
search criteria.
|
|
""")
|
|
cliutil.add_rewrite_rules_argument(parser)
|
|
parser.add_argument(
|
|
'--show-totals', '-S',
|
|
metavar='ACCOUNT',
|
|
action='append',
|
|
help="""When entries for this account appear in the report, include
|
|
account balance(s) as well. You can specify this option multiple times. Pass in
|
|
a part of the account hierarchy. The default is all accounts.
|
|
""")
|
|
parser.add_argument(
|
|
'--add-totals', '-T',
|
|
metavar='ACCOUNT',
|
|
action='append',
|
|
help="""When an account could be included in the report but does not
|
|
have any entries in the date range, include a header and account balance(s) for
|
|
it. You can specify this option multiple times. Pass in a part of the account
|
|
hierarchy. The default set adapts to your search criteria.
|
|
""")
|
|
parser.add_argument(
|
|
'--sheet-size', '--size',
|
|
metavar='SIZE',
|
|
type=int,
|
|
default=LedgerODS.SHEET_SIZE,
|
|
help="""Try to limit sheets to this many rows. The report will
|
|
automatically create new sheets to make this happen. When that's not possible,
|
|
it will issue a warning.
|
|
""")
|
|
parser.add_argument(
|
|
'--output-file', '-O',
|
|
metavar='PATH',
|
|
type=Path,
|
|
help="""Write the report to this file, or stdout when PATH is `-`.
|
|
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
|
|
""")
|
|
cliutil.add_loglevel_argument(parser)
|
|
parser.add_argument(
|
|
'search_terms',
|
|
metavar='FILTER',
|
|
type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
|
|
nargs=argparse.ZERO_OR_MORE,
|
|
help="""Report on postings that match this criteria. The format is
|
|
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
|
|
metadata to match. A single ticket number is a shortcut for
|
|
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
|
|
""")
|
|
args = parser.parse_args(arglist)
|
|
if args.add_totals is None and args.search_terms:
|
|
args.add_totals = []
|
|
if args.accounts is None:
|
|
if any(term.meta_key == 'project' for term in args.search_terms):
|
|
args.accounts = [
|
|
'Income',
|
|
'Expenses',
|
|
'Assets:Receivable',
|
|
'Assets:Prepaid',
|
|
'Liabilities:UnearnedIncome',
|
|
'Liabilities:Payable',
|
|
]
|
|
else:
|
|
args.accounts = list(LedgerODS.ACCOUNT_COLUMNS)
|
|
return args
|
|
|
|
def main(arglist: Optional[Sequence[str]]=None,
|
|
stdout: TextIO=sys.stdout,
|
|
stderr: TextIO=sys.stderr,
|
|
config: Optional[configmod.Config]=None,
|
|
) -> int:
|
|
args = parse_arguments(arglist)
|
|
cliutil.set_loglevel(logger, args.loglevel)
|
|
if config is None:
|
|
config = configmod.Config()
|
|
config.load_file()
|
|
|
|
today = datetime.date.today()
|
|
if args.start_date is None:
|
|
args.start_date = cliutil.diff_year(today, -1)
|
|
if args.stop_date is None:
|
|
args.stop_date = today + datetime.timedelta(days=30)
|
|
elif args.stop_date is None:
|
|
args.stop_date = cliutil.diff_year(args.start_date, 1)
|
|
|
|
returncode = 0
|
|
books_loader = config.books_loader()
|
|
if books_loader is None:
|
|
entries, load_errors, options = books.Loader.load_none(config.config_file_path())
|
|
returncode = cliutil.ExitCode.NoConfiguration
|
|
else:
|
|
entries, load_errors, options = books_loader.load_fy_range(args.start_date, args.stop_date)
|
|
if load_errors:
|
|
returncode = cliutil.ExitCode.BeancountErrors
|
|
elif not entries:
|
|
returncode = cliutil.ExitCode.NoDataLoaded
|
|
for error in load_errors:
|
|
bc_printer.print_error(error, file=stderr)
|
|
|
|
data.Account.load_from_books(entries, options)
|
|
postings = data.Posting.from_entries(entries)
|
|
for rewrite_path in args.rewrite_rules:
|
|
try:
|
|
ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
|
|
except ValueError as error:
|
|
logger.critical("failed loading rewrite rules from %s: %s",
|
|
rewrite_path, error.args[0])
|
|
return cliutil.ExitCode.RewriteRulesError
|
|
postings = ruleset.rewrite(postings)
|
|
for search_term in args.search_terms:
|
|
postings = search_term.filter_postings(postings)
|
|
|
|
rt_wrapper = config.rt_wrapper()
|
|
if rt_wrapper is None:
|
|
logger.warning("could not initialize RT client; spreadsheet links will be broken")
|
|
try:
|
|
if args.txn_filter is None:
|
|
report = LedgerODS(
|
|
args.start_date,
|
|
args.stop_date,
|
|
args.accounts,
|
|
rt_wrapper,
|
|
args.sheet_size,
|
|
args.show_totals,
|
|
args.add_totals,
|
|
)
|
|
else:
|
|
report = TransactionODS(
|
|
args.start_date,
|
|
args.stop_date,
|
|
args.accounts,
|
|
rt_wrapper,
|
|
args.sheet_size,
|
|
args.show_totals,
|
|
args.add_totals,
|
|
args.txn_filter,
|
|
)
|
|
except ValueError as error:
|
|
logger.error("%s: %r", *error.args)
|
|
return 2
|
|
report.set_common_properties(config.books_repo())
|
|
report.write(postings)
|
|
if not any(report.account_groups.values()):
|
|
logger.warning("no matching postings found to report")
|
|
returncode = returncode or cliutil.ExitCode.NoDataFiltered
|
|
|
|
if args.output_file is None:
|
|
out_dir_path = config.repository_path() or Path()
|
|
args.output_file = out_dir_path / '{}Report_{}_{}.ods'.format(
|
|
report.report_name,
|
|
args.start_date.isoformat(),
|
|
args.stop_date.isoformat(),
|
|
)
|
|
logger.info("Writing report to %s", args.output_file)
|
|
ods_file = cliutil.bytes_output(args.output_file, stdout)
|
|
report.save_file(ods_file)
|
|
return returncode
|
|
|
|
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
|
|
|
|
if __name__ == '__main__':
|
|
exit(entry_point())
|