loaders: Add OXRAPIRequest.
This commit is contained in:
parent
7912a37074
commit
10b0a818d7
2 changed files with 152 additions and 0 deletions
|
@ -1,3 +1,8 @@
|
|||
import cgi
|
||||
import io
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
|
||||
class LoaderError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -25,3 +30,48 @@ class FileCache:
|
|||
return path.open()
|
||||
except FileNotFoundError as error:
|
||||
raise LoaderNoDataError(path) from error
|
||||
|
||||
|
||||
class OXRAPIRequest:
|
||||
DEFAULT_API_ROOT = 'https://openexchangerates.org/api/'
|
||||
DEFAULT_RESPONSE_ENCODING = 'utf-8'
|
||||
|
||||
def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen):
|
||||
self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root
|
||||
self.app_id = app_id
|
||||
self.open_url = open_func
|
||||
|
||||
def _get_response_encoding(self, response, default=None):
|
||||
try:
|
||||
content_type = response.getheader('Content-Type', 'application/json')
|
||||
_, ct_options = cgi.parse_header(content_type)
|
||||
encoding = ct_options['charset']
|
||||
except (KeyError, ValueError):
|
||||
encoding = self.DEFAULT_RESPONSE_ENCODING if default is None else default
|
||||
return encoding
|
||||
|
||||
def _raw_query(self, url_tail, params):
|
||||
url = '{}?{}'.format(
|
||||
urllib.parse.urljoin(self.api_root, url_tail),
|
||||
urllib.parse.urlencode(params),
|
||||
)
|
||||
response = self.open_url(url)
|
||||
status_code = response.status
|
||||
encoding = self._get_response_encoding(response)
|
||||
response_body = io.TextIOWrapper(response, encoding=encoding)
|
||||
if 200 <= status_code < 203:
|
||||
return response_body
|
||||
elif status_code == 404 or status_code == 410:
|
||||
exc_class = LoaderNoDataError
|
||||
elif status_code >= 500:
|
||||
exc_class = LoaderSourceError
|
||||
else:
|
||||
exc_class = LoaderBadRequestError
|
||||
with response_body:
|
||||
raise exc_class(url, response_body.read(64 * 1024))
|
||||
|
||||
def historical(self, date, base):
|
||||
return self._raw_query(
|
||||
'historical/{}.json'.format(date.isoformat()),
|
||||
{'app_id': self.app_id, 'base': base},
|
||||
)
|
||||
|
|
102
tests/test_OXRAPIRequest.py
Normal file
102
tests/test_OXRAPIRequest.py
Normal file
|
@ -0,0 +1,102 @@
|
|||
import datetime
|
||||
import http.client
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import urllib.parse
|
||||
|
||||
import pytest
|
||||
import oxrlib.loaders
|
||||
|
||||
APPID_CHARS = string.ascii_letters + string.digits
|
||||
RANDOM_APPID = ''.join(random.choice(APPID_CHARS) for _ in range(32))
|
||||
API_ROOT = 'http://[100::]/oxrlibtest/'
|
||||
API_ROOT_PATH = urllib.parse.urlsplit(API_ROOT).path
|
||||
|
||||
class FakeResponse:
|
||||
debuglevel = 0
|
||||
version = 11
|
||||
|
||||
def __init__(self, status_code, reason=None, body=None, headers=None, encoding='utf-16'):
|
||||
if reason is None:
|
||||
reason = http.client.responses[status_code]
|
||||
if body is None:
|
||||
body = json.dumps(reason)
|
||||
if headers is None:
|
||||
headers = {
|
||||
'Content-Type': 'application/json; charset={}'.format(encoding),
|
||||
'Content-Length': str(len(body)),
|
||||
}
|
||||
self.status = status_code
|
||||
self.reason = reason
|
||||
read_fd, write_fd = os.pipe()
|
||||
with open(write_fd, 'w', encoding=encoding) as body_file:
|
||||
print('\ufeff', body, sep='', file=body_file)
|
||||
self.fp = open(read_fd, 'rb')
|
||||
self.headers = headers
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.fp, name)
|
||||
|
||||
def getheader(self, name, default=None):
|
||||
return self.headers.get(name, default)
|
||||
|
||||
def getheaders(self):
|
||||
return list(self.headers.itervalues())
|
||||
|
||||
|
||||
class FakeOpener:
|
||||
def __init__(self, response):
|
||||
self.response = response
|
||||
self.call_list = []
|
||||
|
||||
def __call__(self, url, **kwargs):
|
||||
self.call_list.append((url, kwargs))
|
||||
return self.response
|
||||
|
||||
def call_count(self):
|
||||
return len(self.call_list)
|
||||
|
||||
def last_called_url(self):
|
||||
return self.call_list[-1][0]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_client():
|
||||
return oxrlib.loaders.OXRAPIRequest(RANDOM_APPID, API_ROOT)
|
||||
|
||||
@pytest.fixture
|
||||
def any_date():
|
||||
return datetime.date.today() - datetime.timedelta(days=730 - random.randint(0, 365))
|
||||
|
||||
@pytest.mark.parametrize('base', ['USD', 'JPY'])
|
||||
def test_success(api_client, any_date, base):
|
||||
body = "Good Test"
|
||||
opener = FakeOpener(FakeResponse(200, body))
|
||||
api_client.open_url = opener
|
||||
response = api_client.historical(any_date, base)
|
||||
assert opener.call_count() == 1
|
||||
urlparts = urllib.parse.urlsplit(opener.last_called_url())
|
||||
assert urlparts.path == '{}historical/{}.json'.format(API_ROOT_PATH, any_date.isoformat())
|
||||
params = urllib.parse.parse_qs(urlparts.query)
|
||||
assert params['base'] == [base]
|
||||
assert response.read() == (json.dumps(body) + "\n")
|
||||
|
||||
@pytest.mark.parametrize('status_code,expect_exctype', [
|
||||
(400, oxrlib.loaders.LoaderBadRequestError),
|
||||
(403, oxrlib.loaders.LoaderBadRequestError),
|
||||
(404, oxrlib.loaders.LoaderNoDataError),
|
||||
(410, oxrlib.loaders.LoaderNoDataError),
|
||||
(500, oxrlib.loaders.LoaderSourceError),
|
||||
])
|
||||
def test_failure(api_client, any_date, status_code, expect_exctype):
|
||||
opener = FakeOpener(FakeResponse(status_code))
|
||||
api_client.open_url = opener
|
||||
try:
|
||||
response = api_client.historical(any_date, 'USD')
|
||||
except expect_exctype:
|
||||
pass
|
||||
else:
|
||||
assert False, "got response: " + response.read()
|
Loading…
Reference in a new issue