import2ledger/import2ledger/importers/nbpy2017.py

161 lines
5.7 KiB
Python

import collections
import decimal
import functools
import re
import bs4
from .. import strparse
# Values stored under the 'status' key of each action mapping produced by
# the invoice parser in this module.  InvoiceImporter lowercases the status
# to build its 'ledger template' lookup key.
# Status of the first action: the invoice being issued.
STATUS_INVOICED = 'Invoice'
# Status of an action read from a "Paid ..." row of the payment table.
STATUS_PAID = 'Payment'
# Not referenced by the code visible in this module; presumably reserved
# for refund handling, which the parser currently skips.
STATUS_REFUNDED = 'Refund'
class Invoice2017:
    """Invoice data scraped from a 2017-format HTML invoice page.

    The constructor parses the page, finds the header, line-item and payment
    tables by the text of their first row, and records what it reads as
    instance attributes.  Iterating over an instance yields one mapping per
    recorded action (the invoice itself, then each payment), layered over
    the invoice-wide ``base_data``.

    A page that lacks any of the expected tables or fields leaves the
    matching attribute unset, so the constructor raises ``AttributeError``.
    """

    # Currency reported for every invoice read by this class.
    CURRENCY = 'USD'
    # Finds the item type ("Ticket" or "T-Shirt") at the start of a line-item
    # description or immediately after an opening parenthesis.
    ITEM_RE = re.compile(r'(?:^|\()(Ticket|T-Shirt) - ')

    @classmethod
    def _elem_stripped_string(cls, elem):
        """Return all stripped text fragments of ``elem`` joined together."""
        return ''.join(elem.stripped_strings)

    @classmethod
    def _table_row_text(cls, table_elem):
        """Yield each ``tr`` of ``table_elem`` as a list of cell strings.

        A cell with ``colspan=N`` is followed by ``N - 1`` ``None``
        placeholders so that column positions line up between rows.  A
        missing or non-integer ``colspan`` counts as a single column.
        """
        for row in table_elem.find_all('tr'):
            row_text = []
            for cell in row.find_all(('th', 'td')):
                row_text.append(cls._elem_stripped_string(cell))
                try:
                    extra_cols = int(cell['colspan'])
                except (KeyError, ValueError):
                    # No colspan attribute, or one that is not a number:
                    # the cell occupies just its own column.
                    pass
                else:
                    row_text.extend(None for _ in range(extra_cols - 1))
            yield row_text

    def __init__(self, source_file):
        """Parse ``source_file`` and collect the invoice data it contains.

        ``source_file`` is handed unchanged to ``bs4.BeautifulSoup`` with
        the ``html5lib`` parser.  Every table in the document is inspected;
        tables whose first row is not recognized are ignored.

        Raises:
            AttributeError: if the header, line-item or payment data was
                not found, so an attribute needed below was never set.
        """
        soup = bs4.BeautifulSoup(source_file, 'html5lib')
        for table in soup.find_all('table'):
            rows_text = self._table_row_text(table)
            # Consume the first row here; handlers receive the iterator
            # positioned at the second row.
            first_row_text = next(rows_text, [])
            if first_row_text[:1] == ['Number']:
                handler = self._read_invoice_header
            elif first_row_text == ['Description', 'Quantity', 'Price/Unit', 'Total']:
                handler = self._read_invoice_items
            elif first_row_text == ['Payment time', 'Reference', 'Amount']:
                handler = self._read_invoice_activity
            else:
                continue
            handler(table, first_row_text, rows_text)
        self.base_data = {
            'amount': self.amount,
            'currency': self.CURRENCY,
            'invoice_date': self.invoice_date,
            'invoice_id': self.invoice_id,
            'payee': self.payee,
            'shirt_price': self.shirt_price,
            'shirt_rate': self.shirt_rate,
            'shirts_sold': self.shirts_sold,
            'ticket_price': self.ticket_price,
            'ticket_rate': self.ticket_rate,
            'tickets_sold': self.tickets_sold,
        }
        # Raise an AttributeError if we didn't read any invoice activity.
        self.actions

    def _strpdate(self, s):
        """Parse a date or timestamp string from the invoice page.

        The text is first reduced with ``strparse.rejoin_slice_words``
        (presumably to its leading "Mon. D, YYYY" part; confirm against
        strparse), then "Sept." is rewritten as "Sep." so the month matches
        the ``%b`` directive in the format given to ``strparse.date``.
        """
        date_s = strparse.rejoin_slice_words(s, slice(2), ',', 2).replace('Sept.', 'Sep.')
        return strparse.date(date_s, '%b. %d, %Y')

    def _read_invoice_header(self, table, first_row_text, rows_text):
        """Read the invoice number, issue date and payee from the header table.

        Sets ``invoice_id`` from the second cell of the first row,
        ``invoice_date`` from the "Issue date" row, and ``payee`` from the
        first text fragment of the cell following the "Recipient" heading.

        Raises:
            AttributeError: if the "Recipient" heading or its text is
                missing.
        """
        self.invoice_id = first_row_text[1]
        for key, value in rows_text:
            if key == 'Issue date':
                self.invoice_date = self._strpdate(value)
        recipient_h = table.find('th', string='Recipient')
        recipient_cell = recipient_h.find_next_sibling('td')
        try:
            self.payee = next(recipient_cell.stripped_strings)
        except StopIteration:
            # An empty recipient cell must not leak StopIteration to the
            # caller; report it the way this class reports every other
            # missing field.
            raise AttributeError('invoice has no recipient name') from None

    def _read_invoice_items(self, table, first_row_text, rows_text):
        """Total the line items and tally ticket and T-shirt sales.

        Sets ``amount`` to the sum of every line total, and the
        ``tickets_sold``/``ticket_price``/``ticket_rate`` and
        ``shirts_sold``/``shirt_price``/``shirt_rate`` attributes from the
        lines whose description matches ``ITEM_RE``.
        """
        self.amount = decimal.Decimal(0)
        self.tickets_sold = decimal.Decimal(0)
        self.ticket_price = decimal.Decimal(0)
        self.ticket_rate = ''
        self.shirts_sold = decimal.Decimal(0)
        self.shirt_price = decimal.Decimal(0)
        self.shirt_rate = ''
        for description, qty, unit_price, total in rows_text:
            # A None quantity means the description cell spanned into this
            # column, so the row is not an individual line item.
            if qty is None:
                continue
            total = strparse.currency_decimal(total)
            self.amount += total
            match = self.ITEM_RE.search(description)
            try:
                item_type = match.group(1)
            except AttributeError:
                # No match: the line counts toward the amount only.
                continue
            qty = int(qty)
            unit_price = strparse.currency_decimal(unit_price)
            if item_type == 'Ticket':
                # Only lines with a positive total count as sold; every
                # matching line still adjusts the price and rate.
                if total > 0:
                    self.tickets_sold += qty
                self.ticket_price += unit_price
                self.ticket_rate = description
            elif item_type == 'T-Shirt':
                if description.startswith('T-shirts complimentary '):
                    # Complimentary shirts are subtracted from the sold
                    # count and leave the price and rate untouched.
                    self.shirts_sold -= qty
                else:
                    if total > 0:
                        self.shirts_sold += qty
                    self.shirt_price += unit_price
                    self.shirt_rate = description

    def _read_invoice_activity(self, table, first_row_text, rows_text):
        """Build ``actions`` from the invoice date and the payment table.

        The list starts with one invoiced action dated ``invoice_date``
        (so the header table must already have been read) and gains one
        paid action per row whose reference text starts with "Paid ".
        """
        self.actions = [{
            'date': self.invoice_date,
            'status': STATUS_INVOICED,
        }]
        for timestamp, description, _amount in rows_text:
            if description.startswith('Paid '):
                # Presumably the last word of the reference text; confirm
                # against strparse.rslice_words.
                last_stripe_id = strparse.rslice_words(description, 1, limit=1)
                action = {
                    'payment_id': last_stripe_id,
                    'status': STATUS_PAID,
                }
            else:
                # Refund handling could go here, if we need it.
                continue
            action['date'] = self._strpdate(timestamp)
            action['stripe_id'] = last_stripe_id
            self.actions.append(action)

    def __iter__(self):
        """Return an iterator of one ``ChainMap`` per action.

        Each mapping looks keys up in the action first and falls back to
        ``base_data``.
        """
        return (collections.ChainMap(act, self.base_data) for act in self.actions)
@functools.lru_cache(maxsize=5)
def _parse_invoice(parser_class, source_file):
    """Build ``parser_class(source_file)``, or return None if it cannot parse.

    A parser signals an unusable source by raising ``AttributeError``; that
    is translated to a ``None`` result.  Any other exception propagates.
    The five most recently used (parser_class, source_file) results are
    memoized, so both arguments must be hashable.
    """
    parsed = None
    try:
        parsed = parser_class(source_file)
    except AttributeError:
        # Leave the result as None: the source is not in this format.
        pass
    return parsed
class InvoiceImporter:
    """Importer that yields ledger entry data for each action on an invoice.

    Subclasses may override ``INVOICE_CLASS`` to change the parser and
    ``LEDGER_TEMPLATE_KEY_FMT`` to change the template key that is attached
    to every entry.
    """

    # Parser class instantiated with the source file.
    INVOICE_CLASS = Invoice2017
    # Format for the 'ledger template' value; {0} is the lowercased status.
    LEDGER_TEMPLATE_KEY_FMT = 'nbpy2017 {0} ledger entry'

    @classmethod
    def _parse_invoice(cls, source_file):
        """Return the parsed invoice for ``source_file``, or None.

        Delegates to the memoized module-level ``_parse_invoice``.
        """
        return _parse_invoice(cls.INVOICE_CLASS, source_file)

    @classmethod
    def can_import(cls, source_file):
        """Return True if ``source_file`` parses as this importer's invoice."""
        return cls._parse_invoice(source_file) is not None

    def __init__(self, source_file):
        """Store the parsed invoice for ``source_file``.

        ``self.invoice`` is None when the source could not be parsed;
        iterating over such an importer raises ``TypeError``.
        """
        self.invoice = self._parse_invoice(source_file)

    def __iter__(self):
        """Yield one mutable mapping per invoice action.

        Each mapping exposes the invoice's entry data plus a
        'ledger template' key derived from the entry's status.
        """
        for source_entry in self.invoice:
            # The parsed invoice is shared through the parse cache, so put
            # a fresh mapping in front of its data: the template key (and
            # any later writes by consumers) then never alter cached state.
            entry = collections.ChainMap({}, source_entry)
            entry['ledger template'] = self.LEDGER_TEMPLATE_KEY_FMT.format(entry['status'].lower())
            yield entry