The current importers trim lots of extraneous symbols and whitespace from currency strings before passing them to Decimal(). This function takes care of all that in a single place.
153 lines
5.3 KiB
Python
153 lines
5.3 KiB
Python
import decimal
|
|
import functools
|
|
|
|
import bs4
|
|
from .. import util
|
|
|
|
# Values stored under the 'status' key of each invoice action.  Importers
# compare an entry's status against their YIELD_STATUS to decide what to emit.

# Status of the action that records the invoice itself being issued.
STATUS_INVOICED = 'Invoice'
# Status of an action built from a 'Paid ...' row of the activity table.
STATUS_PAID = 'Payment'
# Reserved for refunds; the visible parsing code does not produce it yet.
STATUS_REFUNDED = 'Refund'
|
|
|
|
class Invoice2017:
    """Read an HTML invoice and expose one dict per invoice action.

    The constructor parses the document with BeautifulSoup, hands each
    recognized ``<table>`` to a reader method, and stores the fields shared
    by every action in ``base_data``.  Iterating the instance yields
    ``base_data`` merged with each entry of ``actions``.

    Construction raises AttributeError when any expected table was missing,
    because the corresponding attributes are never set.
    """

    STANDARD_TICKET_RATE = decimal.Decimal('42.50')
    DISCOUNT_TICKET_RATE = STANDARD_TICKET_RATE / 2
    STANDARD_SHIRT_RATE = decimal.Decimal('25.50')
    DISCOUNT_SHIRT_RATE = STANDARD_SHIRT_RATE
    CURRENCY = 'USD'

    @classmethod
    def _elem_stripped_string(cls, elem):
        """Return all of ``elem.stripped_strings`` concatenated together."""
        pieces = elem.stripped_strings
        return ''.join(pieces)

    @classmethod
    def _table_row_text(cls, table_elem):
        """Yield one list of cell texts for each ``<tr>`` in the table.

        A cell carrying a ``colspan`` attribute is followed by ``None``
        placeholders so the list has one entry per spanned column.
        """
        for tr in table_elem.find_all('tr'):
            texts = []
            for cell in tr.find_all(('th', 'td')):
                texts.append(cls._elem_stripped_string(cell))
                try:
                    span = int(cell['colspan'])
                except KeyError:
                    # No colspan attribute: the cell fills a single column.
                    continue
                texts.extend([None] * (span - 1))
            yield texts

    def _handler_for(self, first_row_text):
        """Return the reader method matching a table's first row, or None."""
        # The header table is recognized by its first cell only; the other
        # tables must match their full heading row exactly.
        if first_row_text[:1] == ['Number']:
            return self._read_invoice_header
        if first_row_text == [
                'Description', 'Quantity', 'Price/Unit', 'Total']:
            return self._read_invoice_items
        if first_row_text == ['Payment time', 'Reference', 'Amount']:
            return self._read_invoice_activity
        return None

    def __init__(self, source_file):
        """Parse ``source_file`` as HTML and collect the invoice data.

        Raises AttributeError if the header, items, or activity table was
        not found, since the attributes read below would then be unset.
        """
        soup = bs4.BeautifulSoup(source_file, 'html5lib')
        for table in soup.find_all('table'):
            rows_text = self._table_row_text(table)
            # Consume the heading row; the reader gets the remaining rows.
            first_row_text = next(rows_text, [])
            handler = self._handler_for(first_row_text)
            if handler is not None:
                handler(table, first_row_text, rows_text)
        self.base_data = dict(
            amount=self.amount,
            currency=self.CURRENCY,
            invoice_date=self.invoice_date,
            invoice_id=self.invoice_id,
            payee=self.payee,
            shirt_rate=self.shirt_rate,
            shirts_sold=self.shirts_sold,
            ticket_rate=self.ticket_rate,
            tickets_sold=self.tickets_sold,
        )
        # Raise an AttributeError if we didn't read any invoice activity.
        self.actions

    def _strpdate(self, s):
        """Convert a date string from the invoice via the ``util`` helpers.

        The text is first reshaped with ``util.rejoin_slice_words`` and then
        parsed with the ``'%b. %d, %Y'`` format.
        NOTE(review): presumably this drops a trailing time-of-day portion;
        confirm against ``util.rejoin_slice_words``.
        """
        return util.strpdate(
            util.rejoin_slice_words(s, slice(2), ',', 2),
            '%b. %d, %Y',
        )

    def _read_invoice_header(self, table, first_row_text, rows_text):
        """Set ``invoice_id``, ``invoice_date`` and ``payee`` from the header.

        Every remaining row must unpack into exactly two cells.
        """
        self.invoice_id = first_row_text[1]
        for label, text in rows_text:
            if label != 'Issue date':
                continue
            self.invoice_date = self._strpdate(text)
        heading = table.find('th', text='Recipient')
        recipient_cell = heading.find_next_sibling('td')
        # Only the first text fragment of the recipient cell is the payee.
        self.payee = next(recipient_cell.stripped_strings)

    def _read_invoice_items(self, table, first_row_text, rows_text):
        """Tally tickets, shirts, rates and the invoice amount.

        Each 'Ticket - ' or 'T-Shirt - ' row counts as one item regardless
        of its Quantity cell.  An 'Early Bird (' row switches the ticket
        rate to the discount rate.
        """
        self.ticket_rate = self.STANDARD_TICKET_RATE
        self.shirt_rate = self.STANDARD_SHIRT_RATE
        self.amount = decimal.Decimal(0)
        self.tickets_sold = decimal.Decimal(0)
        self.shirts_sold = decimal.Decimal(0)
        for label, qty, _unit_price, total in rows_text:
            if label.startswith('Ticket - '):
                self.tickets_sold += 1
            elif label.startswith('T-Shirt - '):
                self.shirts_sold += 1
            elif label.startswith('Early Bird ('):
                self.ticket_rate = self.DISCOUNT_TICKET_RATE
            if not qty:
                # Rows with an empty quantity do not add to the amount
                # (presumably subtotal/total rows -- confirm).
                continue
            self.amount += util.parse_currency_dec(total)

    def _read_invoice_activity(self, table, first_row_text, rows_text):
        """Build ``actions``: the invoice itself, then each 'Paid ' row.

        Requires ``invoice_date`` to be set already; otherwise the
        attribute lookup raises AttributeError.
        """
        self.actions = [dict(date=self.invoice_date, status=STATUS_INVOICED)]
        for when, reference, _amount in rows_text:
            if not reference.startswith('Paid '):
                # Refund handling could go here, if we need it.
                continue
            stripe_id = util.rslice_words(reference, 1, limit=1)
            self.actions.append({
                'payment_id': stripe_id,
                'status': STATUS_PAID,
                'date': self._strpdate(when),
                'stripe_id': stripe_id,
            })

    def __iter__(self):
        """Yield a new dict per action: ``base_data`` overlaid by it."""
        for action in self.actions:
            yield {**self.base_data, **action}
|
|
|
|
|
|
@functools.lru_cache(maxsize=5)
def _parse_invoice(parser_class, source_file):
    """Return ``parser_class(source_file)``, or None if it cannot be parsed.

    A parser signals an unrecognized document by raising AttributeError,
    which is turned into a None result here.  Results (including None) are
    memoized for the five most recent argument pairs, so repeated calls with
    the same class and file object get the same parsed instance back.
    """
    try:
        invoice = parser_class(source_file)
    except AttributeError:
        invoice = None
    return invoice
|
|
|
|
class ImporterBase:
    """Shared importer logic: parse an invoice and emit matching entries.

    Subclasses must define ``INVOICE_CLASS`` (the parser to instantiate) and
    ``YIELD_STATUS`` (the entry status this importer should emit).
    """

    @classmethod
    def _parse_invoice(cls, source_file):
        """Parse ``source_file`` with ``INVOICE_CLASS`` via the cached helper."""
        parser_class = cls.INVOICE_CLASS
        return _parse_invoice(parser_class, source_file)

    @classmethod
    def can_import(cls, source_file):
        """Return True when ``source_file`` parses into an invoice."""
        parsed = cls._parse_invoice(source_file)
        return parsed is not None

    def __init__(self, source_file):
        """Store the parsed invoice (None if parsing failed) on the instance."""
        self.invoice = self._parse_invoice(source_file)

    def __iter__(self):
        """Yield the invoice entries whose 'status' equals ``YIELD_STATUS``."""
        yield from (
            entry for entry in self.invoice
            if entry['status'] == self.YIELD_STATUS
        )
|
|
|
|
|
|
class Invoice2017Importer(ImporterBase):
    """Importer that emits only the 'Invoice' entry of an Invoice2017."""

    INVOICE_CLASS = Invoice2017
    YIELD_STATUS = STATUS_INVOICED
    TEMPLATE_KEY = 'template nbpy2017 invoice'
|
|
|
|
|
|
class Payment2017Importer(ImporterBase):
    """Importer that emits only the 'Payment' entries of an Invoice2017."""

    INVOICE_CLASS = Invoice2017
    YIELD_STATUS = STATUS_PAID
    TEMPLATE_KEY = 'template nbpy2017 payment'
|