From: Jakub Kicinski Add test checking conditions which lead to connections breaking. Using bad key or connection gets stuck if device key is rotated twice. Signed-off-by: Jakub Kicinski Signed-off-by: Daniel Zahka --- tools/testing/selftests/drivers/net/psp.py | 111 +++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/tools/testing/selftests/drivers/net/psp.py b/tools/testing/selftests/drivers/net/psp.py index 1c504975b07d..46e63b85be75 100755 --- a/tools/testing/selftests/drivers/net/psp.py +++ b/tools/testing/selftests/drivers/net/psp.py @@ -15,6 +15,10 @@ from lib.py import NetDrvEpEnv, PSPFamily, NlError from lib.py import bkg, rand_port, wait_port_listen +class PSPExceptShortIO(Exception): + pass + + def _get_outq(s): one = b'\0' * 4 outq = fcntl.ioctl(s.fileno(), termios.TIOCOUTQ, one) @@ -89,6 +93,18 @@ def _send_careful(cfg, s, rounds): return len(data) * rounds +def _recv_careful(s, target, rounds=100): + data = b'' + for _ in range(rounds): + try: + data += s.recv(target - len(data), socket.MSG_DONTWAIT) + if len(data) == target: + return data + except BlockingIOError: + time.sleep(0.001) + raise PSPExceptShortIO(target, len(data), data) + + def _check_data_rx(cfg, exp_len): read_len = -1 for _ in range(30): @@ -99,6 +115,27 @@ def _check_data_rx(cfg, exp_len): time.sleep(0.01) ksft_eq(read_len, exp_len) + +def _check_data_outq(s, exp_len, force_wait=False): + outq = 0 + for _ in range(10): + outq = _get_outq(s) + if not force_wait and outq == exp_len: + break + time.sleep(0.01) + ksft_eq(outq, exp_len) + + +def _req_echo(cfg, s, expect_fail=False): + _send_with_ack(cfg, b'data echo\0') + try: + _recv_careful(s, 5) + if expect_fail: + raise Exception("Received unexpected echo reply") + except PSPExceptShortIO: + if not expect_fail: + raise + # # Test cases # @@ -324,6 +361,80 @@ def _data_basic_send(cfg, version, ipver): _close_psp_conn(cfg, s) +def __bad_xfer_do(cfg, s, tx, version='hdr0-aes-gcm-128'): + # Make sure we accept the ACK for the SPI before we seal with the bad assoc + _check_data_outq(s, 0) + + cfg.pspnl.tx_assoc({"dev-id": cfg.psp_dev_id, + "version": version, + "tx-key": tx, + "sock-fd": s.fileno()}) + + data_len = _send_careful(cfg, s, 20) + _check_data_outq(s, data_len, force_wait=True) + _check_data_rx(cfg, 0) + _close_psp_conn(cfg, s) + + +def data_send_bad_key(cfg): + """ Test send data with bad key """ + s = _make_psp_conn(cfg) + + rx_assoc = cfg.pspnl.rx_assoc({"version": 0, + "dev-id": cfg.psp_dev_id, + "sock-fd": s.fileno()}) + rx = rx_assoc['rx-key'] + tx = _spi_xchg(s, rx) + tx['key'] = (tx['key'][0] ^ 0xff).to_bytes(1, 'little') + tx['key'][1:] + __bad_xfer_do(cfg, s, tx) + + +def data_send_disconnect(cfg): + """ Test socket close after sending data """ + with _make_psp_conn(cfg) as s: + assoc = cfg.pspnl.rx_assoc({"version": 0, + "sock-fd": s.fileno()}) + tx = _spi_xchg(s, assoc['rx-key']) + cfg.pspnl.tx_assoc({"version": 0, + "tx-key": tx, + "sock-fd": s.fileno()}) + + data_len = _send_careful(cfg, s, 100) + _check_data_rx(cfg, data_len) + + s.shutdown(socket.SHUT_RDWR) + s.close() + + +def data_stale_key(cfg): + """ Test send on a double-rotated key """ + + s = _make_psp_conn(cfg) + try: + rx_assoc = cfg.pspnl.rx_assoc({"version": 0, + "dev-id": cfg.psp_dev_id, + "sock-fd": s.fileno()}) + rx = rx_assoc['rx-key'] + tx = _spi_xchg(s, rx) + + cfg.pspnl.tx_assoc({"dev-id": cfg.psp_dev_id, + "version": 0, + "tx-key": tx, + "sock-fd": s.fileno()}) + + data_len = _send_careful(cfg, s, 100) + _check_data_rx(cfg, data_len) + _check_data_outq(s, 0) + + cfg.pspnl.key_rotate({"id": cfg.psp_dev_id}) + cfg.pspnl.key_rotate({"id": cfg.psp_dev_id}) + + s.send(b'0123456789' * 200) + _check_data_outq(s, 2000, force_wait=True) + finally: + _close_psp_conn(cfg, s) + + def psp_ip_ver_test_builder(name, test_func, psp_ver, ipver): """Build test cases for each combo of PSP version and IP version""" def test_case(cfg): -- 2.47.3