[ledgercli] Store posting metadata
This commit is contained in:
parent
8abbe3462f
commit
86a6bec585
2 changed files with 74 additions and 16 deletions
|
@ -6,13 +6,10 @@ import os
|
|||
import sys
|
||||
import subprocess
|
||||
import logging
|
||||
import time
|
||||
import re
|
||||
import pygit2
|
||||
|
||||
from datetime import datetime
|
||||
from xml.etree import ElementTree
|
||||
from contextlib import contextmanager
|
||||
|
||||
from accounting.exceptions import AccountingException, TransactionNotFound, \
|
||||
LedgerNotBalanced, TransactionIDCollision
|
||||
|
@ -21,6 +18,14 @@ from accounting.storage import Storage
|
|||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
HAS_PYGIT = False
|
||||
|
||||
try:
|
||||
import pygit2
|
||||
HAS_PYGIT = True
|
||||
except ImportError:
|
||||
_log.warning('Failed to import pygit2')
|
||||
|
||||
|
||||
class Ledger(Storage):
|
||||
def __init__(self, app=None, ledger_file=None, ledger_bin=None):
|
||||
|
@ -34,6 +39,12 @@ class Ledger(Storage):
|
|||
self.ledger_file = ledger_file
|
||||
_log.info('ledger file: %s', ledger_file)
|
||||
|
||||
self.init_pygit()
|
||||
|
||||
def init_pygit(self):
|
||||
if not HAS_PYGIT:
|
||||
return False
|
||||
|
||||
try:
|
||||
self.repository = pygit2.Repository(
|
||||
os.path.join(os.path.dirname(self.ledger_file), '.git'))
|
||||
|
@ -89,7 +100,7 @@ class Ledger(Storage):
|
|||
|
||||
return args
|
||||
|
||||
def send_command(self, command):
|
||||
def run_command(self, command):
|
||||
'''
|
||||
Creates a new ledger process with the specified :data:`command` and
|
||||
returns the output.
|
||||
|
@ -147,27 +158,34 @@ class Ledger(Storage):
|
|||
|
||||
metadata_template = ' ;{0}: {1}\n'
|
||||
|
||||
# TODO: Generate metadata for postings
|
||||
posting_template = (' {account} {p.amount.symbol}'
|
||||
' {p.amount.amount}\n')
|
||||
|
||||
output = b''
|
||||
|
||||
out_postings = ''
|
||||
|
||||
for posting in transaction.postings:
|
||||
out_postings += posting_template.format(
|
||||
p=posting,
|
||||
account=posting.account + ' ' * (
|
||||
80 - (len(posting.account) + len(posting.amount.symbol) +
|
||||
len(str(posting.amount.amount)) + 1 + 2))
|
||||
)
|
||||
|
||||
if len(posting.metadata):
|
||||
for k, v in posting.metadata.items():
|
||||
out_postings += metadata_template.format(str(k), str(v))
|
||||
|
||||
# XXX: Even I hardly understands what this does, however I indent it it
|
||||
# stays unreadable.
|
||||
output += transaction_template.format(
|
||||
date=transaction.date.strftime('%Y-%m-%d'),
|
||||
t=transaction,
|
||||
metadata=''.join([
|
||||
metadata_template.format(k, v)
|
||||
metadata_template.format(str(k), str(v))
|
||||
for k, v in transaction.metadata.items()]),
|
||||
postings=''.join([posting_template.format(
|
||||
p=p,
|
||||
account=p.account + ' ' * (
|
||||
80 - (len(p.account) + len(p.amount.symbol) +
|
||||
len(str(p.amount.amount)) + 1 + 2)
|
||||
)) for p in transaction.postings
|
||||
])
|
||||
postings=out_postings
|
||||
).encode('utf8')
|
||||
|
||||
with open(self.ledger_file, 'ab') as f:
|
||||
|
@ -194,7 +212,7 @@ class Ledger(Storage):
|
|||
return transaction.id
|
||||
|
||||
def bal(self):
|
||||
output = self.send_command('xml')
|
||||
output = self.run_command('xml')
|
||||
|
||||
if output is None:
|
||||
raise RuntimeError('bal call returned no output')
|
||||
|
@ -248,7 +266,7 @@ class Ledger(Storage):
|
|||
return transaction
|
||||
|
||||
def reg(self):
|
||||
output = self.send_command('xml')
|
||||
output = self.run_command('xml')
|
||||
|
||||
if output is None:
|
||||
raise RuntimeError('reg call returned no output')
|
||||
|
|
|
@ -293,8 +293,48 @@ class TransactionTestCase(unittest.TestCase):
|
|||
|
||||
self.assertEqual(response['error']['type'], 'TransactionNotFound')
|
||||
|
||||
def test_post_transaction_with_metadata(self): pass
|
||||
def test_post_transaction_with_metadata(self):
|
||||
transaction = copy.deepcopy(self.simple_transaction)
|
||||
|
||||
transaction.metadata.update({'foo': 'bar'})
|
||||
|
||||
response = self._post_json('/transaction', transaction)
|
||||
|
||||
transaction_id = response['transaction_ids'][0]
|
||||
|
||||
response = self._get_json('/transaction/' + transaction_id)
|
||||
|
||||
self.assertEqual(
|
||||
response['transaction'].metadata,
|
||||
transaction.metadata)
|
||||
|
||||
def test_post_transaction_with_posting_metadata(self):
|
||||
transaction = copy.deepcopy(self.simple_transaction)
|
||||
|
||||
postings = [
|
||||
Posting(account='Assets:Checking', metadata={'assets': 'checking'},
|
||||
amount=Amount(amount='-100.10', symbol='$')),
|
||||
Posting(account='Expenses:Foo', metadata={'expenses': 'foo'},
|
||||
amount=Amount(amount='100.10', symbol='$'))
|
||||
]
|
||||
|
||||
transaction.postings = postings
|
||||
|
||||
response = self._post_json('/transaction', transaction)
|
||||
|
||||
transaction_id = response['transaction_ids'][0]
|
||||
|
||||
response = self._get_json('/transaction/' + transaction_id)
|
||||
|
||||
for posting in response['transaction'].postings:
|
||||
if posting.account == 'Expenses:Foo':
|
||||
self.assertEqual(posting.metadata, {'expenses': 'foo'})
|
||||
elif posting.account == 'Assets:Checking':
|
||||
self.assertEqual(posting.metadata, {'assets': 'checking'})
|
||||
else:
|
||||
assert False, \
|
||||
'Something about this transaction\'s postings is' \
|
||||
' unexpected'
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in a new issue