"""ledger.py - General ledger report from Beancount

This tool produces a spreadsheet that shows postings in Beancount, organized
by account.

Specify the date range you want to report with the ``--begin`` and ``--end``
options.

Select the accounts you want to report with the ``--account`` option. You can
specify this option multiple times. The report will include at least one sheet
for each account you specify. Subaccounts will be reported on that sheet as
well.

Select the postings you want to report by passing metadata search terms in
``name=value`` format.

Run ``ledger-report --help`` for abbreviations and other options.

Examples
--------

Report all activity related to a given project::

    ledger-report project=NAME

Get all Assets postings for a given month to help with reconciliation::

    ledger-report -a Assets -b 2018-05-01 -e 2018-06-01
"""
|
|
# Copyright © 2020 Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import argparse
|
|
import collections
|
|
import datetime
|
|
import enum
|
|
import itertools
|
|
import operator
|
|
import logging
|
|
import sys
|
|
|
|
from typing import (
|
|
Callable,
|
|
Dict,
|
|
Iterable,
|
|
Iterator,
|
|
List,
|
|
Mapping,
|
|
Optional,
|
|
Sequence,
|
|
TextIO,
|
|
Tuple,
|
|
Union,
|
|
)
|
|
|
|
from pathlib import Path
|
|
|
|
import odf.table # type:ignore[import]
|
|
|
|
from beancount.core import data as bc_data
|
|
from beancount.parser import printer as bc_printer
|
|
|
|
from . import core
|
|
from .. import books
|
|
from .. import cliutil
|
|
from .. import config as configmod
|
|
from .. import data
|
|
from .. import ranges
|
|
from .. import rtutil
|
|
|
|
# A tally is a list of (posting count, account) pairs, used to plan how
# accounts are packed into spreadsheet sheets.
PostTally = List[Tuple[int, data.Account]]

PROGNAME = 'ledger-report'
logger = logging.getLogger('conservancy_beancount.reports.ledger')
|
|
|
|
class LedgerODS(core.BaseODS[data.Posting, data.Account]):
    """Write a general ledger as an ODS spreadsheet.

    The workbook starts with a "Balance" overview sheet, followed by one or
    more sheets of postings. Postings are grouped by account; accounts are
    assigned to sheets by plan_sheets(), which splits sheets that would
    exceed ``sheet_size`` rows.
    """
    # Columns present on every posting sheet, in order, before the
    # per-account metadata columns.
    CORE_COLUMNS: Sequence[str] = [
        'Date',
        data.Metadata.human_name('entity'),
        'Description',
        'Original Amount',
        'Booked Amount',
    ]
    # Metadata columns to add for each account classification. Order
    # matters: start_sheet() uses the first key the sheet's account
    # is_under(), so more specific accounts (Assets:Receivable,
    # Assets:PayPal) must appear before their root (Assets). Every root
    # account has an entry here; start_sheet() asserts as much.
    ACCOUNT_COLUMNS: Dict[str, Sequence[str]] = collections.OrderedDict([
        ('Income', ['project', 'rt-id', 'receipt', 'income-type']),
        ('Expenses', ['project', 'rt-id', 'receipt', 'approval', 'expense-allocation']),
        ('Equity', ['rt-id']),
        ('Assets:Receivable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
        ('Liabilities:Payable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
        ('Assets:PayPal', ['rt-id', 'paypal-id', 'receipt', 'approval']),
        ('Assets', ['rt-id', 'receipt', 'approval', 'bank-statement']),
        ('Liabilities', ['rt-id', 'receipt', 'approval', 'bank-statement']),
    ])
    # Excel 2003 was limited to 65,536 rows per worksheet.
    # While we can probably count on all our users supporting more modern
    # formats (Excel 2007 supports over 1 million rows per worksheet),
    # keeping the default limit conservative seems good to avoid running into
    # other limits (like the number of hyperlinks per worksheet), plus just
    # better for human organization and readability.
    SHEET_SIZE = 65000

    def __init__(self,
                 start_date: datetime.date,
                 stop_date: datetime.date,
                 sheet_names: Optional[Sequence[str]]=None,
                 rt_wrapper: Optional[rtutil.RT]=None,
                 sheet_size: Optional[int]=None,
    ) -> None:
        """Set up a report over the half-open date range [start_date, stop_date).

        sheet_names lists the accounts that must each get at least one
        sheet; the default is every key of ACCOUNT_COLUMNS. rt_wrapper is
        used by the base class to build RT hyperlinks. sheet_size caps the
        rows per sheet (default SHEET_SIZE).
        """
        if sheet_names is None:
            sheet_names = list(self.ACCOUNT_COLUMNS)
        if sheet_size is None:
            sheet_size = self.SHEET_SIZE
        super().__init__(rt_wrapper)
        self.date_range = ranges.DateRange(start_date, stop_date)
        self.required_sheet_names = sheet_names
        self.sheet_size = sheet_size

    def init_styles(self) -> None:
        """Define the column width styles looked up by start_sheet()."""
        super().init_styles()
        self.amount_column = self.column_style(1.2)
        self.default_column = self.column_style(1.5)
        # Columns not listed here fall back to default_column.
        self.column_styles: Mapping[str, Union[str, odf.style.Style]] = {
            'Date': '',
            'Description': self.column_style(2),
            'Original Amount': self.amount_column,
            'Booked Amount': self.amount_column,
            data.Metadata.human_name('project'): self.amount_column,
            data.Metadata.human_name('rt-id'): self.amount_column,
        }

    @classmethod
    def _group_tally(
            cls,
            tally_by_account: PostTally,
            key: Callable[[data.Account], Optional[str]],
    ) -> Dict[str, PostTally]:
        """Group tally entries by key(account), dropping accounts for
        which key() returns None."""
        retval: Dict[str, PostTally] = collections.defaultdict(list)
        for count, account in tally_by_account:
            item_key = key(account)
            if item_key is not None:
                retval[item_key].append((count, account))
        return retval

    @classmethod
    def _split_sheet(
            cls,
            tally_by_account: PostTally,
            sheet_size: int,
            sheet_name: str,
    ) -> Iterator[str]:
        """Yield the sheet name(s) needed to fit tally_by_account's rows.

        Yields just sheet_name when all accounts fit within sheet_size
        rows. Otherwise, recursively splits the overflowing accounts into
        subsheets named after deeper parts of the account name.
        """
        total = 0
        for index, (count, account) in enumerate(tally_by_account):
            total += count
            if total > sheet_size:
                break
        else:
            # All the accounts fit in this sheet.
            yield sheet_name
            return
        if index == 0 and len(tally_by_account) == 1:
            # With one account, we can't split any further, so warn and stop.
            logger.warning(
                "%s has %s rows, over size %s",
                # Plain ',' grouping: the old ',g' format spec rendered
                # counts over six digits in scientific notation.
                account, f'{count:,}', f'{sheet_size:,}',
            )
            yield sheet_name
            return
        # Group by the next-deeper account name part, e.g. splitting the
        # "Assets" sheet groups accounts by "Assets:<part>".
        group_func = operator.methodcaller('root_part', sheet_name.count(':') + 2)
        maybe_split = cls._group_tally(tally_by_account[:index], group_func)
        must_split = cls._group_tally(tally_by_account[index:], group_func)
        for subkey, must_split_tally in sorted(must_split.items()):
            split_names = cls._split_sheet(
                maybe_split.get(subkey, []) + must_split_tally, sheet_size, subkey,
            )
            # We must be willing to split out at least as many sheets as there
            # are accounts that didn't fit. Do that first.
            yield from itertools.islice(split_names, len(must_split_tally))
            # After that, we can be in one of two cases:
            # 1. There is no next sheet. All the accounts, including the
            #    maybe_splits and must_splits, fit on planned subsheets.
            #    Update state to note we don't need a sheet for them anymore.
            # 2. The next sheet is named `subkey`, and is planned to include
            #    all of our maybe_split accounts. However, we don't need to
            #    yield that sheet name, because those accounts already fit in
            #    the sheet we're planning, and it would be a needless split.
            next_sheet_name = next(split_names, None)
            if next_sheet_name is None:
                maybe_split.pop(subkey, None)
            else:
                assert next_sheet_name == subkey
                assert not any(split_names)
        if maybe_split:
            yield sheet_name

    @classmethod
    def plan_sheets(
            cls,
            tally_by_account: Mapping[data.Account, int],
            base_sheets: Sequence[str],
            sheet_size: int,
    ) -> Sequence[str]:
        """Return the ordered sheet names needed to report every account.

        tally_by_account maps each account to its posting count.
        base_sheets are the required starting sheet names; each may become
        several sheets so no sheet plans more than sheet_size rows.
        """
        # Sort ascending by count so _split_sheet packs small accounts
        # onto a sheet before it hits the ones that force a split.
        sorted_tally: PostTally = [
            (count, account)
            for account, count in tally_by_account.items()
        ]
        sorted_tally.sort()
        split_tally = cls._group_tally(
            sorted_tally,
            operator.methodcaller('is_under', *base_sheets),
        )
        # split_tally is a defaultdict, so base sheets with no postings
        # still get (exactly) one sheet from _split_sheet([]).
        return [
            sheet_name
            for key in base_sheets
            for sheet_name in cls._split_sheet(split_tally[key], sheet_size, key)
        ]

    def section_key(self, row: data.Posting) -> data.Account:
        """Group postings into sections by their account."""
        return row.account

    def start_sheet(self, sheet_name: str) -> None:
        """Begin a posting sheet named sheet_name with its header row and
        the column set appropriate for that account classification."""
        self.use_sheet(sheet_name.replace(':', ' '))
        columns_key = data.Account(sheet_name).is_under(*self.ACCOUNT_COLUMNS)
        # columns_key must not be None because ACCOUNT_COLUMNS has an entry
        # for all five root accounts.
        assert columns_key is not None
        self.metadata_columns = self.ACCOUNT_COLUMNS[columns_key]
        self.sheet_columns: Sequence[str] = [
            *self.CORE_COLUMNS,
            *(data.Metadata.human_name(meta_key) for meta_key in self.metadata_columns),
        ]
        for col_name in self.sheet_columns:
            self.sheet.addElement(odf.table.TableColumn(
                stylename=self.column_styles.get(col_name, self.default_column),
            ))
        self.add_row(*(
            self.string_cell(col_name, stylename=self.style_bold)
            for col_name in self.sheet_columns
        ))
        self.lock_first_row()

    def _report_section_balance(self, key: data.Account, date_key: str) -> None:
        """Add a balance row for account `key`.

        date_key is 'start' or 'stop'. Accounts under Assets/Equity/
        Liabilities get an Opening and an Ending Balance row; other
        accounts get only a Period Total row at the end of the section.
        """
        uses_opening = key.is_under('Assets', 'Equity', 'Liabilities')
        related = self.account_groups[key]
        if date_key == 'start':
            if not uses_opening:
                return
            date = self.date_range.start
            balance = related.start_bal
            description = "Opening Balance"
        else:
            date = self.date_range.stop
            if uses_opening:
                balance = related.stop_bal
                description = "Ending Balance"
            else:
                balance = related.period_bal
                description = "Period Total"
        self.add_row(
            self.date_cell(date, stylename=self.merge_styles(
                self.style_bold, self.style_date,
            )),
            odf.table.TableCell(),
            self.string_cell(description, stylename=self.style_bold),
            odf.table.TableCell(),
            self.balance_cell(self.norm_func(balance), stylename=self.style_bold),
        )

    def start_section(self, key: data.Account) -> None:
        """Open an account's section with a title row and opening balance."""
        self.add_row()
        self.add_row(
            odf.table.TableCell(),
            self.string_cell(
                f"{key} Ledger"
                f" From {self.date_range.start.isoformat()}"
                f" To {self.date_range.stop.isoformat()}",
                stylename=self.style_bold,
                numbercolumnsspanned=len(self.sheet_columns) - 1,
            ),
        )
        # Amounts in this section are sign-normalized per account type.
        self.norm_func = core.normalize_amount_func(key)
        self._report_section_balance(key, 'start')

    def end_section(self, key: data.Account) -> None:
        """Close an account's section with its ending balance or period total."""
        self._report_section_balance(key, 'stop')

    def write_row(self, row: data.Posting) -> None:
        """Add one ledger row for a single posting."""
        if row.cost is None:
            # Without a cost, units and booked amount are the same; leave
            # Original Amount blank rather than repeating the figure.
            amount_cell = odf.table.TableCell()
        else:
            amount_cell = self.currency_cell(self.norm_func(row.units))
        self.add_row(
            self.date_cell(row.meta.date),
            self.string_cell(row.meta.get('entity') or ''),
            self.string_cell(row.meta.txn.narration),
            amount_cell,
            self.currency_cell(self.norm_func(row.at_cost())),
            # Link-type metadata becomes hyperlinks; everything else is text.
            *(self.meta_links_cell(row.meta.report_links(key))
              if key in data.LINK_METADATA
              else self.string_cell(row.meta.get(key, ''))
              for key in self.metadata_columns),
        )

    def write_balance_sheet(self) -> None:
        """Write the "Balance" overview sheet of per-account balances."""
        self.use_sheet("Balance")
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(3)))
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(1.5)))
        self.add_row(
            self.string_cell("Account", stylename=self.style_bold),
            self.string_cell("Balance", stylename=self.style_bold),
        )
        self.lock_first_row()
        self.add_row()
        self.add_row(self.string_cell(
            f"Ledger From {self.date_range.start.isoformat()}"
            f" To {self.date_range.stop.isoformat()}",
            stylename=self.merge_styles(self.style_centertext, self.style_bold),
            numbercolumnsspanned=2,
        ))
        self.add_row()
        for account, balance in core.account_balances(self.account_groups):
            if account is core.OPENING_BALANCE_NAME:
                text = f"Balance as of {self.date_range.start.isoformat()}"
                style = self.merge_styles(self.style_bold, self.style_endtext)
            elif account is core.ENDING_BALANCE_NAME:
                text = f"Balance as of {self.date_range.stop.isoformat()}"
                style = self.merge_styles(self.style_bold, self.style_endtext)
            else:
                text = account
                style = self.style_endtext
            self.add_row(
                self.string_cell(text, stylename=style),
                # Negated for display; see core.account_balances for the
                # sign convention.
                self.balance_cell(-balance, stylename=style),
            )

    def write(self, rows: Iterable[data.Posting]) -> None:
        """Build the entire workbook from an iterable of postings."""
        related_cls = core.PeriodPostings.with_start_date(self.date_range.start)
        self.account_groups = dict(related_cls.group_by_account(
            post for post in rows if post.meta.date < self.date_range.stop
        ))
        self.write_balance_sheet()
        tally_by_account_iter = (
            (account, len(related))
            for account, related in self.account_groups.items()
        )
        tally_by_account = {
            account: count
            for account, count in tally_by_account_iter
            if count
        }
        sheet_names = self.plan_sheets(
            tally_by_account, self.required_sheet_names, self.sheet_size,
        )
        using_sheet_index = -1
        for sheet_index, account in core.sort_and_filter_accounts(
                tally_by_account, sheet_names,
        ):
            # Advance to the sheet this account was planned for, creating
            # any intervening (empty) sheets along the way.
            while using_sheet_index < sheet_index:
                using_sheet_index += 1
                self.start_sheet(sheet_names[using_sheet_index])
            super().write(self.account_groups[account])
        # Create any remaining planned sheets so every required sheet
        # exists, even when it ends up empty.
        for index in range(using_sheet_index + 1, len(sheet_names)):
            self.start_sheet(sheet_names[index])
|
|
|
|
|
class ReturnFlag(enum.IntFlag):
    """Bit flags combined into main()'s exit status to report failures."""
    LOAD_ERRORS = 1
    # Bits 2 and 4 are skipped, apparently reserved for future error kinds.
    NOTHING_TO_REPORT = 8
|
|
|
|
|
|
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse command-line options and fill in default sheet names.

    When no --account options are given, the default sheet set depends on
    whether any project= search terms were specified.
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(parser)
    parser.add_argument(
        '--begin', '--start', '-b',
        dest='start_date',
        metavar='DATE',
        type=cliutil.date_arg,
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
The default is one year ago.
""")
    parser.add_argument(
        '--end', '--stop', '-e',
        dest='stop_date',
        metavar='DATE',
        type=cliutil.date_arg,
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
The default is a year after the start date, or 30 days from today if the start
date was also not specified.
""")
    parser.add_argument(
        '--account', '-a',
        dest='sheet_names',
        metavar='ACCOUNT',
        action='append',
        help="""Show this account in the report. You can specify this option
multiple times. If not specified, the default set adapts to your search
criteria.
""")
    parser.add_argument(
        '--sheet-size', '--size',
        metavar='SIZE',
        type=int,
        default=LedgerODS.SHEET_SIZE,
        help="""Try to limit sheets to this many rows. The report will
automatically create new sheets to make this happen. When that's not possible,
it will issue a warning.
""")
    parser.add_argument(
        '--output-file', '-O',
        metavar='PATH',
        type=Path,
        help="""Write the report to this file, or stdout when PATH is `-`.
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
""")
    cliutil.add_loglevel_argument(parser)
    parser.add_argument(
        'search_terms',
        metavar='FILTER',
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
        nargs=argparse.ZERO_OR_MORE,
        help="""Report on postings that match this criteria. The format is
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
metadata to match. A single ticket number is a shortcut for
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
""")
    args = parser.parse_args(arglist)
    if args.sheet_names is None:
        if any(term.meta_key == 'project' for term in args.search_terms):
            # Default sheets for a project-focused report.
            args.sheet_names = [
                'Income',
                'Expenses',
                'Assets:Receivable',
                'Assets:Prepaid',
                'Liabilities:UnearnedIncome',
                'Liabilities:Payable',
            ]
        else:
            args.sheet_names = list(LedgerODS.ACCOUNT_COLUMNS)
    return args
|
|
|
|
def diff_year(date: datetime.date, diff: int) -> datetime.date:
    """Return *date* shifted by *diff* calendar years.

    When the shifted date does not exist (Feb 29 in a non-leap year),
    round toward the direction of the shift: Feb 28 going backward,
    Mar 1 going forward.
    """
    shifted_year = date.year + diff
    try:
        return date.replace(year=shifted_year)
    except ValueError:
        # Only Feb 29 can fail to exist in the target year.
        fallback = (2, 28) if diff < 0 else (3, 1)
        return datetime.date(shifted_year, *fallback)
|
|
|
|
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run the ledger report end to end.

    Returns 0 on success; otherwise 16 plus the accumulated ReturnFlag
    bits. arglist, stdout, stderr, and config are injectable for testing.
    """
    args = parse_arguments(arglist)
    cliutil.set_loglevel(logger, args.loglevel)
    if config is None:
        config = configmod.Config()
        config.load_file()

    # Fill in the default date range: one year ago through 30 days from
    # now, or one year from the given start date.
    today = datetime.date.today()
    if args.start_date is None:
        args.start_date = diff_year(today, -1)
        if args.stop_date is None:
            args.stop_date = today + datetime.timedelta(days=30)
    elif args.stop_date is None:
        args.stop_date = diff_year(args.start_date, 1)

    returncode = 0
    books_loader = config.books_loader()
    if books_loader is None:
        # No books configured: load nothing, but still report the errors.
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
    else:
        entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
    for error in load_errors:
        bc_printer.print_error(error, file=stderr)
        returncode |= ReturnFlag.LOAD_ERRORS

    postings = data.Posting.from_entries(entries)
    for search_term in args.search_terms:
        postings = search_term.filter_postings(postings)

    rt_wrapper = config.rt_wrapper()
    if rt_wrapper is None:
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
    report = LedgerODS(
        args.start_date,
        args.stop_date,
        args.sheet_names,
        rt_wrapper,
        args.sheet_size,
    )
    report.write(postings)
    if not report.account_groups:
        logger.warning("no matching postings found to report")
        returncode |= ReturnFlag.NOTHING_TO_REPORT

    if args.output_file is None:
        out_dir_path = config.repository_path() or Path()
        args.output_file = out_dir_path / 'LedgerReport_{}_{}.ods'.format(
            args.start_date.isoformat(), args.stop_date.isoformat(),
        )
    logger.info("Writing report to %s", args.output_file)
    # bytes_output handles the `-` convention for writing to stdout.
    ods_file = cliutil.bytes_output(args.output_file, stdout)
    report.save_file(ods_file)
    # Error exit codes start at 16 so they don't collide with codes from
    # argparse or the interpreter.
    return 0 if returncode == 0 else 16 + returncode
|
|
|
|
# Console-script entry point for the `ledger-report` command.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    exit(entry_point())
|