12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901 |
# P2P autonomous GO test cases
# Copyright (c) 2013-2015, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
- from remotehost import remote_compatible
- import time
- import subprocess
- import logging
- logger = logging.getLogger()
- import hwsim_utils
- import utils
- from utils import HwsimSkip
- from wlantest import Wlantest
- from wpasupplicant import WpaSupplicant
- from p2p_utils import *
- from test_p2p_messages import mgmt_tx, parse_p2p_public_action
def test_autogo(dev):
    """P2P autonomous GO and client joining group"""
    addr0 = dev[0].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()

    # In this test neither side is expected to report a "p2p-wlan" group
    # interface name.
    res = autogo(dev[0])
    if "p2p-wlan" in res['ifname']:
        raise Exception("Unexpected group interface name on GO")
    res = connect_cli(dev[0], dev[1])
    if "p2p-wlan" in res['ifname']:
        raise Exception("Unexpected group interface name on client")

    # The client's BSS entry for the GO is looked up by P2P Device Address,
    # by BSS id, and by a BSS id range.
    bss = dev[1].get_bss("p2p_dev_addr=" + addr0, res['ifname'])
    if not bss or bss['bssid'] != dev[0].p2p_interface_addr():
        raise Exception("Unexpected BSSID in the BSS entry for the GO")
    bss_id = bss['id']
    bss = dev[1].get_bss("ID-" + bss_id, res['ifname'])
    if not bss or bss['id'] != bss_id:
        raise Exception("Could not find BSS entry based on id")
    res = dev[1].group_request("BSS RANGE=" + bss_id + "- MASK=0x1")
    if "id=" + bss_id not in res:
        raise Exception("Could not find BSS entry based on id range")

    res = dev[1].request("SCAN_RESULTS")
    if "[P2P]" not in res:
        raise Exception("P2P flag missing from scan results: " + res)

    # Presence request to increase testing coverage
    # The first two argument combinations are expected to be rejected.
    if "FAIL" not in dev[1].group_request("P2P_PRESENCE_REQ 30000"):
        raise Exception("Invalid P2P_PRESENCE_REQ accepted")
    if "FAIL" not in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400 30001"):
        raise Exception("Invalid P2P_PRESENCE_REQ accepted")
    if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400"):
        raise Exception("Could not send presence request")
    ev = dev[1].wait_group_event(["P2P-PRESENCE-RESPONSE"], 10)
    if ev is None:
        raise Exception("Timeout while waiting for Presence Response")
    if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ 30000 102400 20000 102400"):
        raise Exception("Could not send presence request")
    ev = dev[1].wait_group_event(["P2P-PRESENCE-RESPONSE"])
    if ev is None:
        raise Exception("Timeout while waiting for Presence Response")
    if "FAIL" in dev[1].group_request("P2P_PRESENCE_REQ"):
        raise Exception("Could not send presence request")
    ev = dev[1].wait_group_event(["P2P-PRESENCE-RESPONSE"])
    if ev is None:
        raise Exception("Timeout while waiting for Presence Response")

    # Provision Discovery from a third device towards the running GO; the GO
    # event has to identify both the requesting peer and the group interface.
    if not dev[2].discover_peer(addr0):
        raise Exception("Could not discover GO")
    dev[0].dump_monitor()
    dev[2].global_request("P2P_PROV_DISC " + addr0 + " display join")
    ev = dev[0].wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=10)
    if ev is None:
        raise Exception("GO did not report P2P-PROV-DISC-SHOW-PIN")
    if "p2p_dev_addr=" + addr2 not in ev:
        raise Exception("Unexpected P2P Device Address in event: " + ev)
    if "group=" + dev[0].group_ifname not in ev:
        raise Exception("Unexpected group interface in event: " + ev)
    ev = dev[2].wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
    if ev is None:
        raise Exception("P2P-PROV-DISC-ENTER-PIN not reported")

    dev[0].remove_group()
    dev[1].wait_go_ending_session()
def test_autogo2(dev):
    """P2P autonomous GO with a separate group interface and client joining group"""
    go, cli = dev[0], dev[1]
    go.global_request("SET p2p_no_group_iface 0")

    group = autogo(go, freq=2437)
    go_ifname = group['ifname']
    # The GO must now run on its own "p2p-wlan" netdev that exists in the
    # system interface list.
    if "p2p-wlan" not in go_ifname:
        raise Exception("Unexpected group interface name on GO")
    if go_ifname not in utils.get_ifnames():
        raise Exception("Could not find group interface netdev")

    connect_cli(go, cli, social=True, freq=2437)
    go.remove_group()
    cli.wait_go_ending_session()

    # Removing the group must also remove the group netdev.
    if go_ifname in utils.get_ifnames():
        raise Exception("Group interface netdev was not removed")
def test_autogo3(dev):
    """P2P autonomous GO and client with a separate group interface joining group"""
    go, cli = dev[0], dev[1]
    cli.global_request("SET p2p_no_group_iface 0")
    autogo(go, freq=2462)

    joined = connect_cli(go, cli, social=True, freq=2462)
    cli_ifname = joined['ifname']
    # The client side is the one expected to use a "p2p-wlan" netdev here.
    if "p2p-wlan" not in cli_ifname:
        raise Exception("Unexpected group interface name on client")
    if cli_ifname not in utils.get_ifnames():
        raise Exception("Could not find group interface netdev")

    go.remove_group()
    cli.wait_go_ending_session()
    cli.ping()

    # The client group netdev must be gone once the session has ended.
    if cli_ifname in utils.get_ifnames():
        raise Exception("Group interface netdev was not removed")
def test_autogo4(dev):
    """P2P autonomous GO and client joining group (both with a separate group interface)"""
    go, cli = dev[0], dev[1]
    go.global_request("SET p2p_no_group_iface 0")
    cli.global_request("SET p2p_no_group_iface 0")

    go_res = autogo(go, freq=2412)
    cli_res = connect_cli(go, cli, social=True, freq=2412)

    # Both ends are expected to use "p2p-wlan" group netdevs.
    if "p2p-wlan" not in go_res['ifname']:
        raise Exception("Unexpected group interface name on GO")
    if "p2p-wlan" not in cli_res['ifname']:
        raise Exception("Unexpected group interface name on client")

    present = utils.get_ifnames()
    if go_res['ifname'] not in present:
        raise Exception("Could not find GO group interface netdev")
    if cli_res['ifname'] not in present:
        raise Exception("Could not find client group interface netdev")

    go.remove_group()
    cli.wait_go_ending_session()
    cli.ping()

    # Take a fresh snapshot of the netdevs after the group was torn down.
    remaining = utils.get_ifnames()
    if go_res['ifname'] in remaining:
        raise Exception("GO group interface netdev was not removed")
    if cli_res['ifname'] in remaining:
        raise Exception("Client group interface netdev was not removed")
def test_autogo_m2d(dev):
    """P2P autonomous GO and clients not authorized"""
    go, cli1, cli2 = dev[0], dev[1], dev[2]
    autogo(go, freq=2412)
    go_addr = go.p2p_dev_addr()

    # Only the first client is switched to using a separate group interface.
    cli1.global_request("SET p2p_no_group_iface 0")

    for cli in (cli1, cli2):
        if not cli.discover_peer(go_addr, social=True):
            raise Exception("GO " + go_addr + " not found")
        cli.dump_monitor()

    logger.info("Trying to join the group when GO has not authorized the client")
    for cli in (cli1, cli2):
        pin = cli.wps_read_pin()
        cmd = "P2P_CONNECT " + go_addr + " " + pin + " join"
        if "OK" not in cli.global_request(cmd):
            raise Exception("P2P_CONNECT join failed")

    # First client: WPS-M2D is expected both as a global event and on the
    # "p2p-wlan" interface picked from the INTERFACES listing.
    if cli1.wait_global_event(["WPS-M2D"], timeout=16) is None:
        raise Exception("No global M2D event")
    ifaces = cli1.request("INTERFACES").splitlines()
    if "p2p-wlan" in ifaces[0]:
        group_iface = ifaces[0]
    else:
        group_iface = ifaces[1]
    wpas = WpaSupplicant(ifname=group_iface)
    if wpas.wait_event(["WPS-M2D"], timeout=10) is None:
        raise Exception("No M2D event on group interface")

    # Second client: the same event is checked globally and on the device's
    # own interface.
    if cli2.wait_global_event(["WPS-M2D"], timeout=10) is None:
        raise Exception("No global M2D event (2)")
    if cli2.wait_event(["WPS-M2D"], timeout=10) is None:
        raise Exception("No M2D event on group interface (2)")
@remote_compatible
def test_autogo_fail(dev):
    """P2P autonomous GO and incorrect PIN"""
    go, cli = dev[0], dev[1]
    autogo(go, freq=2412)
    go_addr = go.p2p_dev_addr()

    # The GO authorizes a fixed PIN while the client below uses the PIN it
    # reads from wps_read_pin().
    go.p2p_go_authorize_client("00000000")

    cli.global_request("SET p2p_no_group_iface 0")
    if not cli.discover_peer(go_addr, social=True):
        raise Exception("GO " + go_addr + " not found")
    cli.dump_monitor()

    logger.info("Trying to join the group when GO has not authorized the client")
    cli_pin = cli.wps_read_pin()
    join_cmd = "P2P_CONNECT " + go_addr + " " + cli_pin + " join"
    if "OK" not in cli.global_request(join_cmd):
        raise Exception("P2P_CONNECT join failed")

    if cli.wait_global_event(["WPS-FAIL"], timeout=10) is None:
        raise Exception("No global WPS-FAIL event")
def test_autogo_2cli(dev):
    """P2P autonomous GO and two clients joining group"""
    go, cli_a, cli_b = dev[0], dev[1], dev[2]
    autogo(go, freq=2412)
    for cli in (cli_a, cli_b):
        connect_cli(go, cli, social=True, freq=2412)
    hwsim_utils.test_connectivity_p2p(cli_a, cli_b)

    # Remove the first client by P2P Device Address and the second one by
    # interface address.
    go.global_request("P2P_REMOVE_CLIENT " + cli_a.p2p_dev_addr())
    cli_a.wait_go_ending_session()
    go.global_request("P2P_REMOVE_CLIENT iface=" + cli_b.p2p_interface_addr())
    cli_b.wait_go_ending_session()

    # A malformed address argument has to be rejected.
    bogus = go.global_request("P2P_REMOVE_CLIENT foo")
    if "FAIL" not in bogus:
        raise Exception("Invalid P2P_REMOVE_CLIENT command accepted")
    go.remove_group()
def test_autogo_pbc(dev):
    """P2P autonomous GO and PBC"""
    go, allowed_cli, other_cli = dev[0], dev[1], dev[2]
    allowed_cli.global_request("SET p2p_no_group_iface 0")
    autogo(go, freq=2412)

    # A truncated address must be rejected before enabling PBC for dev[1].
    if "FAIL" not in go.group_request("WPS_PBC p2p_dev_addr=00:11:22:33:44"):
        raise Exception("Invalid WPS_PBC succeeded")
    pbc_cmd = "WPS_PBC p2p_dev_addr=" + allowed_cli.p2p_dev_addr()
    if "OK" not in go.group_request(pbc_cmd):
        raise Exception("WPS_PBC failed")

    # dev[2] was not named in the WPS_PBC command; its attempt is expected to
    # end in WPS-M2D with config_error=12.
    other_cli.p2p_connect_group(go.p2p_dev_addr(), "pbc", timeout=0,
                                social=True)
    m2d = other_cli.wait_global_event(["WPS-M2D"], timeout=15)
    if m2d is None:
        raise Exception("WPS-M2D not reported")
    if "config_error=12" not in m2d:
        raise Exception("Unexpected config_error: " + m2d)

    allowed_cli.p2p_connect_group(go.p2p_dev_addr(), "pbc", timeout=15,
                                  social=True)
def test_autogo_tdls(dev):
    """P2P autonomous GO and two clients using TDLS"""
    go = dev[0]
    logger.info("Start autonomous GO with fixed parameters " + go.ifname)

    # Pre-configure a network block with a known SSID/PSK and start the GO
    # from it; the same passphrase is handed to wlantest below.
    net_id = go.add_network()
    go.set_network_quoted(net_id, "ssid", "DIRECT-tdls")
    go.set_network_quoted(net_id, "psk", "12345678")
    go.set_network(net_id, "mode", "3")
    go.set_network(net_id, "disabled", "2")
    started = go.p2p_start_go(persistent=net_id, freq="2462")
    logger.debug("res: " + str(started))

    Wlantest.setup(go, True)
    wt = Wlantest()
    wt.flush()
    wt.add_passphrase("12345678")

    connect_cli(go, dev[1], social=True, freq=2462)
    connect_cli(go, dev[2], social=True, freq=2462)
    hwsim_utils.test_connectivity_p2p(dev[1], dev[2])

    bssid = dev[0].p2p_interface_addr()
    sta1 = dev[1].p2p_interface_addr()
    sta2 = dev[2].p2p_interface_addr()

    # TDLS setup between the two clients, followed by traffic that should be
    # counted on the direct link.
    dev[1].tdls_setup(sta2)
    time.sleep(1)
    hwsim_utils.test_connectivity_p2p(dev[1], dev[2])
    if wt.get_tdls_counter("setup_conf_ok", bssid, sta1, sta2) == 0:
        raise Exception("No TDLS Setup Confirm (success) seen")
    if wt.get_tdls_counter("valid_direct_link", bssid, sta1, sta2) == 0:
        raise Exception("No valid frames through direct link")
    wt.tdls_clear(bssid, sta1, sta2)

    # Tear the link down; counters are cleared so that only the traffic sent
    # afterwards is evaluated.
    dev[1].tdls_teardown(sta2)
    time.sleep(1)
    if wt.get_tdls_counter("teardown", bssid, sta1, sta2) == 0:
        raise Exception("No TDLS Setup Teardown seen")
    wt.tdls_clear(bssid, sta1, sta2)
    hwsim_utils.test_connectivity_p2p(dev[1], dev[2])

    via_ap = wt.get_tdls_counter("valid_ap_path", bssid, sta1, sta2)
    if via_ap == 0:
        raise Exception("No valid frames via AP path")
    via_direct = wt.get_tdls_counter("valid_direct_link", bssid, sta1, sta2)
    if via_direct > 0:
        raise Exception("Unexpected frames through direct link")
    via_direct_invalid = wt.get_tdls_counter("invalid_direct_link", bssid,
                                             sta1, sta2)
    if via_direct_invalid > 0:
        raise Exception("Unexpected frames through direct link (invalid)")

    dev[2].remove_group()
    dev[1].remove_group()
    dev[0].remove_group()
def test_autogo_legacy(dev):
    """P2P autonomous GO and legacy clients"""
    go, p2p_cli, legacy = dev[0], dev[1], dev[2]
    go_res = autogo(go, freq=2462)

    # The passphrase reported when the group started must match what the GO
    # returns through both status and P2P_GET_PASSPHRASE.
    if go.get_group_status_field("passphrase", extra="WPS") != go_res['passphrase']:
        raise Exception("passphrase mismatch")
    if go.group_request("P2P_GET_PASSPHRASE") != go_res['passphrase']:
        raise Exception("passphrase mismatch(2)")

    logger.info("Connect P2P client")
    connect_cli(go, p2p_cli, social=True, freq=2462)
    if "FAIL" not in p2p_cli.request("P2P_GET_PASSPHRASE"):
        raise Exception("P2P_GET_PASSPHRASE succeeded on P2P Client")

    logger.info("Connect legacy WPS client")
    pin = legacy.wps_read_pin()
    go.p2p_go_authorize_client(pin)
    legacy.request("P2P_SET disabled 1")
    legacy.dump_monitor()
    legacy.request("WPS_PIN any " + pin)
    legacy.wait_connected(timeout=30)
    if legacy.get_status()['wpa_state'] != 'COMPLETED':
        raise Exception("Not fully connected")
    hwsim_utils.test_connectivity_p2p_sta(p2p_cli, legacy)
    legacy.request("DISCONNECT")

    logger.info("Connect legacy non-WPS client")
    legacy.request("FLUSH")
    legacy.request("P2P_SET disabled 1")
    # Join with the group SSID and passphrase as a plain WPA2-PSK network.
    legacy.connect(ssid=go_res['ssid'], psk=go_res['passphrase'], proto='RSN',
                   key_mgmt='WPA-PSK', pairwise='CCMP', group='CCMP',
                   scan_freq=go_res['freq'])
    hwsim_utils.test_connectivity_p2p_sta(p2p_cli, legacy)
    legacy.request("DISCONNECT")

    go.remove_group()
    p2p_cli.wait_go_ending_session()
def test_autogo_chan_switch(dev):
    """P2P autonomous GO switching channels"""
    # Runs the shared channel switch sequence without changing any settings
    # first.
    run_autogo_chan_switch(dev)
def run_autogo_chan_switch(dev):
    # Shared body of the channel switch tests: start a GO on 2417 MHz with
    # dev[1] as client, request a switch to 2422 MHz and verify that traffic
    # still passes afterwards. Raises HwsimSkip when CHAN_SWITCH is rejected.
    autogo(dev[0], freq=2417)
    connect_cli(dev[0], dev[1], freq=2417)
    res = dev[0].group_request("CHAN_SWITCH 5 2422")
    if "FAIL" in res:
        # for now, skip test since mac80211_hwsim support is not yet widely
        # deployed
        raise HwsimSkip("Assume mac80211_hwsim did not support channel switching")
    ev = dev[0].wait_group_event(["AP-CSA-FINISHED"], timeout=10)
    if ev is None:
        raise Exception("CSA finished event timed out")
    if "freq=2422" not in ev:
        raise Exception("Unexpected channel in CSA finished event")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    time.sleep(0.1)
    hwsim_utils.test_connectivity_p2p(dev[0], dev[1])
    dev[0].remove_group()
    dev[1].wait_go_ending_session()
def test_autogo_chan_switch_group_iface(dev):
    """P2P autonomous GO switching channels (separate group interface)"""
    go = dev[0]
    # Same sequence as the plain channel switch test, with p2p_no_group_iface
    # set to 0 on the GO beforehand.
    go.global_request("SET p2p_no_group_iface 0")
    run_autogo_chan_switch(dev)
@remote_compatible
def test_autogo_extra_cred(dev):
    """P2P autonomous GO sending two WPS credentials"""
    go, cli = dev[0], dev[1]
    # Enable the wps_testing_dummy_cred test option on the GO before the
    # group is started.
    enabled = go.request("SET wps_testing_dummy_cred 1")
    if "FAIL" in enabled:
        raise Exception("Failed to enable test mode")
    autogo(go, freq=2412)
    connect_cli(go, cli, social=True, freq=2412)
    go.remove_group()
    cli.wait_go_ending_session()
def test_autogo_ifdown(dev):
    """P2P autonomous GO and external ifdown"""
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')

    # First round: start a GO on wlan5 and remove the whole interface.
    wpas.interface_add("wlan5")
    autogo(wpas)
    wpas.dump_monitor()
    wpas.interface_remove("wlan5")

    # Second round: start a GO again and take its group netdev down from
    # outside of wpa_supplicant.
    wpas.interface_add("wlan5")
    group = autogo(wpas)
    wpas.dump_monitor()
    group_ifname = group['ifname']
    subprocess.call(['ifconfig', group_ifname, 'down'])

    removed = wpas.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
    if removed is None:
        raise Exception("Group removal not reported")
    if group['ifname'] not in removed:
        raise Exception("Unexpected group removal event: " + removed)
@remote_compatible
def test_autogo_start_during_scan(dev):
    """P2P autonomous GO started during ongoing manual scan"""
    go, cli = dev[0], dev[1]
    try:
        # use autoscan to set scan_req = MANUAL_SCAN_REQ
        autoscan_res = go.request("AUTOSCAN periodic:1")
        if "OK" not in autoscan_res:
            raise Exception("Failed to set autoscan")
        autogo(go, freq=2462)
        connect_cli(go, cli, social=True, freq=2462)
        go.remove_group()
        cli.wait_go_ending_session()
    finally:
        # Always clear the autoscan setting again.
        go.request("AUTOSCAN ")
def test_autogo_passphrase_len(dev):
    """P2P autonomous GO and longer passphrase"""
    go, p2p_cli, legacy = dev[0], dev[1], dev[2]
    try:
        if "OK" not in go.request("SET p2p_passphrase_len 13"):
            raise Exception("Failed to set passphrase length")
        go_res = autogo(go, freq=2412)
        passphrase = go_res['passphrase']
        if len(passphrase) != 13:
            raise Exception("Unexpected passphrase length")
        if go.get_group_status_field("passphrase", extra="WPS") != go_res['passphrase']:
            raise Exception("passphrase mismatch")

        logger.info("Connect P2P client")
        connect_cli(go, p2p_cli, social=True, freq=2412)

        logger.info("Connect legacy WPS client")
        pin = legacy.wps_read_pin()
        go.p2p_go_authorize_client(pin)
        legacy.request("P2P_SET disabled 1")
        legacy.dump_monitor()
        legacy.request("WPS_PIN any " + pin)
        legacy.wait_connected(timeout=30)
        if legacy.get_status()['wpa_state'] != 'COMPLETED':
            raise Exception("Not fully connected")
        legacy.request("DISCONNECT")

        logger.info("Connect legacy non-WPS client")
        legacy.request("FLUSH")
        legacy.request("P2P_SET disabled 1")
        # Join with the group SSID and the 13 character passphrase directly.
        legacy.connect(ssid=go_res['ssid'], psk=go_res['passphrase'],
                       proto='RSN', key_mgmt='WPA-PSK', pairwise='CCMP',
                       group='CCMP', scan_freq=go_res['freq'])
        hwsim_utils.test_connectivity_p2p_sta(p2p_cli, legacy)
        legacy.request("DISCONNECT")

        go.remove_group()
        p2p_cli.wait_go_ending_session()
    finally:
        # Set the passphrase length back to 8 regardless of the outcome.
        go.request("SET p2p_passphrase_len 8")
@remote_compatible
def test_autogo_bridge(dev):
    """P2P autonomous GO in a bridge"""
    # Bound before the try block so that the cleanup in "finally" cannot hit
    # an unbound name (and hide the real failure) when the test fails before
    # the group interface name is known.
    ifname = None
    try:
        # use autoscan to set scan_req = MANUAL_SCAN_REQ
        if "OK" not in dev[0].request("AUTOSCAN periodic:1"):
            raise Exception("Failed to set autoscan")
        autogo(dev[0])
        ifname = dev[0].get_group_ifname()
        # Add the group interface to a bridge and take it out again; the
        # group is expected to stay up through all of this.
        dev[0].cmd_execute(['brctl', 'addbr', 'p2p-br0'])
        dev[0].cmd_execute(['brctl', 'setfd', 'p2p-br0', '0'])
        dev[0].cmd_execute(['brctl', 'addif', 'p2p-br0', ifname])
        dev[0].cmd_execute(['ip', 'link', 'set', 'dev', 'p2p-br0', 'up'])
        time.sleep(0.1)
        dev[0].cmd_execute(['brctl', 'delif', 'p2p-br0', ifname])
        time.sleep(0.1)
        dev[0].cmd_execute(['ip', 'link', 'set', 'dev', 'p2p-br0', 'down'])
        time.sleep(0.1)
        dev[0].cmd_execute(['brctl', 'delbr', 'p2p-br0'])
        ev = dev[0].wait_global_event(["P2P-GROUP-REMOVED"], timeout=1)
        if ev is not None:
            raise Exception("P2P group removed unexpectedly")
        if dev[0].get_group_status_field('wpa_state') != "COMPLETED":
            raise Exception("Unexpected wpa_state")
        dev[0].remove_group()
    finally:
        dev[0].request("AUTOSCAN ")
        # Best-effort cleanup of anything the test may have left behind;
        # stderr is discarded since the bridge is normally already gone.
        if ifname is not None:
            dev[0].cmd_execute(['brctl', 'delif', 'p2p-br0', ifname,
                                '2>', '/dev/null'], shell=True)
        dev[0].cmd_execute(['ip', 'link', 'set', 'dev', 'p2p-br0', 'down',
                            '2>', '/dev/null'], shell=True)
        dev[0].cmd_execute(['brctl', 'delbr', 'p2p-br0', '2>', '/dev/null'],
                           shell=True)
@remote_compatible
def test_presence_req_on_group_interface(dev):
    """P2P_PRESENCE_REQ on group interface"""
    go, cli = dev[0], dev[1]
    cli.global_request("SET p2p_no_group_iface 0")
    autogo(go, freq=2437)
    connect_cli(go, cli, social=True, freq=2437)

    # Send the request through the client's group interface and wait for the
    # response event there.
    sent = cli.group_request("P2P_PRESENCE_REQ 30000 102400")
    if "FAIL" in sent:
        raise Exception("Could not send presence request")
    if cli.wait_group_event(["P2P-PRESENCE-RESPONSE"]) is None:
        raise Exception("Timeout while waiting for Presence Response")

    go.remove_group()
    cli.wait_go_ending_session()
def test_autogo_join_auto_go_not_found(dev):
    """P2P_CONNECT-auto not finding GO"""
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    wpas.interface_add("wlan5")
    wpas.request("P2P_SET listen_channel 1")
    wpas.global_request("SET p2p_no_group_iface 0")
    autogo(wpas, freq=2412)
    addr = wpas.p2p_dev_addr()
    bssid = wpas.p2p_interface_addr()
    wpas.dump_monitor()

    # Scan for the GO's BSS on dev[1] before the GO is silenced below.
    dev[1].global_request("SET p2p_no_group_iface 0")
    dev[1].scan_for_bss(bssid, freq=2412)
    # This makes the GO not show up in the scan iteration following the
    # P2P_CONNECT command by stopping beaconing and handling Probe Request
    # frames externally (but not really replying to them). P2P listen mode is
    # needed to keep the GO listening on the operating channel for the PD
    # exchange.
    if "OK" not in wpas.group_request("STOP_AP"):
        raise Exception("STOP_AP failed")
    wpas.dump_monitor()
    wpas.group_request("SET ext_mgmt_frame_handling 1")
    wpas.p2p_listen()
    wpas.dump_monitor()
    time.sleep(0.02)
    dev[1].global_request("P2P_CONNECT " + addr + " pbc auto")

    ev = dev[1].wait_global_event(["P2P-FALLBACK-TO-GO-NEG-ENABLED"], 15)
    wpas.dump_monitor()
    if ev is None:
        raise Exception("Could not trigger old-scan-only case")

    ev = dev[1].wait_global_event(["P2P-FALLBACK-TO-GO-NEG"], 15)
    # The group is removed before the result is checked, so it is torn down
    # on both the pass and the fail path of the checks below.
    wpas.remove_group()
    if ev is None:
        raise Exception("Fallback to GO Negotiation not seen")
    if "reason=GO-not-found" not in ev:
        raise Exception("Unexpected reason for fallback: " + ev)
    wpas.dump_monitor()
def test_autogo_join_auto(dev):
    """P2P_CONNECT-auto joining a group"""
    go, cli = dev[0], dev[1]
    autogo(go)
    go_addr = go.p2p_dev_addr()

    connect_res = cli.global_request("P2P_CONNECT " + go_addr + " pbc auto")
    if "OK" not in connect_res:
        raise Exception("P2P_CONNECT failed")

    # The GO has to report the PBC Provision Discovery Request together with
    # the name of its group interface.
    pd_ev = go.wait_global_event(["P2P-PROV-DISC-PBC-REQ"], timeout=15)
    if pd_ev is None:
        raise Exception("Timeout on P2P-PROV-DISC-PBC-REQ")
    if "group=" + go.group_ifname not in pd_ev:
        raise Exception("Unexpected PD event contents: " + pd_ev)
    go.group_request("WPS_PBC")

    started = cli.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
    if started is None:
        raise Exception("Joining the group timed out")
    cli.group_form_result(started)

    go.remove_group()
    cli.wait_go_ending_session()
    cli.flush_scan_cache()
@remote_compatible
def test_autogo_join_auto_go_neg(dev):
    """P2P_CONNECT-auto fallback to GO Neg"""
    peer_dev, cli = dev[0], dev[1]
    cli.flush_scan_cache()

    # dev[0] only listens here; no group is started on it before the
    # P2P_CONNECT-auto attempt.
    peer_dev.p2p_listen()
    peer_addr = peer_dev.p2p_dev_addr()
    if not cli.discover_peer(peer_addr, social=True):
        raise Exception("Peer not found")
    cli.p2p_stop_find()
    if "OK" not in cli.global_request("P2P_CONNECT " + peer_addr + " pbc auto"):
        raise Exception("P2P_CONNECT failed")

    req_ev = peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
    if req_ev is None:
        raise Exception("Timeout on P2P-GO-NEG-REQUEST")
    requester = req_ev.split(' ')[1]
    peer_dev.p2p_go_neg_init(requester, None, "pbc", timeout=15, go_intent=15)

    fallback = cli.wait_global_event(["P2P-FALLBACK-TO-GO-NEG"], timeout=1)
    if fallback is None:
        raise Exception("No P2P-FALLBACK-TO-GO-NEG event seen")
    if "P2P-FALLBACK-TO-GO-NEG-ENABLED" in fallback:
        # The wait also returns on the -ENABLED variant; wait once more for
        # the following fallback event.
        fallback = cli.wait_global_event(["P2P-FALLBACK-TO-GO-NEG"], timeout=1)
        if fallback is None:
            raise Exception("No P2P-FALLBACK-TO-GO-NEG event seen")
    if "reason=peer-not-running-GO" not in fallback:
        raise Exception("Unexpected reason: " + fallback)

    started = cli.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
    if started is None:
        raise Exception("Joining the group timed out")
    cli.group_form_result(started)

    peer_dev.remove_group()
    cli.wait_go_ending_session()
    cli.flush_scan_cache()
@remote_compatible
def test_autogo_join_auto_go_neg_after_seeing_go(dev):
    """P2P_CONNECT-auto fallback to GO Neg after seeing GO"""
    peer_dev, cli = dev[0], dev[1]

    # Have the client scan the GO's BSS once, then stop the group and leave
    # dev[0] in listen state.
    autogo(peer_dev, freq=2412)
    peer_addr = peer_dev.p2p_dev_addr()
    go_bssid = peer_dev.p2p_interface_addr()
    cli.scan_for_bss(go_bssid, freq=2412)
    peer_dev.remove_group()
    peer_dev.p2p_listen()

    if "OK" not in cli.global_request("P2P_CONNECT " + peer_addr + " pbc auto"):
        raise Exception("P2P_CONNECT failed")

    enabled = cli.wait_global_event(["P2P-FALLBACK-TO-GO-NEG-ENABLED"],
                                    timeout=15)
    if enabled is None:
        raise Exception("No P2P-FALLBACK-TO-GO-NEG-ENABLED event seen")

    req_ev = peer_dev.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
    if req_ev is None:
        raise Exception("Timeout on P2P-GO-NEG-REQUEST")
    requester = req_ev.split(' ')[1]
    peer_dev.p2p_go_neg_init(requester, None, "pbc", timeout=15, go_intent=15)

    fallback = cli.wait_global_event(["P2P-FALLBACK-TO-GO-NEG"], timeout=1)
    if fallback is None:
        raise Exception("No P2P-FALLBACK-TO-GO-NEG event seen")
    # Either of the two PD failure reasons is accepted.
    if "reason=no-ACK-to-PD-Req" not in fallback and "reason=PD-failed" not in fallback:
        raise Exception("Unexpected reason: " + fallback)

    started = cli.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
    if started is None:
        raise Exception("Joining the group timed out")
    cli.group_form_result(started)

    peer_dev.remove_group()
    cli.wait_go_ending_session()
    cli.flush_scan_cache()
def test_go_search_non_social(dev):
    """P2P_FIND with freq parameter to scan a single channel"""
    go, finder, listener = dev[0], dev[1], dev[2]
    go.p2p_dev_addr()
    autogo(go, freq=2422)

    # Look for the GO on its operating frequency; one retry with a restarted
    # find is allowed before failing.
    finder.p2p_find(freq=2422)
    found = finder.wait_global_event(["P2P-DEVICE-FOUND"], timeout=3.5)
    if found is None:
        finder.p2p_stop_find()
        finder.p2p_find(freq=2422)
        found = finder.wait_global_event(["P2P-DEVICE-FOUND"], timeout=3.5)
        if found is None:
            raise Exception("Did not find GO quickly enough")

    # A peer that starts listening afterwards must be found as well.
    listener.p2p_listen()
    found = finder.wait_global_event(["P2P-DEVICE-FOUND"], timeout=5)
    if found is None:
        raise Exception("Did not find peer")

    listener.p2p_stop_find()
    finder.p2p_stop_find()
    go.remove_group()
def test_go_search_non_social2(dev):
    """P2P_FIND with freq parameter to scan a single channel (2)"""
    go, finder, listener = dev[0], dev[1], dev[2]
    go.p2p_dev_addr()
    finder.p2p_find(freq=2422)
    # Wait for the first p2p_find scan round to complete before starting GO
    time.sleep(1)
    autogo(go, freq=2422)

    # Verify that p2p_find is still scanning the specified frequency
    found = finder.wait_global_event(["P2P-DEVICE-FOUND"], timeout=5)
    if found is None:
        finder.p2p_stop_find()
        raise Exception("Did not find GO quickly enough")

    # Verify that p2p_find is scanning the social channels
    listener.p2p_listen()
    found = finder.wait_global_event(["P2P-DEVICE-FOUND"], timeout=5)
    if found is None:
        raise Exception("Did not find peer")
    listener.p2p_stop_find()
    finder.p2p_stop_find()
    go.remove_group()
    finder.dump_monitor()

    # Verify that social channel as the specific channel works
    finder.p2p_find(freq=2412)
    time.sleep(0.5)
    listener.p2p_listen()
    found = finder.wait_global_event(["P2P-DEVICE-FOUND"], timeout=5)
    if found is None:
        raise Exception("Did not find peer (2)")
def test_autogo_many(dev):
    """P2P autonomous GO with large number of GO instances"""
    go = dev[0]
    go.global_request("SET p2p_no_group_iface 0")

    # Keep adding groups until P2P_GROUP_ADD is rejected or 100 were started.
    for count in range(100):
        added = go.global_request("P2P_GROUP_ADD freq=2412")
        if "OK" not in added:
            logger.info("Was able to add %d groups" % count)
            if count < 5:
                raise Exception("P2P_GROUP_ADD failed")
            # The rejected request must not cause a group removal event.
            removal = go.wait_global_event(["P2P-GROUP-REMOVE"], timeout=1)
            if removal is not None:
                raise Exception("Unexpected P2P-GROUP-REMOVE event")
            break
        started = go.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
        if started is None:
            raise Exception("GO start up timed out")
        go.group_form_result(started)

    # Remove a group for every listed interface name, then issue a final
    # wildcard removal.
    for iface in go.global_request("INTERFACES").splitlines():
        go.request("P2P_GROUP_REMOVE " + iface)
        go.dump_monitor()
    go.request("P2P_GROUP_REMOVE *")
def test_autogo_many_clients(dev):
    """P2P autonomous GO and many clients (P2P IE fragmentation)"""
    try:
        _test_autogo_many_clients(dev)
    finally:
        # The helper sets long device names; set the names back to
        # "Device A/B/C" whether or not the test body passed.
        for idx, letter in enumerate("ABC"):
            dev[idx].global_request("SET device_name Device " + letter)
def _test_autogo_many_clients(dev):
    # These long device names will push the P2P IE contents beyond the limit
    # that requires fragmentation.
    name0 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
    name1 = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
    name2 = "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
    name3 = "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
    for station, name in zip(dev, (name0, name1, name2)):
        station.global_request("SET device_name " + name)

    go = dev[0]
    go.p2p_dev_addr()
    autogo(go, freq=2412)
    go.p2p_interface_addr()

    connect_cli(go, dev[1], social=True, freq=2412)
    go.dump_monitor()
    connect_cli(go, dev[2], social=True, freq=2412)
    go.dump_monitor()

    # A third client on a dynamically added interface, carrying five
    # secondary device types in addition to the long device name.
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    wpas.interface_add("wlan5")
    wpas.global_request("SET device_name " + name3)
    for sec_type in ("1-11111111-1", "2-22222222-2", "3-33333333-3",
                     "4-44444444-4", "5-55555555-5"):
        wpas.global_request("SET sec_device_type " + sec_type)
    connect_cli(go, wpas, social=True, freq=2412)
    go.dump_monitor()

    # From dev[1], three P2P-DEVICE-FOUND events are collected; the names of
    # the GO and the two other clients must appear somewhere in them.
    dev[1].dump_monitor()
    dev[1].p2p_find(freq=2412)
    ev1 = dev[1].wait_global_event(["P2P-DEVICE-FOUND"], timeout=10)
    if ev1 is None:
        raise Exception("Could not find peer (1)")
    ev2 = dev[1].wait_global_event(["P2P-DEVICE-FOUND"], timeout=10)
    if ev2 is None:
        raise Exception("Could not find peer (2)")
    ev3 = dev[1].wait_global_event(["P2P-DEVICE-FOUND"], timeout=10)
    if ev3 is None:
        raise Exception("Could not find peer (3)")
    dev[1].p2p_stop_find()

    found_events = (ev1, ev2, ev3)
    for expected in (name0, name2, name3):
        if not any(expected in ev for ev in found_events):
            raise Exception('name "%s" not found' % expected)
def rx_pd_req(dev):
    # Receive one management frame through dev.mgmt_rx() and return it parsed
    # as a P2P Provision Discovery Request. The returned dict is the output of
    # parse_p2p_public_action() with the receive frequency added as 'freq'.
    # Raises Exception on receive timeout, on a frame that does not parse as
    # a P2P Public Action frame, or on any other P2P subtype.
    msg = dev.mgmt_rx()
    if msg is None:
        raise Exception("MGMT-RX timeout")
    p2p = parse_p2p_public_action(msg['payload'])
    if p2p is None:
        # No dialog token is available when parsing failed, so none is
        # included in the message.
        raise Exception("Not a P2P Public Action frame")
    if p2p['subtype'] != P2P_PROV_DISC_REQ:
        raise Exception("Unexpected subtype %d" % p2p['subtype'])
    p2p['freq'] = msg['freq']
    return p2p
@remote_compatible
def test_autogo_scan(dev):
    """P2P autonomous GO and no P2P IE in Probe Response scan results"""

    def is_probe_req(rx_event):
        # This test treats an MGMT-RX event whose fifth space-separated field
        # starts with "40" as a Probe Request frame.
        return rx_event.split(' ')[4].startswith("40")

    go = dev[0]
    go_addr = go.p2p_dev_addr()
    cli = dev[1]
    cli_addr = cli.p2p_dev_addr()
    go.p2p_start_go(freq=2412, persistent=True)
    bssid = go.p2p_interface_addr()

    # Find the GO once, then stop the search and drop the cached scan results.
    # The outcome of waiting for P2P-FIND-STOPPED is not checked.
    cli.discover_peer(go_addr)
    cli.p2p_stop_find()
    cli.wait_global_event(["P2P-FIND-STOPPED"], timeout=2)
    time.sleep(0.1)
    cli.flush_scan_cache()

    pin = cli.wps_read_pin()
    go.group_request("WPS_PIN any " + pin)

    # One scan with P2P disabled on the client; P2P is re-enabled even if the
    # scan fails.
    try:
        cli.request("SET p2p_disabled 1")
        cli.request("SCAN freq=2412")
        if cli.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) is None:
            raise Exception("Active scan did not complete")
    finally:
        cli.request("SET p2p_disabled 0")

    # Two passive scans on the operating channel
    for _ in range(2):
        cli.request("SCAN freq=2412 passive=1")
        if cli.wait_event(["CTRL-EVENT-SCAN-RESULTS"]) is None:
            raise Exception("Scan did not complete")

    # Temporarily stop the GO from processing management frames so that the
    # Probe Response frame with P2P IE gets skipped.
    go.group_request("SET ext_mgmt_frame_handling 1")

    join_cmd = "P2P_CONNECT " + bssid + " " + pin + " freq=2412 join"
    cli.global_request(join_cmd)

    # The first frame seen by the GO has to be a Probe Request; it is dropped.
    rx = go.wait_group_event(["MGMT-RX"], timeout=10)
    if rx is None:
        raise Exception("No Probe Request frame seen")
    if not is_probe_req(rx):
        raise Exception("Not a Probe Request frame")

    # Without a P2P Device, the PD Request arrives on the group interface
    # (really wlan0, as no separate interface is used). That interface is in
    # external management frame handling mode, so the response has to be sent
    # manually from here.
    drv_status = go.get_driver_status()
    p2p_device_flag = int(drv_status['capa.flags'], 0) & 0x20000000
    if not p2p_device_flag:
        # Answer the PD Request while Probe Request frames are still filtered
        pd_req = rx_pd_req(go)
        action = "0409506f9a0908%02xdd0a0050f204100800020008" % pd_req['dialog_token']
        tx_cmd = "MGMT_TX {} {} freq={} wait_time=10 no_cck=1 action={}".format(cli_addr, go_addr, 2412, action)
        mgmt_tx(go, tx_cmd)

    # Drop up to ten further frames, stopping at the first one that is not a
    # Probe Request.
    for _ in range(10):
        rx = go.wait_group_event(["MGMT-RX"], timeout=10)
        if rx is None:
            raise Exception("No frame seen")
        if not is_probe_req(rx):
            break

    # Hand authentication and association back to wpa_supplicant
    go.group_request("SET ext_mgmt_frame_handling 0")

    # The join is expected to succeed and the group to be reported as
    # persistent based on the P2P IE in Beacon frames.
    started = cli.wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
    if started is None:
        raise Exception("Failed to join group")
    if "[PERSISTENT]" not in started:
        raise Exception("Did not recognize group as persistent")
    go.remove_group()
    cli.wait_go_ending_session()
@remote_compatible
def test_autogo_join_before_found(dev):
    """P2P client joining a group before having found GO Device Address"""
    go = dev[0]
    go.global_request("SET p2p_no_group_iface 0")
    group = autogo(go, freq=2412)
    # With p2p_no_group_iface set to 0 the test expects the reported group
    # interface name to contain "p2p-wlan".
    if "p2p-wlan" not in group['ifname']:
        raise Exception("Unexpected group interface name on GO")
    bssid = go.get_group_status()['bssid']

    # Authorize the client PIN on the GO and join using the group BSSID
    # taken from the GO's group status.
    cli = dev[1]
    pin = cli.wps_read_pin()
    go.p2p_go_authorize_client(pin)
    join_cmd = "P2P_CONNECT " + bssid + " " + pin + " join freq=2412"
    reply = cli.global_request(join_cmd)
    if "OK" not in reply:
        raise Exception("P2P_CONNECT join failed")

    if cli.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) is None:
        raise Exception("Joining the group timed out")
    go.remove_group()
    cli.wait_go_ending_session()
def test_autogo_noa(dev):
    """P2P autonomous GO and NoA"""
    go = dev[0]
    group = autogo(go)
    # Two NoA configurations are applied back to back before any station
    # connects.
    for noa_cmd in ("P2P_SET noa 1,5,20", "P2P_SET noa 255,10,50"):
        go.group_request(noa_cmd)

    # A legacy STA (P2P disabled) connects and disconnects to exercise the
    # NoA special cases; P2P is re-enabled on it regardless of the outcome.
    try:
        dev[1].request("SET p2p_disabled 1")
        dev[1].connect(
            ssid=group['ssid'],
            psk=group['passphrase'],
            proto='RSN',
            key_mgmt='WPA-PSK',
            pairwise='CCMP',
            group='CCMP',
            scan_freq=group['freq'],
        )
        go.group_request("P2P_SET noa 255,15,55")
        dev[1].request("DISCONNECT")
        dev[1].wait_disconnected()
    finally:
        dev[1].request("SET p2p_disabled 0")

    # Final NoA update with all-zero parameters
    go.group_request("P2P_SET noa 0,0,0")
def test_autogo_interworking(dev):
    """P2P autonomous GO and Interworking"""
    # The actual test steps live in run_autogo_interworking(); this wrapper
    # only guarantees that go_interworking is set back to "0" on dev[0]
    # whether the run passes or raises.
    try:
        run_autogo_interworking(dev)
    finally:
        go = dev[0]
        go.set("go_interworking", "0")
def run_autogo_interworking(dev):
    """Start a GO with Interworking parameters and check its scan result.

    dev[0] is configured with the go_* Interworking settings and started as
    an autonomous GO; dev[1] scans for the GO's BSS and the IE data of that
    BSS entry is checked for the expected element.

    Raises Exception if the expected element is not found.
    """
    go = dev[0]
    # Applied in this order before the group is started
    go_settings = (
        ("go_interworking", "1"),
        ("go_access_network_type", "1"),
        ("go_internet", "1"),
        ("go_venue_group", "2"),
        ("go_venue_type", "3"),
    )
    for name, value in go_settings:
        go.set(name, value)

    group = autogo(go)
    bssid = go.p2p_interface_addr()

    scanner = dev[1]
    scanner.scan_for_bss(bssid, freq=group['freq'])
    bss_entry = scanner.get_bss(bssid)
    # The group is removed before the result is evaluated, so it is gone
    # even when the check below fails.
    go.remove_group()

    # NOTE(review): presumably the hex dump of the Interworking element
    # (ID 0x6b, length 3) carrying the values configured above - confirm.
    if '6b03110203' not in bss_entry['ie']:
        raise Exception("Interworking element not seen")
|