historical: Add support for from_date arg.

This commit is contained in:
Brett Smith 2020-05-17 12:13:10 -04:00
parent 71893ace4d
commit 2d753c31aa
3 changed files with 146 additions and 62 deletions

View file

@ -12,9 +12,11 @@ except ImportError:
import enum34 as enum import enum34 as enum
class Formatter: class Formatter:
def __init__(self, rate, signed_currencies=(), base_fmt='#,##0.###', def __init__(self, cost_rates, price_rates=None,
signed_currencies=(), base_fmt='#,##0.###',
rate_precision=5, denomination=None): rate_precision=5, denomination=None):
self.rate = rate self.cost_rates = cost_rates
self.price_rates = price_rates
self.base_fmt = base_fmt self.base_fmt = base_fmt
self.base_fmt_noprec = base_fmt.rsplit('.', 1)[0] self.base_fmt_noprec = base_fmt.rsplit('.', 1)[0]
self.signed_currencies = set(code for code in signed_currencies self.signed_currencies = set(code for code in signed_currencies
@ -48,7 +50,7 @@ class Formatter:
def format_rate_pair(self, from_curr, to_curr): def format_rate_pair(self, from_curr, to_curr):
from_amt = 1 from_amt = 1
to_amt = self.rate.convert(from_amt, from_curr, to_curr) to_amt = self.cost_rates.convert(from_amt, from_curr, to_curr)
return "{} {} = {} {}".format( return "{} {} = {} {}".format(
self.format_rate(from_amt), from_curr, self.format_rate(from_amt), from_curr,
self.format_rate(to_amt), to_curr, self.format_rate(to_amt), to_curr,
@ -62,7 +64,7 @@ class Formatter:
) )
def format_conversion(self, from_amt, from_curr, to_curr): def format_conversion(self, from_amt, from_curr, to_curr):
to_amt = self.rate.convert(from_amt, from_curr, to_curr) to_amt = self.cost_rates.convert(from_amt, from_curr, to_curr)
return "{} = {}".format( return "{} = {}".format(
self.format_currency(from_amt, from_curr), self.format_currency(from_amt, from_curr),
self.format_currency(to_amt, to_curr), self.format_currency(to_amt, to_curr),
@ -70,6 +72,16 @@ class Formatter:
class LedgerFormatter(Formatter): class LedgerFormatter(Formatter):
COST_FMT = '{{={}}}'
PRICE_FMT = ' @ {}'
def price_rate(self, from_amt, from_curr, to_curr):
if self.price_rates is None:
rates = self.cost_rates
else:
rates = self.price_rates
return rates.convert(from_amt, from_curr, to_curr)
def can_sign_currency(self, code): def can_sign_currency(self, code):
return len(babel.numbers.get_currency_symbol(code)) == 1 return len(babel.numbers.get_currency_symbol(code)) == 1
@ -87,21 +99,42 @@ class LedgerFormatter(Formatter):
qrate = rate qrate = rate
return qrate.normalize() return qrate.normalize()
def format_rate(self, rate): def normalize_enough(self, rate, curr, from_amt, to_amt, prec=None):
return str(self.normalize_rate(rate)) if prec is None:
prec = self.rate_prec
# Starting from prec, find the least amount of precision to
# make sure from_amt converts exactly to to_amt.
for try_prec in itertools.count(prec):
try_rate = self.normalize_rate(rate, try_prec)
got_amt = self.currency_decimal(from_amt * try_rate, curr)
# If got_amt == to_amt, this is enough precision to do the
# conversion exactly, so we're done.
# If try_rate == rate, there's no more precision available, so stop.
if (got_amt == to_amt) or (try_rate == rate):
break
return try_rate
def format_ledger_rate_raw(self, rate, curr): def _pretty_rate(self, fmt, rate, curr, from_amt=None, to_amt=None):
rate_s = self.format_currency(rate, curr, currency_digits=False) if to_amt is None:
return "{{={0}}} @ {0}".format(rate_s) rate = self.normalize_rate(rate)
else:
def format_ledger_rate(self, rate, curr): rate = self.normalize_enough(rate, curr, from_amt, to_amt)
return self.format_ledger_rate_raw(self.normalize_rate(rate), curr) return fmt.format(self.format_currency(rate, curr, currency_digits=False))
def format_rate_pair(self, from_curr, to_curr): def format_rate_pair(self, from_curr, to_curr):
from_amt = 1 from_amt = 1
to_amt = self.rate.convert(from_amt, from_curr, to_curr) cost = self.cost_rates.convert(from_amt, from_curr, to_curr)
return "{} {} {}".format( price = self.price_rate(from_amt, from_curr, to_curr)
from_amt, from_curr, self.format_ledger_rate(to_amt, to_curr)) if price is None:
price_s = ''
else:
price_s = self._pretty_rate(self.PRICE_FMT, price, to_curr)
return "{} {} {}{}".format(
from_amt,
from_curr,
self._pretty_rate(self.COST_FMT, cost, to_curr),
price_s,
)
def _denomination_for(self, currency, default): def _denomination_for(self, currency, default):
if self.denomination is None: if self.denomination is None:
@ -116,22 +149,23 @@ class LedgerFormatter(Formatter):
amt_s = self.format_currency(amount, currency) amt_s = self.format_currency(amount, currency)
if denomination is None: if denomination is None:
return amt_s return amt_s
full_rate = self.rate.convert(1, currency, denomination) cost = self.cost_rates.convert(1, currency, denomination)
# Starting from self.rate_prec, find the least amount of precision to price = self.price_rate(1, currency, denomination)
# make sure the `from` amount converts exactly to the `to` amount. to_amt = self.currency_decimal(amount * cost, denomination)
to_amt = self.currency_decimal(amount * full_rate, denomination) if price is None:
for prec in itertools.count(self.rate_prec): price_s = ''
rate = self.normalize_rate(full_rate, prec) else:
got_amt = self.currency_decimal(amount * rate, denomination) price_s = self._pretty_rate(
# If got_amt == to_amt, this is enough precision to do the self.PRICE_FMT, price, denomination, amount, to_amt,
# conversion exactly, so we're done. )
# If rate == full_rate, there's no more precision available, so stop. return "{} {}{}".format(
if (got_amt == to_amt) or (rate == full_rate): amt_s,
break self._pretty_rate(self.COST_FMT, cost, denomination, amount, to_amt),
return "{} {}".format(amt_s, self.format_ledger_rate_raw(rate, denomination)) price_s,
)
def format_conversion(self, from_amt, from_curr, to_curr): def format_conversion(self, from_amt, from_curr, to_curr):
to_amt = self.rate.convert(from_amt, from_curr, to_curr) to_amt = self.cost_rates.convert(from_amt, from_curr, to_curr)
return "{}\n{}".format( return "{}\n{}".format(
self.format_denominated_rate(from_amt, from_curr, to_curr), self.format_denominated_rate(from_amt, from_curr, to_curr),
self.format_denominated_rate(to_amt, to_curr, None), self.format_denominated_rate(to_amt, to_curr, None),
@ -147,19 +181,28 @@ class Formats(enum.Enum):
return cls[s.upper()] return cls[s.upper()]
def load_rates(config, loaders, date):
with loaders.historical(date, config.args.base) as rate_json:
rates = oxrrate.Rate.from_json_file(rate_json)
if loaders.should_cache():
config.cache.save_rate(rates)
return rates
def run(config, stdout, stderr): def run(config, stdout, stderr):
loaders = config.get_loaders() loaders = config.get_loaders()
with loaders.historical(config.args.date, config.args.base) as rate_json: cost_rates = load_rates(config, loaders, config.args.date)
rate = oxrrate.Rate.from_json_file(rate_json) if config.args.from_date is None:
if loaders.should_cache(): price_rates = None
config.cache.save_rate(rate) else:
price_rates = load_rates(config, loaders, config.args.from_date)
formatter = config.args.output_format.value( formatter = config.args.output_format.value(
rate, cost_rates,
price_rates,
config.args.signed_currencies, config.args.signed_currencies,
denomination=config.args.denomination, denomination=config.args.denomination,
) )
if not config.args.from_currency: if not config.args.from_currency:
for from_curr in sorted(rate.rates): for from_curr in sorted(cost_rates.rates):
print(formatter.format_rate_pair_bidir(from_curr, config.args.to_currency), print(formatter.format_rate_pair_bidir(from_curr, config.args.to_currency),
file=stdout) file=stdout)
elif config.args.amount is None: elif config.args.amount is None:

13
tests/historical2.json Normal file
View file

@ -0,0 +1,13 @@
{
"disclaimer": "https://openexchangerates.org/terms/",
"license": "https://openexchangerates.org/license/",
"timestamp": 982256400,
"base": "USD",
"rates": {
"AED": 3.76246,
"ALL": 144.529739,
"ANG": 1.97,
"RUB": 57.0736,
"USD": 1
}
}

View file

@ -1,6 +1,7 @@
import argparse import argparse
import decimal import decimal
import io import io
import itertools
import json import json
import re import re
@ -11,11 +12,11 @@ from . import any_date, relpath
import oxrlib.commands.historical as oxrhist import oxrlib.commands.historical as oxrhist
class FakeResponder: class FakeResponder:
def __init__(self, response_path): def __init__(self, *response_paths):
self.response_path = response_path self.paths = itertools.cycle(response_paths)
def _respond(self, *args, **kwargs): def _respond(self, *args, **kwargs):
return open(self.response_path) return next(self.paths).open()
def __getattr__(self, name): def __getattr__(self, name):
return self._respond return self._respond
@ -38,9 +39,16 @@ class FakeConfig:
output = pytest.fixture(lambda: io.StringIO()) output = pytest.fixture(lambda: io.StringIO())
@pytest.fixture(scope='module')
def single_responder():
return FakeResponder(relpath('historical1.json'))
@pytest.fixture @pytest.fixture
def historical1_responder(): def alternate_responder():
return FakeResponder(relpath('historical1.json').as_posix()) return FakeResponder(
relpath('historical1.json'),
relpath('historical2.json'),
)
def build_config( def build_config(
responder, responder,
@ -48,6 +56,7 @@ def build_config(
amount=None, amount=None,
from_currency=None, from_currency=None,
to_currency=None, to_currency=None,
from_date=None,
ledger=False, ledger=False,
signed_currencies=None, signed_currencies=None,
denomination=None, denomination=None,
@ -59,6 +68,7 @@ def build_config(
'amount': None if amount is None else decimal.Decimal(amount), 'amount': None if amount is None else decimal.Decimal(amount),
'from_currency': from_currency, 'from_currency': from_currency,
'to_currency': base if to_currency is None else to_currency, 'to_currency': base if to_currency is None else to_currency,
'from_date': from_date,
'output_format': oxrhist.Formats['LEDGER' if ledger else 'RAW'], 'output_format': oxrhist.Formats['LEDGER' if ledger else 'RAW'],
'signed_currencies': [base] if signed_currencies is None else signed_currencies, 'signed_currencies': [base] if signed_currencies is None else signed_currencies,
'denomination': denomination, 'denomination': denomination,
@ -86,8 +96,8 @@ def check_fx_amount(config, lines, amount, cost, fx_code, fx_sign=None, price=No
line = next(lines, "<EOF>") line = next(lines, "<EOF>")
assert re.match(pattern, line) assert re.match(pattern, line)
def test_rate_list(historical1_responder, output, any_date): def test_rate_list(single_responder, output, any_date):
config = build_config(historical1_responder, any_date) config = build_config(single_responder, any_date)
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
assert next(lines).startswith('1 AED = 0.27229') assert next(lines).startswith('1 AED = 0.27229')
assert next(lines) == '1 USD = 3.67246 AED\n' assert next(lines) == '1 USD = 3.67246 AED\n'
@ -96,52 +106,52 @@ def test_rate_list(historical1_responder, output, any_date):
assert next(lines).startswith('1 ANG = 0.55865') assert next(lines).startswith('1 ANG = 0.55865')
assert next(lines) == '1 USD = 1.79 ANG\n' assert next(lines) == '1 USD = 1.79 ANG\n'
def test_one_rate(historical1_responder, output, any_date): def test_one_rate(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, from_currency='ANG') config = build_config(single_responder, any_date, from_currency='ANG')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
assert next(lines).startswith('1 ANG = 0.55865') assert next(lines).startswith('1 ANG = 0.55865')
assert next(lines) == '1 USD = 1.79 ANG\n' assert next(lines) == '1 USD = 1.79 ANG\n'
assert next(lines, None) is None assert next(lines, None) is None
def test_conversion(historical1_responder, output, any_date): def test_conversion(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, amount=10, from_currency='AED') config = build_config(single_responder, any_date, amount=10, from_currency='AED')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
assert next(lines) == '10.00 AED = 2.72 USD\n' assert next(lines) == '10.00 AED = 2.72 USD\n'
assert next(lines, None) is None assert next(lines, None) is None
def test_back_conversion(historical1_responder, output, any_date): def test_back_conversion(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
amount=2, from_currency='USD', to_currency='ALL') amount=2, from_currency='USD', to_currency='ALL')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
assert next(lines) == '2.00 USD = 289 ALL\n' assert next(lines) == '2.00 USD = 289 ALL\n'
assert next(lines, None) is None assert next(lines, None) is None
def test_ledger_rate(historical1_responder, output, any_date): def test_ledger_rate(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='ANG', ledger=True) from_currency='ANG', ledger=True)
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
check_fx_amount(config, lines, '1 ANG', '0.5586', 'USD', '$') check_fx_amount(config, lines, '1 ANG', '0.5586', 'USD', '$')
check_fx_amount(config, lines, '1 USD', '1.79', 'ANG') check_fx_amount(config, lines, '1 USD', '1.79', 'ANG')
assert next(lines, None) is None assert next(lines, None) is None
def test_ledger_conversion(historical1_responder, output, any_date): def test_ledger_conversion(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='ALL', amount=300, ledger=True) from_currency='ALL', amount=300, ledger=True)
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
check_fx_amount(config, lines, '300 ALL', '0.00691', 'USD', '$') check_fx_amount(config, lines, '300 ALL', '0.00691', 'USD', '$')
assert next(lines) == '$2.08\n' assert next(lines) == '$2.08\n'
assert next(lines, None) is None assert next(lines, None) is None
def test_signed_currencies(historical1_responder, output, any_date): def test_signed_currencies(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='AED', ledger=True, signed_currencies=['EUR']) from_currency='AED', ledger=True, signed_currencies=['EUR'])
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
check_fx_amount(config, lines, '1 AED', '0.272', 'USD', '$') check_fx_amount(config, lines, '1 AED', '0.272', 'USD', '$')
check_fx_amount(config, lines, '1 USD', '3.672', 'AED') check_fx_amount(config, lines, '1 USD', '3.672', 'AED')
assert next(lines, None) is None assert next(lines, None) is None
def test_denomination(historical1_responder, output, any_date): def test_denomination(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='ANG', to_currency='AED', amount=10, from_currency='ANG', to_currency='AED', amount=10,
ledger=True, denomination='USD') ledger=True, denomination='USD')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
@ -149,8 +159,8 @@ def test_denomination(historical1_responder, output, any_date):
check_fx_amount(config, lines, '20.52 AED', '0.272', 'USD', '$') check_fx_amount(config, lines, '20.52 AED', '0.272', 'USD', '$')
assert next(lines, None) is None assert next(lines, None) is None
def test_redundant_denomination(historical1_responder, output, any_date): def test_redundant_denomination(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='ANG', to_currency='USD', amount=10, from_currency='ANG', to_currency='USD', amount=10,
ledger=True, denomination='USD') ledger=True, denomination='USD')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
@ -158,8 +168,8 @@ def test_redundant_denomination(historical1_responder, output, any_date):
assert next(lines) == '$5.59\n' assert next(lines) == '$5.59\n'
assert next(lines, None) is None assert next(lines, None) is None
def test_from_denomination(historical1_responder, output, any_date): def test_from_denomination(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='USD', to_currency='ALL', amount=10, from_currency='USD', to_currency='ALL', amount=10,
ledger=True, denomination='USD') ledger=True, denomination='USD')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
@ -167,8 +177,8 @@ def test_from_denomination(historical1_responder, output, any_date):
check_fx_amount(config, lines, '1,445 ALL', '0.00691', 'USD', '$') check_fx_amount(config, lines, '1,445 ALL', '0.00691', 'USD', '$')
assert next(lines, None) is None assert next(lines, None) is None
def test_rate_precision_added_as_needed(historical1_responder, output, any_date): def test_rate_precision_added_as_needed(single_responder, output, any_date):
config = build_config(historical1_responder, any_date, config = build_config(single_responder, any_date,
from_currency='RUB', to_currency='USD', amount=63805, from_currency='RUB', to_currency='USD', amount=63805,
ledger=True, denomination='USD') ledger=True, denomination='USD')
lines = lines_from_run(config, output) lines = lines_from_run(config, output)
@ -179,3 +189,21 @@ def test_rate_precision_added_as_needed(historical1_responder, output, any_date)
check_fx_amount(config, lines, '63,805.00 RUB', '0.0175204', 'USD', '$') check_fx_amount(config, lines, '63,805.00 RUB', '0.0175204', 'USD', '$')
assert next(lines) == '$1,117.89\n' assert next(lines) == '$1,117.89\n'
assert next(lines, None) is None assert next(lines, None) is None
def test_from_date_rates(alternate_responder, output, any_date):
config = build_config(alternate_responder, any_date,
from_currency='ANG', to_currency='AED',
from_date=any_date, ledger=True, denomination='USD')
lines = lines_from_run(config, output)
check_fx_amount(config, lines, '1 ANG', '2.051', 'AED', None, '1.909')
check_fx_amount(config, lines, '1 AED', '0.487', 'ANG', None, '0.523')
assert next(lines, None) is None
def test_from_date_conversion(alternate_responder, output, any_date):
config = build_config(alternate_responder, any_date,
from_currency='ANG', to_currency='AED', amount=10,
from_date=any_date, ledger=True, denomination='USD')
lines = lines_from_run(config, output)
check_fx_amount(config, lines, '10.00 ANG', '0.558', 'USD', '$', '0.507')
check_fx_amount(config, lines, '20.52 AED', '0.272', 'USD', '$', '0.265')
assert next(lines, None) is None