Moving AUT UNLOCK to Vendor command instead of using VendorConfig.
To do this a MSE command is added, to manage a secure environment. It performs a ephemeral ECDH exchange to derive a shared secret that will be used by vendor commands to convey ciphered data. Signed-off-by: Pol Henarejos <pol.henarejos@cttc.es>
This commit is contained in:
@@ -69,133 +69,32 @@ class VendorConfig(Config):
|
||||
|
||||
class PARAM(IntEnum):
|
||||
VENDOR_COMMAND_ID = 0x01
|
||||
VENDOR_AUT_KEY_AGREEMENT = 0x02
|
||||
VENDOR_AUT_CT = 0x03
|
||||
VENDOR_AUT_CT = 0x02
|
||||
|
||||
class CMD(IntEnum):
|
||||
CONFIG_AUT = 0x03e43f56b34285e2
|
||||
CONFIG_KEY_AGREEMENT = 0x1831a40f04a25ed9
|
||||
CONFIG_UNLOCK = 0x54365966c9a74770
|
||||
CONFIG_BACKUP = 0x6b1ede62beff0d5e
|
||||
CONFIG_AUT_ENABLE = 0x03e43f56b34285e2
|
||||
CONFIG_AUT_DISABLE = 0x1831a40f04a25ed9
|
||||
|
||||
class RESP(IntEnum):
|
||||
KEY_AGREEMENT = 0x01
|
||||
BACKUP = 0x01
|
||||
|
||||
def __init__(self, ctap, pin_uv_protocol=None, pin_uv_token=None):
|
||||
super().__init__(ctap, pin_uv_protocol, pin_uv_token)
|
||||
|
||||
def _get_key_device(self):
|
||||
return skey.get_secure_key()
|
||||
|
||||
def _get_shared_key(self):
|
||||
ret = self._call(
|
||||
Config.CMD.VENDOR_PROTOTYPE,
|
||||
{
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: VendorConfig.CMD.CONFIG_KEY_AGREEMENT,
|
||||
},
|
||||
)
|
||||
peer_cose_key = ret[VendorConfig.RESP.KEY_AGREEMENT]
|
||||
|
||||
sk = ec.generate_private_key(ec.SECP256R1())
|
||||
pn = sk.public_key().public_numbers()
|
||||
pb = sk.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||
key_agreement = {
|
||||
1: 2,
|
||||
3: -25, # Per the spec, "although this is NOT the algorithm actually used"
|
||||
-1: 1,
|
||||
-2: int2bytes(pn.x, 32),
|
||||
-3: int2bytes(pn.y, 32),
|
||||
}
|
||||
|
||||
x = bytes2int(peer_cose_key[-2])
|
||||
y = bytes2int(peer_cose_key[-3])
|
||||
pk = ec.EllipticCurvePublicNumbers(x, y, ec.SECP256R1()).public_key()
|
||||
shared_key = sk.exchange(ec.ECDH(), pk)
|
||||
|
||||
xkdf = HKDF(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=12+32,
|
||||
salt=None,
|
||||
info=pb
|
||||
)
|
||||
kdf_out = xkdf.derive(shared_key)
|
||||
key_enc = kdf_out[12:]
|
||||
iv = kdf_out[:12]
|
||||
return iv, key_enc, key_agreement, pb
|
||||
|
||||
def _send_command_key(self, cmd):
|
||||
iv, key_enc, key_agreement, pb = self._get_shared_key()
|
||||
|
||||
chacha = ChaCha20Poly1305(key_enc)
|
||||
ct = chacha.encrypt(iv, self._get_key_device(), pb)
|
||||
def enable_device_aut(self, ct):
|
||||
self._call(
|
||||
Config.CMD.VENDOR_PROTOTYPE,
|
||||
{
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: cmd,
|
||||
VendorConfig.PARAM.VENDOR_AUT_KEY_AGREEMENT: key_agreement,
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: VendorConfig.CMD.CONFIG_AUT_ENABLE,
|
||||
VendorConfig.PARAM.VENDOR_AUT_CT: ct
|
||||
},
|
||||
)
|
||||
|
||||
def enable_device_aut(self):
|
||||
self._send_command_key(VendorConfig.CMD.CONFIG_AUT)
|
||||
|
||||
def unlock_device(self):
|
||||
self._send_command_key(VendorConfig.CMD.CONFIG_UNLOCK)
|
||||
|
||||
def disable_device_aut(self):
|
||||
self._call(
|
||||
Config.CMD.VENDOR_PROTOTYPE,
|
||||
{
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: VendorConfig.CMD.CONFIG_AUT,
|
||||
},
|
||||
)
|
||||
|
||||
def backup_save(self, filename):
|
||||
ret = self._call(
|
||||
Config.CMD.VENDOR_PROTOTYPE,
|
||||
{
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: VendorConfig.CMD.CONFIG_BACKUP,
|
||||
},
|
||||
)
|
||||
data = ret[VendorConfig.RESP.BACKUP]
|
||||
d = int.from_bytes(skey.get_secure_key(), 'big')
|
||||
with open(filename, 'wb') as fp:
|
||||
fp.write(b'\x01')
|
||||
fp.write(data)
|
||||
pk = ec.derive_private_key(d, ec.SECP256R1())
|
||||
signature = pk.sign(data, ec.ECDSA(hashes.SHA256()))
|
||||
fp.write(signature)
|
||||
print('Remember the following words in this order:')
|
||||
for c in range(24):
|
||||
coef = (d//(2048**c))%2048
|
||||
print(f'{(c+1):02d} - {words[coef]}')
|
||||
|
||||
def backup_load(self, filename):
|
||||
d = 0
|
||||
if (d == 0):
|
||||
for c in range(24):
|
||||
word = input(f'Introduce word {(c+1):02d}: ')
|
||||
while (word not in words):
|
||||
word = input(f'Word not found. Please, tntroduce the correct word {(c+1):02d}: ')
|
||||
coef = words.index(word)
|
||||
d = d+(2048**c)*coef
|
||||
|
||||
pk = ec.derive_private_key(d, ec.SECP256R1())
|
||||
pb = pk.public_key()
|
||||
with open(filename, 'rb') as fp:
|
||||
format = fp.read(1)[0]
|
||||
if (format == 0x1):
|
||||
data = fp.read(60)
|
||||
signature = fp.read()
|
||||
pb.verify(signature, data, ec.ECDSA(hashes.SHA256()))
|
||||
skey.set_secure_key(pk)
|
||||
self._call(
|
||||
Config.CMD.VENDOR_PROTOTYPE,
|
||||
{
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: VendorConfig.CMD.CONFIG_BACKUP,
|
||||
VendorConfig.PARAM.VENDOR_AUT_CT: data,
|
||||
VendorConfig.PARAM.VENDOR_COMMAND_ID: VendorConfig.CMD.CONFIG_AUT_DISABLE
|
||||
},
|
||||
)
|
||||
|
||||
@@ -280,17 +179,22 @@ class Vendor:
|
||||
@unique
|
||||
class CMD(IntEnum):
|
||||
VENDOR_BACKUP = 0x01
|
||||
VENDOR_MSE = 0x02
|
||||
VENDOR_UNLOCK = 0x03
|
||||
|
||||
@unique
|
||||
class PARAM(IntEnum):
|
||||
PARAM = 0x01
|
||||
PARAM = 0x01
|
||||
COSE_KEY = 0x02
|
||||
|
||||
class SUBCMD(IntEnum):
|
||||
ENABLE = 0x01
|
||||
DISABLE = 0x02
|
||||
ENABLE = 0x01
|
||||
DISABLE = 0x02
|
||||
KEY_AGREEMENT = 0x01
|
||||
|
||||
class RESP(IntEnum):
|
||||
PARAM = 0x01
|
||||
COSE_KEY = 0x02
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -304,6 +208,10 @@ class Vendor:
|
||||
if pin_uv_protocol and pin_uv_token
|
||||
else None
|
||||
)
|
||||
self.__key_enc = None
|
||||
self.__iv = None
|
||||
|
||||
self.vcfg = VendorConfig(ctap)
|
||||
|
||||
def _call(self, cmd, sub_cmd, params=None):
|
||||
if params:
|
||||
@@ -369,6 +277,72 @@ class Vendor:
|
||||
},
|
||||
)
|
||||
|
||||
def mse(self):
|
||||
sk = ec.generate_private_key(ec.SECP256R1())
|
||||
pn = sk.public_key().public_numbers()
|
||||
self.__pb = sk.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
|
||||
key_agreement = {
|
||||
1: 2,
|
||||
3: -25, # Per the spec, "although this is NOT the algorithm actually used"
|
||||
-1: 1,
|
||||
-2: int2bytes(pn.x, 32),
|
||||
-3: int2bytes(pn.y, 32),
|
||||
}
|
||||
|
||||
ret = self._call(
|
||||
Vendor.CMD.VENDOR_MSE,
|
||||
Vendor.SUBCMD.KEY_AGREEMENT,
|
||||
{
|
||||
Vendor.PARAM.COSE_KEY: key_agreement,
|
||||
},
|
||||
)
|
||||
|
||||
peer_cose_key = ret[VendorConfig.RESP.KEY_AGREEMENT]
|
||||
|
||||
x = bytes2int(peer_cose_key[-2])
|
||||
y = bytes2int(peer_cose_key[-3])
|
||||
pk = ec.EllipticCurvePublicNumbers(x, y, ec.SECP256R1()).public_key()
|
||||
shared_key = sk.exchange(ec.ECDH(), pk)
|
||||
|
||||
xkdf = HKDF(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=12+32,
|
||||
salt=None,
|
||||
info=self.__pb
|
||||
)
|
||||
kdf_out = xkdf.derive(shared_key)
|
||||
self.__key_enc = kdf_out[12:]
|
||||
self.__iv = kdf_out[:12]
|
||||
|
||||
def encrypt_chacha(self, data):
|
||||
chacha = ChaCha20Poly1305(self.__key_enc)
|
||||
ct = chacha.encrypt(self.__iv, data, self.__pb)
|
||||
return ct
|
||||
|
||||
def unlock_device(self):
|
||||
ct = self.get_skey()
|
||||
self._call(
|
||||
Vendor.CMD.VENDOR_UNLOCK,
|
||||
Vendor.SUBCMD.ENABLE,
|
||||
{
|
||||
Vendor.PARAM.PARAM: ct
|
||||
},
|
||||
)
|
||||
|
||||
def _get_key_device(self):
|
||||
return skey.get_secure_key()
|
||||
|
||||
def get_skey(self):
|
||||
self.mse()
|
||||
ct = self.encrypt_chacha(self._get_key_device())
|
||||
return ct
|
||||
|
||||
def enable_device_aut(self):
|
||||
ct = self.get_skey()
|
||||
self.vcfg.enable_device_aut(ct)
|
||||
|
||||
def disable_device_aut(self):
|
||||
self.vcfg.disable_device_aut()
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -383,19 +357,15 @@ def parse_args():
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
def secure(dev, args):
|
||||
vcfg = VendorConfig(Ctap2(dev))
|
||||
|
||||
def secure(vdr, args):
|
||||
if (args.subcommand == 'enable'):
|
||||
vcfg.enable_device_aut()
|
||||
vdr.enable_device_aut()
|
||||
elif (args.subcommand == 'unlock'):
|
||||
vcfg.unlock_device()
|
||||
vdr.unlock_device()
|
||||
elif (args.subcommand == 'disable'):
|
||||
vcfg.disable_device_aut()
|
||||
|
||||
def backup(dev, args):
|
||||
vdr = Vendor(Ctap2Vendor(dev))
|
||||
vdr.disable_device_aut()
|
||||
|
||||
def backup(vdr, args):
|
||||
if (args.subcommand == 'save'):
|
||||
vdr.backup_save(args.filename)
|
||||
elif (args.subcommand == 'load'):
|
||||
@@ -411,10 +381,12 @@ def main(args):
|
||||
|
||||
dev = next(CtapHidDevice.list_devices(), None)
|
||||
|
||||
vdr = Vendor(Ctap2Vendor(dev))
|
||||
|
||||
if (args.command == 'secure'):
|
||||
secure(dev, args)
|
||||
secure(vdr, args)
|
||||
elif (args.command == 'backup'):
|
||||
backup(dev, args)
|
||||
backup(vdr, args)
|
||||
|
||||
def run():
|
||||
args = parse_args()
|
||||
|
||||
Reference in New Issue
Block a user