
This is like the existing attributes, but it only supports the outgoings report, so don't build it at __init__ time.
752 lines
27 KiB
Python
752 lines
27 KiB
Python
#!/usr/bin/env python3
|
||
"""accrual-report - Status reports for accruals
|
||
|
||
accrual-report checks accruals (postings under Assets:Receivable and
|
||
Liabilities:Payable) for errors and metadata consistency, and reports any
|
||
problems on stderr. Then it writes a report about the status of those
|
||
accruals.
|
||
|
||
If you run it with no arguments, it will generate an aging report in ODS format
|
||
in the current directory.
|
||
|
||
Otherwise, the typical way to run it is to pass an RT ticket number or
|
||
invoice link as an argument, to report about accruals that match those
|
||
criteria::
|
||
|
||
# Report all accruals associated with RT#1230:
|
||
accrual-report 1230
|
||
# Report all accruals with the invoice link rt:45/670.
|
||
accrual-report 45/670
|
||
# Report all accruals with the invoice link Invoice980.pdf.
|
||
accrual-report Invoice980.pdf
|
||
|
||
By default, to stay fast, accrual-report only looks for postings from the
|
||
beginning of the last fiscal year. You can search further back in history
|
||
by passing the ``--since`` argument. The argument can be a fiscal year, or
|
||
a negative number of how many years back to search::
|
||
|
||
# Search for accruals since 2016
|
||
accrual-report --since 2016 [search terms …]
|
||
# Search for accruals from the beginning of three fiscal years ago
|
||
accrual-report --since -3 [search terms …]
|
||
|
||
If you want to further limit what accruals are reported, you can match on
|
||
other metadata by passing additional arguments in ``name=value`` format.
|
||
You can pass any number of search terms. For example::
|
||
|
||
# Report accruals associated with RT#1230 and Jane Doe
|
||
accrual-report 1230 entity=Doe-Jane
|
||
|
||
accrual-report will automatically decide what kind of report to generate
|
||
from the search terms you provide and the results they return. If you pass
|
||
no search terms, it generates an aging report. If your search terms match a
|
||
single outstanding payable, it writes an outgoing approval report.
|
||
Otherwise, it writes a basic balance report. You can specify what report
|
||
type you want with the ``--report-type`` option::
|
||
|
||
# Write an outgoing approval report for all outstanding accruals for
|
||
# Jane Doe, even if there's more than one
|
||
accrual-report --report-type outgoing entity=Doe-Jane
|
||
# Write an aging report for a specific project
|
||
accrual-report --report-type aging project=ProjectName
|
||
"""
|
||
# Copyright © 2020 Brett Smith
|
||
#
|
||
# This program is free software: you can redistribute it and/or modify
|
||
# it under the terms of the GNU Affero General Public License as published by
|
||
# the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU Affero General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU Affero General Public License
|
||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
||
import argparse
|
||
import collections
|
||
import datetime
|
||
import enum
|
||
import logging
|
||
import re
|
||
import sys
|
||
import urllib.parse as urlparse
|
||
|
||
from pathlib import Path
|
||
|
||
from typing import (
|
||
cast,
|
||
Any,
|
||
BinaryIO,
|
||
Callable,
|
||
Iterable,
|
||
Iterator,
|
||
List,
|
||
Mapping,
|
||
NamedTuple,
|
||
Optional,
|
||
Sequence,
|
||
Set,
|
||
TextIO,
|
||
Tuple,
|
||
TypeVar,
|
||
Union,
|
||
)
|
||
from ..beancount_types import (
|
||
Entries,
|
||
Error,
|
||
Errors,
|
||
MetaKey,
|
||
MetaValue,
|
||
Transaction,
|
||
)
|
||
|
||
import odf.style # type:ignore[import]
|
||
import odf.table # type:ignore[import]
|
||
import rt
|
||
|
||
from beancount.parser import printer as bc_printer
|
||
|
||
from . import core
|
||
from .. import books
|
||
from .. import cliutil
|
||
from .. import config as configmod
|
||
from .. import data
|
||
from .. import filters
|
||
from .. import rtutil
|
||
|
||
# Program name: passed to argparse as ``prog`` and used as the logger name
# when this module is run as the main script (see main()).
PROGNAME = 'accrual-report'

# Type variable constrained to the two amount-like types named below.
CompoundAmount = TypeVar('CompoundAmount', data.Amount, core.Balance)
# Mapping of a grouping key to the accrual postings filed under it.
# main() builds these keyed by invoice link or by a synthesized group key.
PostGroups = Mapping[Optional[MetaValue], 'AccrualPostings']
# String-to-string mapping alias.
# NOTE(review): presumably the shape of a record returned by RT -- confirm.
RTObject = Mapping[str, str]
# Unconstrained type variable for the generic helpers in this module.
T = TypeVar('T')

# Module logger; main() rebinds it when running as the main script.
logger = logging.getLogger('conservancy_beancount.reports.accrual')
|
||
|
||
class Sentinel:
    """Featureless marker type.

    Instances carry no data; they exist only to be recognized by identity
    or isinstance(). AccrualPostings.INCONSISTENT is an instance of this
    class.
    """
|
||
|
||
|
||
class Account(NamedTuple):
    """An accrual account root paired with its aging-report thresholds."""

    # Full name of the root account, e.g. 'Assets:Receivable'.
    name: str
    # Age cutoffs used by the aging report. They are used as day counts
    # (see AgingODS.start_section), and the values defined in this module
    # are listed from largest to smallest.
    aging_thresholds: Sequence[int]
|
||
|
||
|
||
class AccrualAccount(enum.Enum):
    """The accrual account roots this report knows about.

    Each member's value is an Account tuple holding the root account name
    and the aging thresholds used for that kind of accrual.
    """

    # Definition order is significant: the aging report creates its sheets
    # in the same order the members are defined here.
    # See AgingODS.start_spreadsheet().
    RECEIVABLE = Account('Assets:Receivable', [365, 120, 90, 60])
    PAYABLE = Account('Liabilities:Payable', [365, 90, 60, 30])

    @classmethod
    def account_names(cls) -> Iterator[str]:
        """Iterate over the root account name of every member, in order."""
        for member in cls:
            yield member.value.name

    @classmethod
    def by_account(cls, name: data.Account) -> 'AccrualAccount':
        """Return the first member whose root account `name` is under.

        Raises ValueError when `name` is not under any member's account.
        """
        found = next(
            (member for member in cls if name.is_under(member.value.name)),
            None,
        )
        if found is None:
            raise ValueError(f"unrecognized account {name!r}")
        return found

    @classmethod
    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
        """Return the first member that every posting in `related` is under.

        Raises ValueError when no single member covers all the postings.
        """
        for member in cls:
            root = member.value.name
            if all(post.account.is_under(root) for post in related):
                return member
        raise ValueError("unrecognized account set in related postings")

    @property
    def normalize_amount(self) -> Callable[[T], T]:
        """The callable core.normalize_amount_func() builds for this account."""
        root = self.value.name
        return core.normalize_amount_func(root)
|
||
|
||
|
||
class AccrualPostings(core.RelatedPostings):
    """Related postings for one accrual, plus summary attributes.

    The constructor derives these attributes from the postings:

    * account: the single account all postings share
    * accrual_type: the AccrualAccount for that account, or None when the
      postings do not share a single account
    * entity: the single entity found on the postings selected in __init__
    * invoice: the single first invoice link
    * end_balance: the balance at cost, passed through the accrual type's
      normalizing function when there is one

    account, entity, and invoice are set to INCONSISTENT when the postings
    yield no value or disagree about it.
    """
    __slots__ = (
        'accrual_type',
        'end_balance',
        'account',
        'entity',
        'invoice',
    )
    # Marker stored in place of a summary value that is absent or ambiguous.
    INCONSISTENT = Sentinel()

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 *,
                 _can_own: bool=False,
    ) -> None:
        super().__init__(source, _can_own=_can_own)
        self.account = self._single_item(post.account for post in self)
        if not isinstance(self.account, Sentinel):
            self.accrual_type: Optional[AccrualAccount] = AccrualAccount.by_account(self.account)
            norm_func: Callable[[T], T] = self.accrual_type.normalize_amount
            # Only postings whose normalized amount is positive contribute
            # to the summary entity.
            entity_pred: Callable[[data.Posting], bool] = (
                lambda post: norm_func(post.units).number > 0
            )
        else:
            # No single account: there is nothing to normalize against, so
            # keep amounts as they are and consider every posting's entity.
            self.accrual_type = None
            norm_func = lambda x: x
            entity_pred = bool
        self.entity = self._single_item(self.entities(entity_pred))
        self.invoice = self._single_item(self.first_meta_links('invoice', None))
        self.end_balance = norm_func(self.balance_at_cost())

    def _single_item(self, seq: Iterable[T]) -> Union[T, Sentinel]:
        """Return the one value `seq` agrees on, else INCONSISTENT.

        An empty `seq` counts as inconsistent. Every later item is compared
        for equality against the first one.
        """
        remaining = iter(seq)
        try:
            first = next(remaining)
        except StopIteration:
            return self.INCONSISTENT
        for other in remaining:
            if other == first:
                continue
            return self.INCONSISTENT
        return first

    def entities(self, pred: Callable[[data.Posting], bool]=bool) -> Iterator[MetaValue]:
        """Iterate over entity metadata values via filters.iter_unique().

        Only postings that satisfy `pred` and carry 'entity' metadata are
        considered.
        """
        entity_values = (
            post.meta['entity']
            for post in self
            if pred(post) and 'entity' in post.meta
        )
        return filters.iter_unique(entity_values)

    def make_consistent(self) -> Iterator[Tuple[MetaValue, 'AccrualPostings']]:
        """Yield (key, postings) pairs with consistent summary attributes.

        When the account, entity, and invoice are all usable, this yields
        just (invoice, self). Otherwise the postings are regrouped by
        account, invoice, and entity, and each group is yielded as a new
        instance under a key built from those three values.
        """
        account_ok = isinstance(self.account, str)
        entity_ok = isinstance(self.entity, str)
        # Requiring a '/' in the invoice is a heuristic to make sure the
        # metadata is "unique enough" and not a placeholder such as
        # "FIXME". It can be refined if needed.
        invoice_ok = isinstance(self.invoice, str) and '/' in self.invoice
        if not (account_ok and entity_ok and invoice_ok):
            buckets = collections.defaultdict(list)
            for post in self:
                if invoice_ok:
                    post_invoice = self.invoice
                else:
                    post_invoice = post.meta.get('invoice') or 'BlankInvoice'
                if entity_ok:
                    post_entity = self.entity
                else:
                    post_entity = post.meta.get('entity') or 'BlankEntity'
                buckets[f'{post.account} {post_invoice} {post_entity}'].append(post)
            group_class = type(self)
            for bucket_key, members in buckets.items():
                yield bucket_key, group_class(members, _can_own=True)
        else:
            yield (self.invoice, self)

    def is_paid(self, default: Optional[bool]=None) -> Optional[bool]:
        """Return end_balance.le_zero(), or `default` without an accrual type."""
        if self.accrual_type is not None:
            return self.end_balance.le_zero()
        return default

    def is_zero(self, default: Optional[bool]=None) -> Optional[bool]:
        """Return end_balance.is_zero(), or `default` without an accrual type."""
        if self.accrual_type is not None:
            return self.end_balance.is_zero()
        return default

    def since_last_nonzero(self) -> 'AccrualPostings':
        """Return the postings after the last zero running balance.

        Returns self unchanged when the running balance never reports zero,
        when it is zero after the final posting, or when there are no
        postings at all.
        """
        last_zero: Optional[int] = None
        last_index: Optional[int] = None
        for last_index, (_, balance) in enumerate(self.iter_with_balance()):
            if balance.is_zero():
                last_zero = last_index
        if last_zero is None or last_zero == last_index:
            return self
        return self[last_zero + 1:]

    @property
    def rt_id(self) -> Union[str, None, Sentinel]:
        """The single first 'rt-id' link, or INCONSISTENT.

        This is computed on every access rather than stored in __init__.
        """
        first_links = self.first_meta_links('rt-id', None)
        return self._single_item(first_links)
|
||
|
||
|
||
class BaseReport:
    """Base class for text reports written one line at a time.

    Subclasses implement _report() to produce the lines for one group of
    postings; run() prints them to the output file.
    """

    def __init__(self, out_file: TextIO) -> None:
        """Remember the output file and set up a per-class child logger."""
        self.out_file = out_file
        child_name = type(self).__name__
        self.logger = logger.getChild(child_name)

    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
        """Return the report lines for `posts`, the `index`-th group.

        Subclasses must override this method.
        """
        raise NotImplementedError("BaseReport._report")

    def run(self, groups: PostGroups) -> None:
        """Print the report lines for every group, in mapping order."""
        for index, group_key in enumerate(groups):
            report_lines = self._report(groups[group_key], index)
            for line in report_lines:
                print(line, file=self.out_file)
|
||
|
||
|
||
class AgingODS(core.BaseODS[AccrualPostings, Optional[data.Account]]):
    """Spreadsheet writer for the aging report.

    Each row is one AccrualPostings group, and rows are sectioned by their
    account. One sheet is created per AccrualAccount member.
    """
    COLUMNS = [
        'Date',
        'Entity',
        'Invoice Amount',
        'Booked Amount',
        'Project',
        'Ticket',
        'Invoice',
        'Approval',
        'Contract',
        'Purchase Order',
    ]
    COL_COUNT = len(COLUMNS)

    def __init__(self,
                 rt_client: rt.Rt,
                 date: datetime.date,
                 logger: logging.Logger,
    ) -> None:
        """Set up the spreadsheet for a report dated `date`.

        `rt_client` is wrapped in rtutil.RT; the wrapper is used to parse
        and rewrite links.
        """
        super().__init__()
        self.rt_client = rt_client
        self.rt_wrapper = rtutil.RT(self.rt_client)
        self.date = date
        self.logger = logger

    def init_styles(self) -> None:
        """Add the 'WideCol' table-column style on top of the base styles."""
        super().init_styles()
        wide_style = self.style_widecol = self.replace_child(
            self.document.automaticstyles,
            odf.style.Style,
            name='WideCol',
        )
        wide_style.setAttribute('family', 'table-column')
        column_props = odf.style.TableColumnProperties(columnwidth='1.25in')
        wide_style.addElement(column_props)

    def section_key(self, row: AccrualPostings) -> Optional[data.Account]:
        """Return the row's account, or None when it is not a single string."""
        return row.account if isinstance(row.account, str) else None

    def start_spreadsheet(self) -> None:
        """Create one sheet per accrual type with a locked header row."""
        for accrual_type in AccrualAccount:
            self.use_sheet(accrual_type.name.title())
            # Every column except the first gets the wide column style.
            for index in range(self.COL_COUNT):
                column_style = '' if index == 0 else self.style_widecol
                self.sheet.addElement(odf.table.TableColumn(stylename=column_style))
            header_cells = [
                self.string_cell(title, stylename=self.style_bold)
                for title in self.COLUMNS
            ]
            self.add_row(*header_cells)
            self.lock_first_row()

    def start_section(self, key: Optional[data.Account]) -> None:
        """Reset the age buckets for `key` and write the section title row.

        Does nothing when `key` is None.
        """
        if key is None:
            return
        thresholds = AccrualAccount.by_account(key).value.aging_thresholds
        self.age_thresholds = list(thresholds)
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
        # The last threshold is used as the minimum age named in the title.
        accrual_date = self.date - datetime.timedelta(days=self.age_thresholds[-1])
        acct_parts = key.slice_parts()
        sheet_name = acct_parts[1]
        self.use_sheet(sheet_name)
        self.add_row()
        subaccount = ' '.join(acct_parts[2:])
        title = (
            f"{subaccount} {sheet_name} Aging Report"
            f" Accrued by {accrual_date.isoformat()} Unpaid by {self.date.isoformat()}"
        )
        title_style = self.merge_styles(self.style_bold, self.style_centertext)
        self.add_row(self.string_cell(
            title,
            stylename=title_style,
            numbercolumnsspanned=self.COL_COUNT,
        ))
        self.add_row()

    def end_section(self, key: Optional[data.Account]) -> None:
        """Write one total row per age bucket, then the overall unpaid total.

        Does nothing when `key` is None.
        """
        if key is None:
            return
        total_balance = core.MutableBalance()
        text_style = self.merge_styles(self.style_bold, self.style_endtext)
        text_span = 4

        def add_total_row(label: str, amount: Any) -> None:
            # The label cell spans `text_span` columns; the empty cells fill
            # the columns it covers so the balance lands in the next one.
            self.add_row(
                self.string_cell(
                    label,
                    stylename=text_style,
                    numbercolumnsspanned=text_span,
                ),
                *(odf.table.TableCell() for _ in range(1, text_span)),
                self.balance_cell(amount),
            )

        last_age_text: Optional[str] = None
        self.add_row()
        for threshold, balance in zip(self.age_thresholds, self.age_balances):
            years, days = divmod(threshold, 365)
            age_parts = []
            if years:
                age_parts.append(f"{years} {'Year' if years == 1 else 'Years'}")
            if days or not years:
                age_parts.append(f"{days} Days")
            age_text = ' '.join(age_parts)
            # The first bucket is open-ended; later ones are bounded above
            # by the previous bucket's threshold.
            if last_age_text is None:
                age_range = f"Over {age_text}"
            else:
                age_range = f"{age_text}–{last_age_text}"
            add_total_row(f"Total Aged {age_range}: ", balance)
            last_age_text = age_text
            total_balance += balance
        add_total_row("Total Unpaid: ", total_balance)

    def _link_seq(self, row: AccrualPostings, key: MetaKey) -> Iterator[Tuple[str, str]]:
        """Yield (href, text) pairs for every `key` link on the row.

        Links the RT wrapper can parse are rewritten through its url()
        method, and ticket links without an attachment are labeled RT#N.
        Other links are made relative to the spreadsheet's directory. Link
        text defaults to the unquoted final component of the link path.
        """
        for href in row.all_meta_links(key):
            text: Optional[str] = None
            rt_ids = self.rt_wrapper.parse(href)
            if rt_ids is None:
                # '..' pops the ODS filename off the link path. In other
                # words, make the link relative to the directory the ODS
                # is in.
                href = f'../{href}'
            else:
                ticket_id, attachment_id = rt_ids
                if attachment_id is None:
                    text = f'RT#{ticket_id}'
                href = self.rt_wrapper.url(ticket_id, attachment_id) or href
            if text is None:
                link_path = Path(urlparse.urlparse(href).path)
                text = urlparse.unquote(link_path.name)
            yield (href, text)

    def write_row(self, row: AccrualPostings) -> None:
        """Add `row` to its age bucket and write its spreadsheet row.

        A row with a non-negative end balance that is younger than every
        threshold is skipped entirely. A row with a negative end balance is
        written without being added to any bucket.
        """
        age = (self.date - row[0].meta.date).days
        if row.end_balance.ge_zero():
            bucket = next(
                (index for index, threshold in enumerate(self.age_thresholds)
                 if age >= threshold),
                None,
            )
            if bucket is None:
                return
            self.age_balances[bucket] += row.end_balance
        raw_balance = row.balance()
        if row.accrual_type is not None:
            raw_balance = row.accrual_type.normalize_amount(raw_balance)
        # Only show the raw amount when it differs from the booked amount.
        if raw_balance == row.end_balance:
            amount_cell = odf.table.TableCell()
        else:
            amount_cell = self.balance_cell(raw_balance)
        projects = {
            project
            for project in (post.meta.get('project') for post in row)
            if project
        }
        self.add_row(
            self.date_cell(row[0].meta.date),
            self.multiline_cell(row.entities()),
            amount_cell,
            self.balance_cell(row.end_balance),
            self.multiline_cell(sorted(projects)),
            *(
                self.multilink_cell(self._link_seq(row, link_key))
                for link_key in (
                    'rt-id',
                    'invoice',
                    'approval',
                    'contract',
                    'purchase-order',
                )
            ),
        )
|
||
|
||
|
||
class AgingReport(BaseReport):
    """Report that writes unpaid accruals to an aging spreadsheet."""

    def __init__(self,
                 rt_client: rt.Rt,
                 out_file: BinaryIO,
                 date: Optional[datetime.date]=None,
    ) -> None:
        """Prepare a spreadsheet dated `date` (today when not given).

        The output is binary, so this does not call BaseReport.__init__()
        and sets up its own logger instead.
        """
        report_date = datetime.date.today() if date is None else date
        self.out_bin = out_file
        self.logger = logger.getChild(type(self).__name__)
        self.ods = AgingODS(rt_client, report_date, self.logger)

    def run(self, groups: PostGroups) -> None:
        """Select, sort, and write the rows, then save the spreadsheet."""
        rows: List[AccrualPostings] = []
        for group in groups.values():
            if group.is_zero():
                # Cheap optimization: don't slice and dice groups we're not
                # going to report anyway.
                continue
            recent = group.since_last_nonzero()
            accrual_type = group.accrual_type
            if accrual_type is None:
                candidate = recent
            else:
                # Filter out new accruals after the cutoff date. This covers
                # the case where the same invoice has multiple postings over
                # time, and we don't want to report the too-recent ones.
                # Postings whose normalized amount is negative are kept
                # whatever their date.
                cutoff_date = self.ods.date - datetime.timedelta(
                    days=accrual_type.value.aging_thresholds[-1],
                )
                candidate = AccrualPostings(
                    post for post in recent
                    if post.meta.date <= cutoff_date
                    or accrual_type.normalize_amount(post.units.number) < 0
                )
            if candidate and not candidate.is_zero():
                rows.append(candidate)

        def sort_key(related: AccrualPostings) -> Tuple[Any, Any, Any]:
            # Order by account, then first posting date, then entity. When
            # the entity is inconsistent, join all of them into one key.
            if related.entity is related.INCONSISTENT:
                entity_key = '\0'.join(related.entities())
            else:
                entity_key = related.entity
            return (related.account, related[0].meta.date, entity_key)

        rows.sort(key=sort_key)
        self.ods.write(rows)
        self.ods.save_file(self.out_bin)
|
||
|
||
|
||
class BalanceReport(BaseReport):
    """Text report of each group's balance and when it became outstanding."""

    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
        """Yield the balance lines for one group.

        Groups after the first are preceded by a blank separator line.
        """
        open_posts = posts.since_last_nonzero()
        since = open_posts[0].meta.date.strftime('%Y-%m-%d')
        if index != 0:
            yield ""
        yield f"{open_posts.invoice}:"
        yield f" {open_posts.balance_at_cost()} outstanding since {since}"
|
||
|
||
|
||
class OutgoingReport(BaseReport):
    """Text report requesting approval for an outgoing payment.

    The report combines details from the group's RT ticket with the
    Beancount entries behind the accrual.
    """

    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
        """Keep the RT client and wrap it in rtutil.RT for link handling."""
        super().__init__(out_file)
        self.rt_client = rt_client
        self.rt_wrapper = rtutil.RT(rt_client)

    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
        """Return the parsed form of the group's only first 'rt-id' link.

        Raises ValueError when there is not exactly one such link, or when
        rtutil.RT.parse() does not recognize it.
        """
        links = list(posts.first_meta_links('rt-id'))
        link_count = len(links)
        if link_count != 1:
            raise ValueError(f"{link_count} rt-id links found")
        parsed = rtutil.RT.parse(links[0])
        if parsed is None:
            raise ValueError("rt-id is not a valid RT reference")
        return parsed

    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
        """Yield the approval request lines for one group.

        When no RT ticket can be found for the group, this logs an error
        and yields nothing.
        """
        posts = posts.since_last_nonzero()
        try:
            ticket_id, _ = self._primary_rt_id(posts)
            ticket = self.rt_client.get_ticket(ticket_id)
            # Note we only use this when ticket is None.
            errmsg = f"ticket {ticket_id} not found"
        except (ValueError, rt.RtError) as error:
            ticket = None
            errmsg = error.args[0]
        if ticket is None:
            self.logger.error(
                "can't generate outgoings report for %s because no RT ticket available: %s",
                posts.invoice, errmsg,
            )
            return

        # Look up the first requestor; report blanks if that fails.
        try:
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
        except (IndexError, rt.RtError):
            rt_requestor = None
        requestor = ''
        requestor_name = ''
        if rt_requestor is not None:
            requestor_name = (
                rt_requestor.get('RealName')
                or ticket.get('CF.{payment-to}')
                or ''
            )
            email = rt_requestor["EmailAddress"]
            requestor = f'{requestor_name} <{email}>'.strip()

        # Show the raw balance ahead of the balance at cost when they differ.
        balance_s = posts.end_balance.format(None)
        raw_balance = -posts.balance()
        if raw_balance != posts.end_balance:
            balance_s = f'{raw_balance} ({balance_s})'

        contract_links = list(posts.all_meta_links('contract'))
        if not contract_links:
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
        else:
            contract_urls = self.rt_wrapper.iter_urls(
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
            )
            contract_s = ' , '.join(contract_urls)
        projects = [
            value for value in posts.meta_values('project')
            if isinstance(value, str)
        ]
        payment_to = ticket.get('CF.{payment-to}') or requestor_name
        payment_method = ticket.get('CF.{payment-method}', '')
        project_s = ', '.join(projects)

        yield "PAYMENT FOR APPROVAL:"
        yield f"REQUESTOR: {requestor}"
        yield f"TOTAL TO PAY: {balance_s}"
        yield f"AGREEMENT: {contract_s}"
        yield f"PAYMENT TO: {payment_to}"
        yield f"PAYMENT METHOD: {payment_method}"
        yield f"PROJECT: {project_s}"
        yield "\nBEANCOUNT ENTRIES:\n"

        # Print each transaction once, even when several consecutive
        # postings belong to it.
        last_txn: Optional[Transaction] = None
        for post in posts:
            txn = post.meta.txn
            if txn is last_txn:
                continue
            last_txn = txn
            yield bc_printer.format_entry(self.rt_wrapper.txn_with_urls(txn, '{}'))
|
||
|
||
|
||
class ReportType(enum.Enum):
    """The available reports; each member's value is the report class."""

    AGING = AgingReport
    BALANCE = BalanceReport
    OUTGOING = OutgoingReport
    # Aliases accepted on the command line.
    AGE = AGING
    BAL = BALANCE
    OUT = OUTGOING
    OUTGOINGS = OUTGOING

    @classmethod
    def by_name(cls, name: str) -> 'ReportType':
        """Look up a member or alias by name, ignoring case.

        Raises ValueError for an unknown name.
        """
        member = cls.__members__.get(name.upper())
        if member is None:
            raise ValueError(f"unknown report type {name!r}")
        return member

    @classmethod
    def default_for(cls, groups: PostGroups) -> 'ReportType':
        """Choose the report type for `groups` when the user named none.

        The outgoing report is the default for exactly one group that is a
        payable and is not paid; anything else gets the balance report.
        """
        if len(groups) != 1:
            return cls.BALANCE
        for group in groups.values():
            if group.accrual_type is not AccrualAccount.PAYABLE or group.is_paid():
                return cls.BALANCE
        return cls.OUTGOING
|
||
|
||
|
||
class ReturnFlag(enum.IntFlag):
    """Bit flags that main() combines into its exit status."""

    # The books reported errors while loading.
    LOAD_ERRORS = 1
    # No report object could be built.
    REPORT_ERRORS = 4
    # The search matched no postings.
    NOTHING_TO_REPORT = 8
|
||
|
||
|
||
def filter_search(postings: Iterable[data.Posting],
                  search_terms: Iterable[cliutil.SearchTerm],
) -> Iterable[data.Posting]:
    """Narrow `postings` to accrual postings matching every search term.

    Postings outside the AccrualAccount accounts are dropped first, then
    each search term's filter_postings() is applied in turn.
    """
    accrual_roots = tuple(AccrualAccount.account_names())
    matched: Iterable[data.Posting] = (
        post for post in postings
        if post.account.is_under(*accrual_roots)
    )
    for term in search_terms:
        matched = term.filter_postings(matched)
    return matched
|
||
|
||
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    When no report type and no search terms are given, the report type is
    set to the aging report. Otherwise a missing report type is left as
    None for the caller to decide.
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(parser)
    parser.add_argument(
        '--report-type', '-t',
        metavar='NAME',
        type=ReportType.by_name,
        help="""The type of report to generate, one of `aging`, `balance`, or
`outgoing`. If not specified, the default is `aging` when no search terms are
given, `outgoing` for search terms that return a single outstanding payable,
and `balance` any other time.
""")
    parser.add_argument(
        '--since',
        metavar='YEAR',
        type=int,
        default=-1,
        help="""How far back to search the books for related transactions.
You can either specify a fiscal year, or a negative offset from the current
fiscal year, to start loading entries from. The default is -1 (start from the
previous fiscal year).
""")
    parser.add_argument(
        '--output-file', '-O',
        metavar='PATH',
        type=Path,
        help="""Write the report to this file, or stdout when PATH is `-`.
The default is stdout for the balance and outgoing reports, and a generated
filename for other reports.
""")
    cliutil.add_loglevel_argument(parser)
    # A bare search term is parsed by cliutil with 'invoice' and 'rt-id' as
    # the metadata names it may stand for (see the help text below).
    search_term_type = cliutil.SearchTerm.arg_parser('invoice', 'rt-id')
    parser.add_argument(
        'search_terms',
        metavar='FILTER',
        type=search_term_type,
        nargs=argparse.ZERO_OR_MORE,
        help="""Report on accruals that match this criteria. The format is
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
metadata to match. A single ticket number is a shortcut for
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
""")
    args = parser.parse_args(arglist)
    nothing_requested = args.report_type is None and not args.search_terms
    if nothing_requested:
        args.report_type = ReportType.AGING
    return args
|
||
|
||
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run accrual-report and return its exit status.

    `arglist` is the command line to parse. `stdout` and `stderr` receive
    report output and diagnostics. `config` is the configuration to use;
    when it is None a fresh one is created and loaded from file.

    Returns 0 when nothing went wrong, otherwise 16 plus the ReturnFlag
    bits describing what did.
    """
    global logger
    if cliutil.is_main_script(PROGNAME):
        logger = logging.getLogger(PROGNAME)
        sys.excepthook = cliutil.ExceptHook(logger)
    args = parse_arguments(arglist)
    cliutil.setup_logger(logger, args.loglevel, stderr)
    if config is None:
        config = configmod.Config()
        config.load_file()

    # The aging report loads all the books; other reports start from the
    # year selected by --since.
    books_loader = config.books_loader()
    if books_loader is None:
        loaded = books.Loader.load_none(config.config_file_path())
    elif args.report_type is ReportType.AGING:
        loaded = books_loader.load_all()
    else:
        loaded = books_loader.load_all(args.since)
    entries, load_errors, _ = loaded
    filters.remove_opening_balance_txn(entries)

    returncode = 0
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
    groups: PostGroups = dict(AccrualPostings.group_by_first_meta_link(postings, 'invoice'))
    for error in load_errors:
        bc_printer.print_error(error, file=stderr)
        returncode |= ReturnFlag.LOAD_ERRORS
    if not groups:
        logger.warning("no matching entries found to report")
        returncode |= ReturnFlag.NOTHING_TO_REPORT

    # Split any group with inconsistent metadata into consistent groups.
    groups = {
        key: posts
        for source_posts in groups.values()
        for key, posts in source_posts.make_consistent()
    }
    if args.report_type is not ReportType.AGING:
        # Prefer unpaid groups, but fall back to everything when none are.
        unpaid_groups = {
            key: posts for key, posts in groups.items() if not posts.is_paid()
        }
        if unpaid_groups:
            groups = unpaid_groups

    if args.report_type is None:
        args.report_type = ReportType.default_for(groups)
    report: Optional[BaseReport] = None
    if args.report_type is ReportType.AGING:
        rt_client = config.rt_client()
        if rt_client is not None:
            now = datetime.datetime.now()
            if args.output_file is None:
                args.output_file = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
                logger.info("Writing report to %s", args.output_file)
            out_bin = cliutil.bytes_output(args.output_file, stdout)
            report = AgingReport(rt_client, out_bin)
        else:
            logger.error("unable to generate aging report: RT client is required")
    elif args.report_type is ReportType.OUTGOING:
        rt_client = config.rt_client()
        if rt_client is not None:
            out_file = cliutil.text_output(args.output_file, stdout)
            report = OutgoingReport(rt_client, out_file)
        else:
            logger.error("unable to generate outgoing report: RT client is required")
    else:
        out_file = cliutil.text_output(args.output_file, stdout)
        report = args.report_type.value(out_file)

    if report is not None:
        report.run(groups)
    else:
        returncode |= ReturnFlag.REPORT_ERRORS
    if returncode == 0:
        return 0
    return 16 + returncode
|
||
|
||
if __name__ == '__main__':
    # Use sys.exit() rather than the bare exit() builtin: exit() is added by
    # the site module for interactive use and is not available when site is
    # not loaded (for example under `python -S`).
    sys.exit(main())
|