csv: Support importing squared CSV spreadsheets.

See the test comment for more rationale.
This commit is contained in:
Brett Smith 2019-08-28 10:22:10 -04:00
parent 6e37753fb3
commit ab8559c75b
5 changed files with 41 additions and 5 deletions

View file

@ -43,9 +43,17 @@ class CSVImporterBase:
Reader = csv.reader Reader = csv.reader
DictReader = csv.DictReader DictReader = csv.DictReader
@classmethod
def _row_rindex(cls, row, default=None):
"""Return the index of the last cell in the row that has a value."""
for offset, value in enumerate(reversed(row), 1):
if value:
return len(row) - offset
return default
@classmethod @classmethod
def _read_header_row(cls, row): def _read_header_row(cls, row):
return {} if len(row) < cls._HEADER_MAX_LEN else None return {} if cls._row_rindex(row, -1) + 1 < cls._HEADER_MAX_LEN else None
@classmethod @classmethod
def _read_header(cls, input_file): def _read_header(cls, input_file):

View file

@ -11,10 +11,10 @@ class _DonationsImporterBase(_csv.CSVImporterBase):
@classmethod @classmethod
def _read_header_row(cls, row): def _read_header_row(cls, row):
row_len = len(row) row_rindex = cls._row_rindex(row, -1)
if row_len > 2: if row_rindex > 1:
return None return None
elif row_len == 2 and row[0] in cls.HEADER_FIELDS: elif row_rindex == 1 and row[0] in cls.HEADER_FIELDS:
return {cls.HEADER_FIELDS[row[0]]: row[1]} return {cls.HEADER_FIELDS[row[0]]: row[1]}
else: else:
return {} return {}

View file

@ -30,7 +30,7 @@ REQUIREMENTS['tests_require'] = [
setup( setup(
name='import2ledger', name='import2ledger',
description="Import different sources of financial data to Ledger", description="Import different sources of financial data to Ledger",
version='0.9.2', version='0.9.3',
author='Brett Smith', author='Brett Smith',
author_email='brettcsmith@brettcsmith.org', author_email='brettcsmith@brettcsmith.org',
license='GNU AGPLv3+', license='GNU AGPLv3+',

View file

@ -264,6 +264,8 @@
- source: AmazonAffiliateEarnings.csv - source: AmazonAffiliateEarnings.csv
importer: amazon.EarningsImporter importer: amazon.EarningsImporter
header_rows: 1
header_cols: 12
expect: expect:
- payee: Amazon - payee: Amazon
date: !!python/object/apply:datetime.date [2016, 12, 20] date: !!python/object/apply:datetime.date [2016, 12, 20]
@ -276,6 +278,8 @@
- source: Benevity2018.csv - source: Benevity2018.csv
importer: benevity.Donations2018Importer importer: benevity.Donations2018Importer
header_rows: 11
header_cols: 17
expect: expect:
- date: !!python/object/apply:datetime.date [2017, 10, 28] - date: !!python/object/apply:datetime.date [2017, 10, 28]
currency: USD currency: USD
@ -366,6 +370,8 @@
- source: Benevity2019.csv - source: Benevity2019.csv
importer: benevity.Donations2019Importer importer: benevity.Donations2019Importer
header_rows: 11
header_cols: 21
expect: expect:
- date: !!python/object/apply:datetime.date [2017, 10, 28] - date: !!python/object/apply:datetime.date [2017, 10, 28]
currency: USD currency: USD

View file

@ -1,8 +1,11 @@
import csv
import datetime import datetime
import decimal import decimal
import io
import importlib import importlib
import itertools import itertools
import pathlib import pathlib
import shutil
import re import re
import pytest import pytest
@ -28,6 +31,25 @@ class TestImporters:
with source_path.open() as source_file: with source_path.open() as source_file:
assert importer.can_import(source_file) assert importer.can_import(source_file)
@pytest.mark.parametrize('source_path,importer,header_rows,header_cols', [
    (t['source'], t['importer'], t['header_rows'], t['header_cols'])
    for t in test_data if t.get('header_rows')
])
def test_can_import_squared_csv(self, source_path, importer, header_rows, header_cols):
    # Hand-edited spreadsheets (e.g., filtered by project) are often saved
    # by tools like LibreOffice Calc as "squared" CSV, where every row is
    # padded out to the same number of cells. Rebuild such a file from the
    # fixture and check that the importer still recognizes it.
    with io.StringIO() as squared_file, source_path.open() as source_file:
        squared_writer = csv.writer(squared_file)
        # Only the leading header rows are padded out to header_cols cells.
        header = itertools.islice(csv.reader(source_file), header_rows)
        for cells in header:
            missing = header_cols - len(cells)
            squared_writer.writerow(cells + [None] * missing)
        # Everything after the header is copied through verbatim.
        shutil.copyfileobj(source_file, squared_file)
        squared_file.seek(0)
        assert importer.can_import(squared_file)
@pytest.mark.parametrize('source_path,import_class,expect_results', [ @pytest.mark.parametrize('source_path,import_class,expect_results', [
(t['source'], t['importer'], t['expect']) for t in test_data (t['source'], t['importer'], t['expect']) for t in test_data
]) ])