147 lines
2.9 KiB
Python
147 lines
2.9 KiB
Python
|
from time import time
|
||
|
from base64 import b16encode
|
||
|
import os.path
|
||
|
import libnacl.utils, libnacl.sign
|
||
|
from transaction_const import *
|
||
|
from io import BytesIO
|
||
|
from hashlib import sha3_256
|
||
|
import logging
|
||
|
from random import randint
|
||
|
|
||
|
logging.basicConfig(level='DEBUG')
|
||
|
log = logging.getLogger('tx')
|
||
|
|
||
|
'''
|
||
|
|
||
|
[ CA$H ]
|
||
|
[signature ] * 16
|
||
|
[load][used]
|
||
|
[keyi][txid]
|
||
|
[timestamp ]
|
||
|
[ uuid ]
|
||
|
[ F0X0 ]
|
||
|
|
||
|
|
||
|
21*4 = 84bytes
|
||
|
76 bytes
|
||
|
|
||
|
'''
|
||
|
|
||
|
FIELDS = {
|
||
|
'signature': (0, 63),
|
||
|
'enc_key': 64,
|
||
|
'key_id': 65,
|
||
|
'load_amt': (66, 67),
|
||
|
'used_amt': (68, 69),
|
||
|
'tx_id': (70, 71),
|
||
|
'ts': (72, 75),
|
||
|
'uuid': (76, 79)
|
||
|
}
|
||
|
|
||
|
class Transaction:
|
||
|
def __init__(self, tag_id, nfc_bytes=None):
|
||
|
|
||
|
# These are needed for proper randomness
|
||
|
self.tag_id = tag_id
|
||
|
|
||
|
self.data = bytearray(max([(x[1] if isinstance(x, tuple) else x) for x in FIELDS.values()])+1)
|
||
|
|
||
|
self.set_field('key_id', KEY_ID)
|
||
|
self.set_field('ts', time())
|
||
|
self.set_field('uuid', libnacl.randombytes(4))
|
||
|
|
||
|
if nfc_bytes:
|
||
|
self.load(nfc_bytes)
|
||
|
|
||
|
def set_field(self, fid, val):
|
||
|
|
||
|
if isinstance(FIELDS[fid], int):
|
||
|
flen = 1
|
||
|
sbyte = FIELDS[fid]
|
||
|
else:
|
||
|
flen = 1 + FIELDS[fid][1] - FIELDS[fid][0]
|
||
|
sbyte = FIELDS[fid][0]
|
||
|
|
||
|
if isinstance(val, float):
|
||
|
val = int(val)
|
||
|
|
||
|
if isinstance(val, int):
|
||
|
val = val.to_bytes(flen)
|
||
|
|
||
|
assert len(val) <= flen
|
||
|
|
||
|
if flen > 1:
|
||
|
log.info(f"Writing {fid} at bytes {sbyte}:{sbyte+flen-1}: 0x{b16encode(val).decode()}")
|
||
|
self.data[sbyte:sbyte+flen] = val
|
||
|
else:
|
||
|
log.info(f"Writing {fid} at byte {sbyte}: 0x{b16encode(val).decode()}")
|
||
|
self.data[sbyte] = val[0]
|
||
|
|
||
|
def get_field(self, fid):
|
||
|
if isinstance(FIELDS[fid], int):
|
||
|
flen = 1
|
||
|
sbyte = FIELDS[fid]
|
||
|
else:
|
||
|
flen = 1 + FIELDS[fid][1] - FIELDS[fid][0]
|
||
|
sbyte = FIELDS[fid][0]
|
||
|
|
||
|
if flen > 1:
|
||
|
val = bytes(self.data[sbyte:sbyte+flen])
|
||
|
else:
|
||
|
val = self.data[sbyte]
|
||
|
|
||
|
return val
|
||
|
|
||
|
def load(self, data):
|
||
|
if data.startswith(HEADER):
|
||
|
data = data[len(HEADER):]
|
||
|
|
||
|
if data.find(FOOTER):
|
||
|
data = data[:-data.find(FOOTER)]
|
||
|
|
||
|
self.data = data
|
||
|
|
||
|
def sign(self):
|
||
|
self.set_field('key_id', KEY_ID)
|
||
|
signer = libnacl.sign.Signer(KEY_SEED)
|
||
|
signature = signer.signature(bytes(self.data[FIELDS['signature'][1]+1:]+self.tag_id))
|
||
|
self.set_field('signature', signature)
|
||
|
|
||
|
log.info("Signed message with success!")
|
||
|
|
||
|
self.verify()
|
||
|
|
||
|
def get_bytes(self):
|
||
|
return HEADER + self.data + FOOTER
|
||
|
|
||
|
# TODO: implement these
|
||
|
def topup(self, amt):
|
||
|
self.load_amount += amt
|
||
|
self.tx_bytes
|
||
|
|
||
|
def spend(self, amt):
|
||
|
self.used_amount += amt
|
||
|
|
||
|
def verify(self):
|
||
|
verifier = libnacl.sign.Verifier(KEYS[self.get_field('key_id')])
|
||
|
verifier.verify(bytes(self.data + self.tag_id))
|
||
|
log.info("Signature has passed!")
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
print("Testing transaction.")
|
||
|
tx = Transaction(tag_id=bytes(8))
|
||
|
tx.sign()
|
||
|
|
||
|
#for fname in FIELDS:
|
||
|
# print(' ', fname, tx.get_field(fname), 'len')
|
||
|
|
||
|
#res = tx.get_bytes()
|
||
|
#tx.verify()
|
||
|
|
||
|
#tx = Transaction(tag_id=bytes(8), nfc_bytes=res)
|
||
|
#res2 = tx.get_bytes()
|
||
|
|
||
|
|
||
|
|
||
|
|