
Only one refund was actually issued, so we can deal with that manually. But the code did its job of revealing those cases and checking we were handling them well.
141 lines
5.1 KiB
Python
141 lines
5.1 KiB
Python
import decimal
|
|
import functools
|
|
|
|
import bs4
|
|
from .. import util
|
|
|
|
class Invoice2017:
    """Payment data scraped from an HTML invoice page.

    The constructor parses the page, inspects the first row of every
    ``<table>``, and hands each recognized table (invoice header, line
    items, payment activity) to the matching ``_read_*`` method.  Iterating
    over an instance yields one dict per recorded payment action.
    """

    STANDARD_TICKET_RATE = decimal.Decimal('42.50')
    DISCOUNT_TICKET_RATE = STANDARD_TICKET_RATE / 2
    STANDARD_SHIRT_RATE = decimal.Decimal('25.50')
    DISCOUNT_SHIRT_RATE = STANDARD_SHIRT_RATE
    # Format of the leading date words of a payment timestamp; its word
    # count decides how many words of the timestamp are kept.
    DATE_FMT = '%b. %d, %Y,'
    CURRENCY = 'USD'

    @classmethod
    def _elem_stripped_string(cls, elem):
        """Return all of elem's stripped strings joined with no separator."""
        return ''.join(elem.stripped_strings)

    @classmethod
    def _table_row_text(cls, table_elem):
        """Yield one list of cell texts for each ``tr`` under table_elem.

        A cell carrying a ``colspan`` attribute is followed by ``None``
        placeholders so that it occupies ``colspan`` entries in the list.
        """
        for tr in table_elem.find_all('tr'):
            cells = []
            for cell in tr.find_all(('th', 'td')):
                cells.append(cls._elem_stripped_string(cell))
                try:
                    span = int(cell['colspan'])
                except KeyError:
                    # No colspan attribute: the cell fills a single column.
                    continue
                cells.extend([None] * (span - 1))
            yield cells

    def __init__(self, source_file):
        """Parse source_file as HTML and collect the invoice data.

        Raises AttributeError when any expected table was not found, because
        the attributes its reader would have set are then missing.
        """
        soup = bs4.BeautifulSoup(source_file, 'html5lib')
        # Readers for tables whose whole heading row must match exactly.
        exact_readers = {
            ('Description', 'Quantity', 'Price/Unit', 'Total'):
                self._read_invoice_items,
            ('Payment time', 'Reference', 'Amount'):
                self._read_invoice_activity,
        }
        for table in soup.find_all('table'):
            rows_text = self._table_row_text(table)
            heading = next(rows_text, [])
            if heading[:1] == ['Number']:
                # The header table is recognized by its first cell alone.
                reader = self._read_invoice_header
            else:
                reader = exact_readers.get(tuple(heading))
            if reader is None:
                continue
            reader(table, heading, rows_text)
        self.base_data = {
            'amount': self.amount,
            'currency': self.CURRENCY,
            'invoice_id': self.invoice_id,
            'payee': self.payee,
            'shirt_rate': self.shirt_rate,
            'shirts_sold': self.shirts_sold,
            'ticket_rate': self.ticket_rate,
            'tickets_sold': self.tickets_sold,
        }
        # Deliberate bare attribute access: raises AttributeError if no
        # invoice activity table was read.
        self.actions

    def _read_invoice_header(self, table, first_row_text, rows_text):
        """Record the invoice number and the payee from the header table."""
        self.invoice_id = first_row_text[1]
        payee_cell = table.find('th', text='Recipient').find_next_sibling('td')
        # Only the first non-blank string of the recipient cell is kept.
        self.payee = next(payee_cell.stripped_strings)

    def _read_invoice_items(self, table, first_row_text, rows_text):
        """Tally tickets, shirts, rates and the total from the item rows."""
        self.amount = decimal.Decimal(0)
        self.tickets_sold = decimal.Decimal(0)
        self.ticket_rate = self.STANDARD_TICKET_RATE
        self.shirts_sold = decimal.Decimal(0)
        self.shirt_rate = self.STANDARD_SHIRT_RATE
        for label, quantity, _unit_price, line_total in rows_text:
            if label.startswith('Ticket - '):
                self.tickets_sold += 1
            elif label.startswith('T-Shirt - '):
                self.shirts_sold += 1
            elif label.startswith('Early Bird ('):
                self.ticket_rate = self.DISCOUNT_TICKET_RATE
            if not quantity:
                # Rows without a quantity do not contribute to the amount.
                continue
            self.amount += decimal.Decimal(line_total.lstrip('$'))

    def _read_invoice_activity(self, table, first_row_text, rows_text):
        """Build self.actions from the rows whose reference starts 'Paid '."""
        self.actions = []
        date_wordcount = len(self.DATE_FMT.split(' '))
        for timestamp, reference, _amount in rows_text:
            if not reference.startswith('Paid '):
                # Refund handling could go here, if we need it.
                continue
            # The last whitespace-separated word is used as the payment ID.
            stripe_id = reference.rsplit(None, 1)[1]
            # Trim extraneous text like the time/a.m./p.m.
            leading_words = timestamp.split(' ', date_wordcount + 1)
            date_text = ' '.join(leading_words[:date_wordcount])
            self.actions.append({
                'multiplier': 1,
                'payment_id': stripe_id,
                'date': util.strpdate(date_text, self.DATE_FMT),
                'stripe_id': stripe_id,
            })

    def __iter__(self):
        """Yield one dict per action: base data overlaid with the action.

        The action's 'multiplier' is removed from the result and applied to
        the amount and the ticket and shirt counts.
        """
        scaled_keys = ['amount', 'tickets_sold', 'shirts_sold']
        for action in self.actions:
            entry = dict(self.base_data)
            entry.update(action)
            factor = entry.pop('multiplier')
            for key in scaled_keys:
                entry[key] *= factor
            yield entry
|
|
|
|
|
|
@functools.lru_cache(5)
|
|
def _parse_invoice(parser_class, source_file):
|
|
try:
|
|
return parser_class(source_file)
|
|
except AttributeError:
|
|
return None
|
|
|
|
class ImporterBase:
    """Shared importer logic built around a parsed invoice.

    The code here reads ``INVOICE_CLASS`` from the class and calls
    ``_should_yield_entry(entry)`` on the instance; neither is defined in
    this class, so subclasses are expected to supply them.
    """

    @classmethod
    def _parse_invoice(cls, source_file):
        """Parse source_file with this class's INVOICE_CLASS.

        Delegates to the module-level ``_parse_invoice`` function.
        """
        parser_class = cls.INVOICE_CLASS
        return _parse_invoice(parser_class, source_file)

    @classmethod
    def can_import(cls, source_file):
        """Return True when parsing source_file produced an invoice."""
        invoice = cls._parse_invoice(source_file)
        return invoice is not None

    def __init__(self, source_file):
        """Parse source_file and keep the result as self.invoice."""
        self.invoice = self._parse_invoice(source_file)

    def __iter__(self):
        """Yield the invoice entries accepted by _should_yield_entry."""
        yield from (
            entry
            for entry in self.invoice
            if self._should_yield_entry(entry)
        )
|
|
|
|
|
|
class Payment2017Importer(ImporterBase):
    """Importer that yields the Invoice2017 entries with a positive amount."""

    # NOTE(review): presumably the key under which the output template for
    # these entries is looked up -- the consumer is not visible here.
    TEMPLATE_KEY = 'template nbpy2017 payment'
    INVOICE_CLASS = Invoice2017

    def _should_yield_entry(self, entry):
        """Return True when the entry's amount is greater than zero."""
        amount = entry['amount']
        return amount > 0
|