diff --git a/tests/conftest.py b/tests/conftest.py index 3dab4f2..9181a55 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,9 +11,24 @@ from fido2.cose import ES256 import sys import pytest import os +import struct DEFAULT_PIN='12345678' + +class Packet(object): + def __init__(self, data): + self.data = data + + def ToWireFormat( + self, + ): + return self.data + + @staticmethod + def FromWireFormat(pkt_size, data): + return Packet(data) + class CliInteraction(UserInteraction): def prompt_up(self): print("\nTouch your authenticator device now...\n") @@ -47,6 +62,7 @@ class Device(): # Locate a device self.__dev = next(CtapHidDevice.list_devices(), None) + self.dev = self.__dev if self.__dev is not None: print("Use USB HID channel.") else: @@ -96,6 +112,41 @@ class Device(): self.__rp = rp return self.__rp + def send_data(self, cmd, data, timeout = 1.0, on_keepalive = None): + if not isinstance(data, bytes): + data = struct.pack("%dB" % len(data), *[ord(x) for x in data]) + with Timeout(timeout) as event: + event.is_set() + return self.dev.call(cmd, data, event, on_keepalive = on_keepalive) + + def cid(self): + return self.dev._channel_id + + def set_cid(self, cid): + self.dev._channel_id = int.from_bytes(cid, 'big') + + def recv_raw(self): + with Timeout(1.0): + r = self.dev._connection.read_packet() + return r[4], r[7:] + + def send_raw(self, data, cid=None): + if cid is None: + cid = self.dev._channel_id.to_bytes(4, 'big') + elif not isinstance(cid, bytes): + cid = struct.pack("%dB" % len(cid), *[ord(x) for x in cid]) + if not isinstance(data, bytes): + data = struct.pack("%dB" % len(data), *[ord(x) for x in data]) + data = cid + data + l = len(data) + if l != 64: + pad = "\x00" * (64 - l) + pad = struct.pack("%dB" % len(pad), *[ord(x) for x in pad]) + data = data + pad + data = bytes(data) + assert len(data) == 64 + self.dev._connection.write_packet(data) + def reset(self): print("Resetting Authenticator...") try: diff --git a/tests/pico-fido/test_hid.py b/tests/pico-fido/test_hid.py new file mode 100644 index 0000000..a75bffc --- /dev/null +++ b/tests/pico-fido/test_hid.py @@ -0,0 +1,253 @@ +import os +import socket +import time +from binascii import hexlify, unhexlify + +import pytest +from fido2.ctap import CtapError +from fido2.hid import CTAPHID +from utils import Timeout + +class TestHID(object): + def test_long_ping(self, device): + amt = 1000 + pingdata = os.urandom(amt) + + t1 = time.time() * 1000 + r = device.send_data(CTAPHID.PING, pingdata) + t2 = time.time() * 1000 + delt = t2 - t1 + + assert not (delt > 555 * (amt / 1000)) + + assert r == pingdata + + def test_init(self, device, check_timeouts=False): + if check_timeouts: + with pytest.raises(socket.timeout): + cmd, resp = self.recv_raw() + + payload = b"\x11\x11\x11\x11\x11\x11\x11\x11" + r = device.send_data(CTAPHID.INIT, payload) + print(r) + assert r[:8] == payload + + def test_ping(self, device): + + pingdata = os.urandom(100) + r = device.send_data(CTAPHID.PING, pingdata) + assert r == pingdata + + def test_wink(self, device): + r = device.send_data(CTAPHID.WINK, "") + + def test_cbor_no_payload(self, device): + payload = b"\x11\x11\x11\x11\x11\x11\x11\x11" + r = device.send_data(CTAPHID.INIT, payload) + capabilities = r[16] + + if (capabilities ^ 0x04) != 0: + print("Implements CBOR.") + with pytest.raises(CtapError) as e: + r = device.send_data(CTAPHID.CBOR, "") + assert e.value.code == CtapError.ERR.INVALID_LENGTH + else: + print("CBOR is not implemented.") + + def test_no_data_in_u2f_msg(self, device): + payload = b"\x11\x11\x11\x11\x11\x11\x11\x11" + r = device.send_data(CTAPHID.INIT, payload) + capabilities = r[16] + + if (capabilities ^ 0x08) == 0: + print("U2F implemented.") + with pytest.raises(CtapError) as e: + r = device.send_data(CTAPHID.MSG, "") + print(hexlify(r)) + assert e.value.code == CtapError.ERR.INVALID_LENGTH + else: + print("U2F not implemented.") + + def test_invalid_hid_cmd(self, device): + r = device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + + with pytest.raises(CtapError) as e: + r = device.send_data(0x66, "") + assert e.value.code == CtapError.ERR.INVALID_COMMAND + + def test_oversize_packet(self, device): + device.send_raw("\x81\x1d\xba\x00") + cmd, resp = device.recv_raw() + assert resp[0] == CtapError.ERR.INVALID_LENGTH + + def test_skip_sequence_number(self, device): + r = device.send_data(CTAPHID.PING, "\x44" * 200) + device.send_raw("\x81\x04\x90") + device.send_raw("\x00") + device.send_raw("\x01") + # skip 2 + device.send_raw("\x03") + cmd, resp = device.recv_raw() + assert resp[0] == CtapError.ERR.INVALID_SEQ + + def test_resync_and_ping(self, device): + r = device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + pingdata = os.urandom(100) + r = device.send_data(CTAPHID.PING, pingdata) + if r != pingdata: + raise ValueError("Ping data not echo'd") + + def test_ping_abort(self, device): + device.send_raw("\x81\x04\x00") + device.send_raw("\x00") + device.send_raw("\x01") + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + + def test_ping_abort_from_different_cid(self, device, check_timeouts=False): + oldcid = device.dev._channel_id + newcid = int.from_bytes(b"\x11\x22\x33\x44", 'big') + device.send_raw("\x81\x10\x00") + device.send_raw("\x00") + device.send_raw("\x01") + device.dev._channel_id = newcid + device.send_raw( + "\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88" + ) # init from different cid + print("wait for init response") + cmd, r = device.recv_raw() # init response + assert cmd == 0x86 + device.dev._channel_id = oldcid + if check_timeouts: + # print('wait for timeout') + cmd, r = device.recv_raw() # timeout response + assert cmd == 0xBF + + def test_timeout(self, device): + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + t1 = time.time() * 1000 + device.send_raw("\x81\x04\x00") + device.send_raw("\x00") + device.send_raw("\x01") + cmd, r = device.recv_raw() # timeout response + t2 = time.time() * 1000 + delt = t2 - t1 + assert cmd == 0xBF + assert r[0] == CtapError.ERR.TIMEOUT + assert delt < 1000 and delt > 400 + + def test_not_cont(self, device, check_timeouts=False): + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + device.send_raw("\x81\x04\x00") + device.send_raw("\x00") + device.send_raw("\x01") + device.send_raw("\x81\x10\x00") # init packet + cmd, r = device.recv_raw() # timeout response + assert cmd == 0xBF + assert r[0] == CtapError.ERR.INVALID_SEQ + + if check_timeouts: + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + device.send_raw("\x01\x10\x00") + with pytest.raises(socket.timeout): + cmd, r = device.recv_raw() # timeout response + + def test_check_busy(self, device): + t1 = time.time() * 1000 + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + oldcid = device.cid() + newcid = b"\x11\x22\x33\x44" + device.send_raw("\x81\x04\x00") + device.set_cid(newcid) + device.send_raw("\x81\x04\x00") + cmd, r = device.recv_raw() # busy response + t2 = time.time() * 1000 + assert t2 - t1 < 100 + assert cmd == 0xBF + assert r[0] == CtapError.ERR.CHANNEL_BUSY + + device.set_cid(oldcid) + cmd, r = device.recv_raw() # timeout response + assert cmd == 0xBF + assert r[0] == CtapError.ERR.TIMEOUT + + def test_check_busy_interleaved(self, device): + cid1 = b"\x11\x22\x33\x44" + cid2 = b"\x01\x22\x33\x44" + device.set_cid(cid2) + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + device.set_cid(cid1) + device.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88") + device.send_raw("\x81\x00\x63") # echo 99 bytes first channel + + device.set_cid(cid2) # send ping on 2nd channel + device.send_raw("\x81\x00\x39") + time.sleep(0.1) + device.send_raw("\x00") + + cmd, r = device.recv_raw() # busy response + + device.set_cid(cid1) # finish 1st channel ping + device.send_raw("\x00") + + device.set_cid(cid2) + + assert cmd == 0xBF + assert r[0] == CtapError.ERR.CHANNEL_BUSY + + device.set_cid(cid1) + cmd, r = device.recv_raw() # ping response + assert cmd == 0x81 + assert len(r) == 0x39 + + def test_cid_0(self, device): + device.set_cid("\x00\x00\x00\x00") + device.send_raw( + "\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88", cid="\x00\x00\x00\x00" + ) + cmd, r = device.recv_raw() # timeout + assert cmd == 0xBF + assert r[0] == CtapError.ERR.INVALID_CHANNEL + device.set_cid("\x05\x04\x03\x02") + + def test_cid_ffffffff(self, device): + + device.set_cid("\xff\xff\xff\xff") + device.send_raw( + "\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88", cid="\xff\xff\xff\xff" + ) + cmd, r = device.recv_raw() # timeout + assert cmd == 0xBF + assert r[0] == CtapError.ERR.INVALID_CHANNEL + device.set_cid("\x05\x04\x03\x02") + + def test_keep_alive(self, device, check_timeouts=False): + + precanned_make_credential = unhexlify( + '01a401582031323334353637383961626364656630313233343536373'\ + '8396162636465663002a26269646b6578616d706c652e6f7267646e61'\ + '6d65694578616d706c65525003a462696446cc2abaf119f26469636f6'\ + 'e781f68747470733a2f2f7777772e77332e6f72672f54522f77656261'\ + '7574686e2f646e616d657256696e204f6c696d7069612047657272696'\ + '56b646973706c61794e616d65781c446973706c617965642056696e20'\ + '4f6c696d706961204765727269650481a263616c672664747970656a7'\ + '075626c69632d6b6579') + + count = 0 + def count_keepalive(_x): + nonlocal count + count += 1 + + # We should get a keepalive within .5s + try: + r = device.send_data(CTAPHID.CBOR, precanned_make_credential, timeout = .50, on_keepalive = count_keepalive) + except CtapError as e: + assert e.code == CtapError.ERR.KEEPALIVE_CANCEL + assert count > 0 + + # wait for authnr to get UP or timeout + while True: + try: + r = device.send_data(CTAPHID.CBOR, '\x04') # getInfo + break + except CtapError as e: + assert e.code == CtapError.ERR.CHANNEL_BUSY diff --git a/tests/utils.py b/tests/utils.py index f0e729b..925f402 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -3,6 +3,9 @@ import random import string import secrets import math +from threading import Event, Timer +from numbers import Number + def verify(MC, GA, client_data_hash): credential_data = AttestedCredentialData(MC.auth_data.credential_data) @@ -56,3 +59,59 @@ def shannon_entropy(data): s -= p * math.log2(p) return s + +# Timeout from: +# https://github.com/Yubico/python-fido2/blob/f1dc028d6158e1d6d51558f72055c65717519b9b/fido2/utils.py +# Copyright (c) 2013 Yubico AB +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or +# without modification, are permitted provided that the following +# conditions are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + + +class Timeout(object): + """Utility class for adding a timeout to an event. + :param time_or_event: A number, in seconds, or a threading.Event object. + :ivar event: The Event associated with the Timeout. + :ivar timer: The Timer associated with the Timeout, if any. + """ + + def __init__(self, time_or_event): + + if isinstance(time_or_event, Number): + self.event = Event() + self.timer = Timer(time_or_event, self.event.set) + else: + self.event = time_or_event + self.timer = None + + def __enter__(self): + if self.timer: + self.timer.start() + return self.event + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.timer: + self.timer.cancel() + self.timer.join()