123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- #!/usr/bin/python
- #
- # wpa_supplicant mesh mode tests
- # Copyright (c) 2014, cozybit Inc.
- #
- # This software may be distributed under the terms of the BSD license.
- # See README for more details.
- import logging
- logger = logging.getLogger()
- import hwsim_utils
- from wpasupplicant import WpaSupplicant
def check_mesh_scan(dev, params, other_started=False):
    """Run a scan on dev and verify that a mesh BSS is reported.

    dev -- control interface wrapper offering request(), wait_event(),
           get_bss() and dump_monitor()
    params -- argument string appended to the "SCAN " command (the caller in
              this file passes "use_id=1", so the reply is a numeric scan id)
    other_started -- when True, a scan that is not ours is expected to be
                     reported first; its start/results events must not carry
                     our scan id

    Raises Exception when the scan cannot be started, an expected event is
    missing or carries the wrong id, no [MESH] entry is listed, or the BSS
    entry lacks the mesh_capability field.
    """
    if not other_started:
        dev.dump_monitor()

    reply = dev.request("SCAN " + params)
    if "FAIL" in reply:
        raise Exception("Failed to start scan")
    id_tag = "id=" + str(int(reply))

    if other_started:
        # The first start/results pair belongs to the other scan, so our id
        # must not be present in either event.
        ev = dev.wait_event(["CTRL-EVENT-SCAN-STARTED"])
        if ev is None:
            raise Exception("Other scan did not start")
        if id_tag in ev:
            raise Exception("Own scan id unexpectedly included in start event")

        ev = dev.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
        if ev is None:
            raise Exception("Other scan did not complete")
        if id_tag in ev:
            raise Exception(
                "Own scan id unexpectedly included in completed event")

    ev = dev.wait_event(["CTRL-EVENT-SCAN-STARTED"])
    if ev is None:
        raise Exception("Scan did not start")
    if id_tag not in ev:
        raise Exception("Scan id not included in start event")

    ev = dev.wait_event(["CTRL-EVENT-SCAN-RESULTS"])
    if ev is None:
        raise Exception("Scan did not complete")
    if id_tag not in ev:
        raise Exception("Scan id not included in completed event")

    res = dev.request("SCAN_RESULTS")
    # Use the row that actually carries the [MESH] flag instead of assuming
    # the mesh BSS is the first result.
    mesh_row = None
    for row in res.splitlines():
        if "[MESH]" in row:
            mesh_row = row
            break
    if mesh_row is None:
        raise Exception("Scan did not contain a MESH network")

    # split() with no argument handles both tab- and space-separated columns,
    # so only the BSSID column is passed on.
    bssid = mesh_row.split()[0]
    bss = dev.get_bss(bssid)
    if bss is None:
        raise Exception("Could not get BSS entry for mesh")
    if 'mesh_capability' not in bss:
        raise Exception("mesh_capability missing from BSS entry")
def check_mesh_group_added(dev):
    """Wait for a MESH-GROUP-STARTED event on dev.

    Raises Exception when wait_event() returns None.
    """
    if dev.wait_event(["MESH-GROUP-STARTED"]) is None:
        raise Exception("Test exception: Couldn't join mesh")
def check_mesh_group_removed(dev):
    """Wait for a MESH-GROUP-REMOVED event on dev.

    Raises Exception when wait_event() returns None.
    """
    if dev.wait_event(["MESH-GROUP-REMOVED"]) is None:
        raise Exception("Test exception: Couldn't leave mesh")
def check_mesh_peer_connected(dev):
    """Wait for a MESH-PEER-CONNECTED event on dev.

    Raises Exception when wait_event() returns None.
    """
    if dev.wait_event(["MESH-PEER-CONNECTED"]) is None:
        raise Exception("Test exception: Remote peer did not connect.")
def check_mesh_peer_disconnected(dev):
    """Wait for a MESH-PEER-DISCONNECTED event on dev.

    Raises Exception when wait_event() returns None.
    """
    if dev.wait_event(["MESH-PEER-DISCONNECTED"]) is None:
        raise Exception("Test exception: Peer disconnect event not detected.")
def test_wpas_add_set_remove_support(dev):
    """wpa_supplicant MESH add/set/remove network support"""
    sta = dev[0]
    net_id = sta.add_network()
    # "5" is the mode value used for every mesh network in this file.
    sta.set_network(net_id, "mode", "5")
    sta.remove_network(net_id)
def add_open_mesh_network(dev, ht_mode=False, freq="2412", start=True):
    """Add the open "wpas-mesh-open" mesh network to dev.

    dev -- interface wrapper offering add_network(), set_network(),
           set_network_quoted() and mesh_group_add()
    ht_mode -- value for the mesh_ht_mode field; the field is left unset
               when this is false
    freq -- value for the frequency field
    start -- when true, mesh_group_add() is called for the new network

    Returns the network id obtained from add_network().
    """
    net_id = dev.add_network()
    dev.set_network(net_id, "mode", "5")
    dev.set_network_quoted(net_id, "ssid", "wpas-mesh-open")
    for field, value in (("key_mgmt", "NONE"), ("frequency", freq)):
        dev.set_network(net_id, field, value)
    if ht_mode:
        dev.set_network(net_id, "mesh_ht_mode", ht_mode)
    if start:
        dev.mesh_group_add(net_id)
    return net_id
def test_wpas_mesh_group_added(dev):
    """wpa_supplicant MESH group add"""
    sta = dev[0]
    add_open_mesh_network(sta)
    # The group counts as started once MESH-GROUP-STARTED is seen.
    check_mesh_group_added(sta)
def test_wpas_mesh_group_remove(dev):
    """wpa_supplicant MESH group remove"""
    sta = dev[0]

    add_open_mesh_network(sta, ht_mode="NOHT")
    # Wait for MESH-GROUP-STARTED before tearing the group down.
    check_mesh_group_added(sta)

    sta.mesh_group_remove()
    # Wait for MESH-GROUP-REMOVED.
    check_mesh_group_removed(sta)

    # Second removal request after the group is gone; its result is not
    # checked here.
    sta.mesh_group_remove()
def test_wpas_mesh_peer_connected(dev):
    """wpa_supplicant MESH peer connected"""
    pair = (dev[0], dev[1])

    for sta in pair:
        add_open_mesh_network(sta, ht_mode="HT20")

    # Both stations must report that the mesh group started ...
    for sta in pair:
        check_mesh_group_added(sta)

    # ... and then that a peer connected.
    for sta in pair:
        check_mesh_peer_connected(sta)
def test_wpas_mesh_peer_disconnected(dev):
    """wpa_supplicant MESH peer disconnected"""
    pair = (dev[0], dev[1])

    for sta in pair:
        add_open_mesh_network(sta)

    # Mesh joined on both stations.
    for sta in pair:
        check_mesh_group_added(sta)

    # Peer link established on both stations.
    for sta in pair:
        check_mesh_peer_connected(sta)

    # Tear down the group on dev[1]; dev[0] must then see the peer go away.
    dev[1].mesh_group_remove()
    check_mesh_peer_disconnected(dev[0])
def test_wpas_mesh_mode_scan(dev):
    """wpa_supplicant MESH scan support"""
    pair = (dev[0], dev[1])

    for sta in pair:
        add_open_mesh_network(sta, ht_mode="HT40+")

    # Mesh joined on both stations.
    for sta in pair:
        check_mesh_group_added(sta)

    # Scan from dev[0] while it is in the mesh and look for a mesh BSS.
    check_mesh_scan(dev[0], "use_id=1")
def wrap_wpas_mesh_test(test, dev, apdev):
    """Call test(dev, apdev, connectivity_check) and return its result.

    The third argument handed to test is a two-argument callable that
    forwards to hwsim_utils.test_connectivity().
    """
    return test(
        dev, apdev,
        lambda dev1, dev2: hwsim_utils.test_connectivity(dev1, dev2))
def _test_wpas_mesh_open(dev, apdev, test_connectivity):
    """Join dev[0] and dev[1] to the open mesh and check connectivity.

    test_connectivity -- callable taking the two stations; invoked once the
                         peer link is reported on both
    """
    pair = (dev[0], dev[1])

    for sta in pair:
        add_open_mesh_network(sta, ht_mode="HT40-", freq="2462")

    # Mesh joined on both stations.
    for sta in pair:
        check_mesh_group_added(sta)

    # Peer link established on both stations.
    for sta in pair:
        check_mesh_peer_connected(sta)

    # Connectivity check between the two stations.
    test_connectivity(dev[0], dev[1])
def test_wpas_mesh_open(dev, apdev):
    """wpa_supplicant open MESH network connectivity"""
    # The wrapper supplies the connectivity checker as the third argument.
    result = wrap_wpas_mesh_test(_test_wpas_mesh_open, dev, apdev)
    return result
def _test_wpas_mesh_open_no_auto(dev, apdev, test_connectivity):
    """Open mesh between dev[0] and dev[1] with no_auto_peer set on dev[1].

    test_connectivity -- callable taking the two stations; invoked once the
                         peer link is reported on both
    """
    # dev[0]: configure first, tweak the peering retry fields, then start.
    net0 = add_open_mesh_network(dev[0], start=False)
    for field, value in (("dot11MeshMaxRetries", "16"),
                         ("dot11MeshRetryTimeout", "255")):
        dev[0].set_network(net0, field, value)
    dev[0].mesh_group_add(net0)

    # dev[1]: same network but with no_auto_peer enabled.
    net1 = add_open_mesh_network(dev[1], start=False)
    dev[1].set_network(net1, "no_auto_peer", "1")
    dev[1].mesh_group_add(net1)

    pair = (dev[0], dev[1])

    # Mesh joined on both stations.
    for sta in pair:
        check_mesh_group_added(sta)

    # Peer link established on both stations.
    for sta in pair:
        check_mesh_peer_connected(sta)

    # Connectivity check between the two stations.
    test_connectivity(dev[0], dev[1])
def test_wpas_mesh_open_no_auto(dev, apdev):
    """wpa_supplicant open MESH network connectivity"""
    # Variant of the open mesh test where dev[1] sets no_auto_peer.
    result = wrap_wpas_mesh_test(_test_wpas_mesh_open_no_auto, dev, apdev)
    return result
def add_mesh_secure_net(dev, psk=True):
    """Add the SAE-protected "wpas-mesh-sec" mesh network to dev.

    dev -- interface wrapper offering add_network(), set_network() and
           set_network_quoted()
    psk -- when true, the quoted passphrase is configured as psk

    The network is only configured, not started. Returns the network id
    obtained from add_network().
    """
    net_id = dev.add_network()
    dev.set_network(net_id, "mode", "5")
    dev.set_network_quoted(net_id, "ssid", "wpas-mesh-sec")
    for field, value in (("key_mgmt", "SAE"), ("frequency", "2412")):
        dev.set_network(net_id, field, value)
    if psk:
        dev.set_network_quoted(net_id, "psk", "thisismypassphrase!")
    return net_id
def _test_wpas_mesh_secure(dev, apdev, test_connectivity):
    """Join dev[0] and dev[1] to the secure mesh and check connectivity.

    test_connectivity -- callable taking the two stations; invoked once the
                         peer link is reported on both
    """
    pair = (dev[0], dev[1])

    for sta in pair:
        # Empty sae_groups value, set before the network is configured.
        sta.request("SET sae_groups ")
        net_id = add_mesh_secure_net(sta)
        sta.mesh_group_add(net_id)

    # Mesh joined on both stations.
    for sta in pair:
        check_mesh_group_added(sta)

    # Peer link established on both stations.
    for sta in pair:
        check_mesh_peer_connected(sta)

    # Connectivity check between the two stations.
    test_connectivity(dev[0], dev[1])
def test_wpas_mesh_secure(dev, apdev):
    """wpa_supplicant secure MESH network connectivity"""
    # The wrapper supplies the connectivity checker as the third argument.
    result = wrap_wpas_mesh_test(_test_wpas_mesh_secure, dev, apdev)
    return result
def test_wpas_mesh_secure_sae_group_mismatch(dev, apdev):
    """wpa_supplicant secure MESH and SAE group mismatch"""
    # dev[0] and dev[1] share SAE group 19 and are expected to peer with
    # each other; dev[2] only offers group 26 and must stay unpeered.
    addr0 = dev[0].p2p_interface_addr()
    addr1 = dev[1].p2p_interface_addr()

    try:
        for idx, groups in enumerate(("19 25", "19", "26")):
            dev[idx].request("SET sae_groups " + groups)
            net_id = add_mesh_secure_net(dev[idx])
            dev[idx].mesh_group_add(net_id)

        for idx in range(3):
            check_mesh_group_added(dev[idx])

        # Each of the two compatible stations must report the other one.
        for sta, peer_addr in ((dev[0], addr1), (dev[1], addr0)):
            ev = sta.wait_event(["MESH-PEER-CONNECTED"])
            if ev is None:
                raise Exception("Remote peer did not connect")
            if peer_addr not in ev:
                raise Exception("Unexpected peer connected: " + ev)

        # dev[2] must not get any peer within one second.
        ev = dev[2].wait_event(["MESH-PEER-CONNECTED"], timeout=1)
        if ev is not None:
            raise Exception("Unexpected peer connection at dev[2]: " + ev)

        # And no further peer may show up on the other two stations.
        for sta in (dev[0], dev[1]):
            ev = sta.wait_event(["MESH-PEER-CONNECTED"], timeout=0.1)
            if ev is not None:
                raise Exception("Unexpected peer connection: " + ev)
    finally:
        # Always clear the sae_groups setting so that a failure here does
        # not leave a restricted group list configured for later tests.
        for idx in range(3):
            dev[idx].request("SET sae_groups ")
def test_wpas_mesh_secure_sae_missing_password(dev, apdev):
    """wpa_supplicant secure MESH and missing SAE password"""
    sta = dev[0]

    # Configure the network without the quoted passphrase and set psk to an
    # unquoted 64-hex-digit value instead.
    # NOTE(review): presumably this is taken as a raw PSK, leaving SAE
    # without a password - confirm against wpa_supplicant.
    net_id = add_mesh_secure_net(sta, psk=False)
    sta.set_network(net_id, "psk", "8f20b381f9b84371d61b5080ad85cac3c61ab3ca9525be5b2d0f4da3d979187a")
    sta.mesh_group_add(net_id)

    # Either event may arrive; only the failure message is acceptable.
    ev = sta.wait_event(["MESH-GROUP-STARTED", "Could not join mesh"],
                        timeout=5)
    if ev is None:
        raise Exception("Timeout on mesh start event")
    if "MESH-GROUP-STARTED" in ev:
        raise Exception("Unexpected mesh group start")

    # Make sure the group does not start right after the failure either.
    if sta.wait_event(["MESH-GROUP-STARTED"], timeout=0.1) is not None:
        raise Exception("Unexpected mesh group start")
def _test_wpas_mesh_secure_no_auto(dev, apdev, test_connectivity):
    """Secure mesh between dev[0] and dev[1] with no_auto_peer on dev[1].

    test_connectivity -- callable taking the two stations; invoked once the
                         peer link is reported on both

    Both stations are limited to SAE group 19 for the duration of the test;
    the sae_groups setting is cleared again even when a check fails.
    """
    try:
        dev[0].request("SET sae_groups 19")
        net_id = add_mesh_secure_net(dev[0])
        dev[0].mesh_group_add(net_id)

        dev[1].request("SET sae_groups 19")
        net_id = add_mesh_secure_net(dev[1])
        dev[1].set_network(net_id, "no_auto_peer", "1")
        dev[1].mesh_group_add(net_id)

        # Check for mesh joined
        check_mesh_group_added(dev[0])
        check_mesh_group_added(dev[1])

        # Check for peer connected
        check_mesh_peer_connected(dev[0])
        check_mesh_peer_connected(dev[1])

        # Connectivity check between the two stations
        test_connectivity(dev[0], dev[1])
    finally:
        # Clear the group restriction regardless of the test outcome so it
        # cannot leak into later tests.
        dev[0].request("SET sae_groups ")
        dev[1].request("SET sae_groups ")
def test_wpas_mesh_secure_no_auto(dev, apdev):
    """wpa_supplicant secure MESH network connectivity"""
    # Variant of the secure mesh test where dev[1] sets no_auto_peer.
    result = wrap_wpas_mesh_test(_test_wpas_mesh_secure_no_auto, dev, apdev)
    return result
def test_wpas_mesh_ctrl(dev):
    """wpa_supplicant ctrl_iface mesh command error cases"""
    sta = dev[0]

    def group_add_must_fail(arg):
        # MESH_GROUP_ADD with this argument has to be rejected.
        if "FAIL" not in sta.request("MESH_GROUP_ADD " + arg):
            raise Exception("Unexpected MESH_GROUP_ADD success")

    # Network id that was never added.
    group_add_must_fail("123")

    # Freshly added network with no mesh settings at all.
    net_id = sta.add_network()
    group_add_must_fail("%d" % net_id)

    # Mesh mode combined with key_mgmt WPA-PSK.
    sta.set_network(net_id, "mode", "5")
    sta.set_network(net_id, "key_mgmt", "WPA-PSK")
    group_add_must_fail("%d" % net_id)

    # Removing a group on an interface name that does not exist.
    if "FAIL" not in sta.request("MESH_GROUP_REMOVE foo"):
        raise Exception("Unexpected MESH_GROUP_REMOVE success")
def test_wpas_mesh_dynamic_interface(dev):
    """wpa_supplicant mesh with dynamic interface"""

    def remove_must_fail(ctrl, ifname):
        # MESH_GROUP_REMOVE for ifname issued through ctrl must be rejected.
        if "FAIL" not in ctrl.request("MESH_GROUP_REMOVE " + ifname):
            raise Exception("Invalid MESH_GROUP_REMOVE accepted")

    def remove_must_succeed(ctrl, ifname):
        # MESH_GROUP_REMOVE for ifname issued through ctrl must be accepted.
        if "OK" not in ctrl.request("MESH_GROUP_REMOVE " + ifname):
            raise Exception("MESH_GROUP_REMOVE failed")

    def join_and_verify(first, second):
        # Start the open mesh on both interfaces, wait for the group and the
        # peer link on each, then run the connectivity check.
        add_open_mesh_network(first)
        add_open_mesh_network(second)
        check_mesh_group_added(first)
        check_mesh_group_added(second)
        check_mesh_peer_connected(first)
        check_mesh_peer_connected(second)
        hwsim_utils.test_connectivity(first, second)

    mesh0 = None
    mesh1 = None
    try:
        # dev[0] asks for an explicit interface name, dev[1] does not.
        mesh0 = dev[0].request("MESH_INTERFACE_ADD ifname=mesh0")
        if "FAIL" in mesh0:
            raise Exception("MESH_INTERFACE_ADD failed")
        mesh1 = dev[1].request("MESH_INTERFACE_ADD")
        if "FAIL" in mesh1:
            raise Exception("MESH_INTERFACE_ADD failed")

        wpas0 = WpaSupplicant(ifname=mesh0)
        wpas1 = WpaSupplicant(ifname=mesh1)
        logger.info(mesh0 + " address " + wpas0.get_status_field("address"))
        logger.info(mesh1 + " address " + wpas1.get_status_field("address"))

        join_and_verify(wpas0, wpas1)

        # Must not allow MESH_GROUP_REMOVE on dynamic interface
        remove_must_fail(wpas0, mesh0)
        remove_must_fail(wpas1, mesh1)

        # Must not allow MESH_GROUP_REMOVE on another radio interface
        remove_must_fail(wpas0, mesh1)
        remove_must_fail(wpas1, mesh0)

        wpas0.remove_ifname()
        wpas1.remove_ifname()

        # Removal through the parent interfaces works once ...
        remove_must_succeed(dev[0], mesh0)
        remove_must_succeed(dev[1], mesh1)

        # ... and is rejected when repeated.
        remove_must_fail(dev[0], mesh0)
        remove_must_fail(dev[1], mesh1)

        logger.info("Make sure another dynamic group can be added")
        mesh0 = dev[0].request("MESH_INTERFACE_ADD ifname=mesh0")
        if "FAIL" in mesh0:
            raise Exception("MESH_INTERFACE_ADD failed")
        mesh1 = dev[1].request("MESH_INTERFACE_ADD")
        if "FAIL" in mesh1:
            raise Exception("MESH_INTERFACE_ADD failed")

        wpas0 = WpaSupplicant(ifname=mesh0)
        wpas1 = WpaSupplicant(ifname=mesh1)
        logger.info(mesh0 + " address " + wpas0.get_status_field("address"))
        logger.info(mesh1 + " address " + wpas1.get_status_field("address"))

        join_and_verify(wpas0, wpas1)
    finally:
        # Best-effort cleanup of whatever interface names were obtained; the
        # replies are intentionally not checked.
        if mesh0:
            dev[0].request("MESH_GROUP_REMOVE " + mesh0)
        if mesh1:
            dev[1].request("MESH_GROUP_REMOVE " + mesh1)
|