test_wnm.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # WNM tests
  2. # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import binascii
  7. import struct
  8. import time
  9. import logging
  10. logger = logging.getLogger()
  11. import hostapd
  12. from wlantest import Wlantest
  13. def test_wnm_bss_transition_mgmt(dev, apdev):
  14. """WNM BSS Transition Management"""
  15. params = { "ssid": "test-wnm",
  16. "time_advertisement": "2",
  17. "time_zone": "EST5",
  18. "wnm_sleep_mode": "1",
  19. "bss_transition": "1" }
  20. hostapd.add_ap(apdev[0]['ifname'], params)
  21. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  22. dev[0].request("WNM_BSS_QUERY 0")
  23. def test_wnm_disassoc_imminent(dev, apdev):
  24. """WNM Disassociation Imminent"""
  25. params = { "ssid": "test-wnm",
  26. "time_advertisement": "2",
  27. "time_zone": "EST5",
  28. "wnm_sleep_mode": "1",
  29. "bss_transition": "1" }
  30. hostapd.add_ap(apdev[0]['ifname'], params)
  31. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  32. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  33. addr = dev[0].p2p_interface_addr()
  34. hapd.request("DISASSOC_IMMINENT " + addr + " 10")
  35. ev = dev[0].wait_event(["WNM: Disassociation Imminent"])
  36. if ev is None:
  37. raise Exception("Timeout while waiting for disassociation imminent")
  38. if "Disassociation Timer 10" not in ev:
  39. raise Exception("Unexpected disassociation imminent contents")
  40. ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
  41. if ev is None:
  42. raise Exception("Timeout while waiting for re-connection scan")
  43. def test_wnm_ess_disassoc_imminent(dev, apdev):
  44. """WNM ESS Disassociation Imminent"""
  45. params = { "ssid": "test-wnm",
  46. "time_advertisement": "2",
  47. "time_zone": "EST5",
  48. "wnm_sleep_mode": "1",
  49. "bss_transition": "1" }
  50. hostapd.add_ap(apdev[0]['ifname'], params)
  51. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  52. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  53. addr = dev[0].p2p_interface_addr()
  54. hapd.request("ESS_DISASSOC " + addr + " 10 http://example.com/session-info")
  55. ev = dev[0].wait_event(["ESS-DISASSOC-IMMINENT"])
  56. if ev is None:
  57. raise Exception("Timeout while waiting for ESS disassociation imminent")
  58. if "0 1024 http://example.com/session-info" not in ev:
  59. raise Exception("Unexpected ESS disassociation imminent message contents")
  60. ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
  61. if ev is None:
  62. raise Exception("Timeout while waiting for re-connection scan")
  63. def test_wnm_ess_disassoc_imminent_pmf(dev, apdev):
  64. """WNM ESS Disassociation Imminent"""
  65. params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
  66. params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
  67. params["ieee80211w"] = "2";
  68. params["bss_transition"] = "1"
  69. hostapd.add_ap(apdev[0]['ifname'], params)
  70. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  71. dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
  72. key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
  73. addr = dev[0].p2p_interface_addr()
  74. hapd.request("ESS_DISASSOC " + addr + " 10 http://example.com/session-info")
  75. ev = dev[0].wait_event(["ESS-DISASSOC-IMMINENT"])
  76. if ev is None:
  77. raise Exception("Timeout while waiting for ESS disassociation imminent")
  78. if "1 1024 http://example.com/session-info" not in ev:
  79. raise Exception("Unexpected ESS disassociation imminent message contents")
  80. ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"])
  81. if ev is None:
  82. raise Exception("Timeout while waiting for re-connection scan")
  83. def check_wnm_sleep_mode_enter_exit(hapd, dev, interval=None, tfs_req=None):
  84. addr = dev.p2p_interface_addr()
  85. sta = hapd.get_sta(addr)
  86. if "[WNM_SLEEP_MODE]" in sta['flags']:
  87. raise Exception("Station unexpectedly in WNM-Sleep Mode")
  88. logger.info("Going to WNM Sleep Mode")
  89. extra = ""
  90. if interval is not None:
  91. extra += " interval=" + str(interval)
  92. if tfs_req:
  93. extra += " tfs_req=" + tfs_req
  94. if "OK" not in dev.request("WNM_SLEEP enter" + extra):
  95. raise Exception("WNM_SLEEP failed")
  96. time.sleep(0.5)
  97. sta = hapd.get_sta(addr)
  98. if "[WNM_SLEEP_MODE]" not in sta['flags']:
  99. raise Exception("Station failed to enter WNM-Sleep Mode")
  100. logger.info("Waking up from WNM Sleep Mode")
  101. dev.request("WNM_SLEEP exit")
  102. time.sleep(0.5)
  103. sta = hapd.get_sta(addr)
  104. if "[WNM_SLEEP_MODE]" in sta['flags']:
  105. raise Exception("Station failed to exit WNM-Sleep Mode")
  106. def test_wnm_sleep_mode_open(dev, apdev):
  107. """WNM Sleep Mode - open"""
  108. params = { "ssid": "test-wnm",
  109. "time_advertisement": "2",
  110. "time_zone": "EST5",
  111. "wnm_sleep_mode": "1",
  112. "bss_transition": "1" }
  113. hostapd.add_ap(apdev[0]['ifname'], params)
  114. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  115. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  116. check_wnm_sleep_mode_enter_exit(hapd, dev[0])
  117. check_wnm_sleep_mode_enter_exit(hapd, dev[0], interval=100)
  118. check_wnm_sleep_mode_enter_exit(hapd, dev[0], tfs_req="5b17010001130e110000071122334455661122334455661234")
  119. def test_wnm_sleep_mode_rsn(dev, apdev):
  120. """WNM Sleep Mode - RSN"""
  121. params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
  122. params["time_advertisement"] = "2"
  123. params["time_zone"] = "EST5"
  124. params["wnm_sleep_mode"] = "1"
  125. params["bss_transition"] = "1"
  126. hostapd.add_ap(apdev[0]['ifname'], params)
  127. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  128. dev[0].connect("test-wnm-rsn", psk="12345678", scan_freq="2412")
  129. check_wnm_sleep_mode_enter_exit(hapd, dev[0])
  130. def test_wnm_sleep_mode_rsn_pmf(dev, apdev):
  131. """WNM Sleep Mode - RSN with PMF"""
  132. wt = Wlantest()
  133. wt.flush()
  134. wt.add_passphrase("12345678")
  135. params = hostapd.wpa2_params("test-wnm-rsn", "12345678")
  136. params["wpa_key_mgmt"] = "WPA-PSK-SHA256";
  137. params["ieee80211w"] = "2";
  138. params["time_advertisement"] = "2"
  139. params["time_zone"] = "EST5"
  140. params["wnm_sleep_mode"] = "1"
  141. params["bss_transition"] = "1"
  142. hostapd.add_ap(apdev[0]['ifname'], params)
  143. hapd = hostapd.Hostapd(apdev[0]['ifname'])
  144. dev[0].connect("test-wnm-rsn", psk="12345678", ieee80211w="2",
  145. key_mgmt="WPA-PSK-SHA256", proto="WPA2", scan_freq="2412")
  146. check_wnm_sleep_mode_enter_exit(hapd, dev[0])
# Management frame subtype for Action frames; shifted left by four bits to
# form the 'fc' value of transmitted frames and compared against the
# 'subtype' of received ones.
MGMT_SUBTYPE_ACTION = 13
# Action frame category value used for WNM frames (first payload octet).
ACTION_CATEG_WNM = 10
# WNM action values (second payload octet) for the BSS Transition
# Management Request and Response frames.
WNM_ACT_BSS_TM_REQ = 7
WNM_ACT_BSS_TM_RESP = 8
  151. def bss_tm_req(dst, src, dialog_token=1, req_mode=0, disassoc_timer=0,
  152. validity_interval=1):
  153. msg = {}
  154. msg['fc'] = MGMT_SUBTYPE_ACTION << 4
  155. msg['da'] = dst
  156. msg['sa'] = src
  157. msg['bssid'] = src
  158. msg['payload'] = struct.pack("<BBBBHB",
  159. ACTION_CATEG_WNM, WNM_ACT_BSS_TM_REQ,
  160. dialog_token, req_mode, disassoc_timer,
  161. validity_interval)
  162. return msg
  163. def rx_bss_tm_resp(hapd, expect_dialog=None, expect_status=None):
  164. for i in range(0, 100):
  165. resp = hapd.mgmt_rx()
  166. if resp is None:
  167. raise Exception("No BSS TM Response received")
  168. if resp['subtype'] == MGMT_SUBTYPE_ACTION:
  169. break
  170. if i == 99:
  171. raise Exception("Not an Action frame")
  172. payload = resp['payload']
  173. if len(payload) < 2 + 3:
  174. raise Exception("Too short payload")
  175. (category, action) = struct.unpack('BB', payload[0:2])
  176. if category != ACTION_CATEG_WNM or action != WNM_ACT_BSS_TM_RESP:
  177. raise Exception("Not a BSS TM Response")
  178. pos = payload[2:]
  179. (dialog, status, bss_term_delay) = struct.unpack('BBB', pos[0:3])
  180. resp['dialog'] = dialog
  181. resp['status'] = status
  182. resp['bss_term_delay'] = bss_term_delay
  183. pos = pos[3:]
  184. if len(pos) >= 6 and status == 0:
  185. resp['target_bssid'] = binascii.hexlify(pos[0:6])
  186. pos = pos[6:]
  187. resp['candidates'] = pos
  188. if expect_dialog is not None and dialog != expect_dialog:
  189. raise Exception("Unexpected dialog token")
  190. if expect_status is not None and status != expect_status:
  191. raise Exception("Unexpected status code %d" % status)
  192. return resp
  193. def except_ack(hapd):
  194. ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
  195. if ev is None:
  196. raise Exception("Missing TX status")
  197. if "ok=1" not in ev:
  198. raise Exception("Action frame not acknowledged")
def test_wnm_bss_tm_req(dev, apdev):
    """BSS Transition Management Request"""
    # Overall flow: associate dev[0] with the first AP, then hand-craft a
    # series of BSS TM Request frames (valid, truncated and malformed) and
    # transmit them from the AP. For each frame the test either only checks
    # that it was acknowledged (except_ack) or reads back the BSS TM
    # Response and checks its dialog token and status code.
    params = { "ssid": "test-wnm", "bss_transition": "1" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
    dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
    # NOTE(review): hapd2 is never used afterwards; the second AP is
    # presumably started only so that another BSS with this SSID exists.
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)

    # Needed before mgmt_tx()/mgmt_rx() are used below; presumably makes
    # hostapd pass management frames to this script - confirm.
    hapd.set("ext_mgmt_frame_handling", "1")

    # truncated BSS TM Request
    # (payload is rebuilt without the trailing validity interval octet that
    # bss_tm_req() normally packs)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x08)
    req['payload'] = struct.pack("<BBBBH",
                                 ACTION_CATEG_WNM, WNM_ACT_BSS_TM_REQ,
                                 1, 0, 0)
    hapd.mgmt_tx(req)
    except_ack(hapd)

    # no disassociation and no candidate list
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     dialog_token=2)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=2, expect_status=1)

    # truncated BSS Termination Duration
    # (req_mode=0x08 is set but no further octets follow the fixed fields)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x08)
    hapd.mgmt_tx(req)
    except_ack(hapd)

    # BSS Termination Duration with TSF=0 and Duration=10
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x08, dialog_token=3)
    req['payload'] += struct.pack("<BBQH", 4, 10, 0, 10)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=3, expect_status=1)

    # truncated Session Information URL
    # (first variant: req_mode=0x10 with nothing appended at all)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x10)
    hapd.mgmt_tx(req)
    except_ack(hapd)

    # (second variant: first octet 3, but only two octets follow it)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x10)
    req['payload'] += struct.pack("<BBB", 3, 65, 66)
    hapd.mgmt_tx(req)
    except_ack(hapd)

    # Session Information URL
    # (first octet 2 followed by exactly two octets; this is the only
    # request in this test for which status 0 is expected)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x10, dialog_token=4)
    req['payload'] += struct.pack("<BBB", 2, 65, 66)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=4, expect_status=0)

    # Preferred Candidate List without any entries
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01, dialog_token=5)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=5, expect_status=1)

    # Preferred Candidate List with a truncated entry
    # (52 is presumably the Neighbor Report element ID - confirm; the
    # length octet says 1 but no data follows)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01)
    req['payload'] += struct.pack("<BB", 52, 1)
    hapd.mgmt_tx(req)
    except_ack(hapd)

    # Preferred Candidate List with a too short entry
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01, dialog_token=6)
    req['payload'] += struct.pack("<BB", 52, 0)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=6, expect_status=1)

    # Preferred Candidate List with a non-matching entry
    # (13 octets of element data: six address octets 1..6, a 32-bit zero
    # field and the three octets 81, 1, 7)
    # NOTE(review): dialog_token=6 is reused from the previous request.
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01, dialog_token=6)
    req['payload'] += struct.pack("<BB6BLBBB", 52, 13,
                                  1, 2, 3, 4, 5, 6,
                                  0, 81, 1, 7)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=6, expect_status=1)

    # Preferred Candidate List with a truncated subelement
    # (the trailing "1, 1" is a subelement header claiming one data octet
    # that is not present)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01, dialog_token=7)
    req['payload'] += struct.pack("<BB6BLBBBBB", 52, 13 + 2,
                                  1, 2, 3, 4, 5, 6,
                                  0, 81, 1, 7,
                                  1, 1)
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=7, expect_status=1)

    # Preferred Candidate List with lots of invalid optional subelements
    # (same subelement IDs as in the valid case below, each with a length
    # one octet shorter than the one used there)
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01, dialog_token=8)
    subelems = struct.pack("<BBHB", 1, 3, 0, 100)
    subelems += struct.pack("<BBB", 2, 1, 65)
    subelems += struct.pack("<BB", 3, 0)
    subelems += struct.pack("<BBQB", 4, 9, 0, 10)
    subelems += struct.pack("<BBHLB", 5, 7, 0, 0, 0)
    subelems += struct.pack("<BB", 66, 0)
    subelems += struct.pack("<BBBBBB", 70, 4, 0, 0, 0, 0)
    subelems += struct.pack("<BB", 71, 0)
    req['payload'] += struct.pack("<BB6BLBBB", 52, 13 + len(subelems),
                                  1, 2, 3, 4, 5, 6,
                                  0, 81, 1, 7) + subelems
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=8, expect_status=1)

    # Preferred Candidate List with lots of valid optional subelements (twice)
    # NOTE(review): dialog_token=8 is reused from the previous request.
    req = bss_tm_req(dev[0].p2p_interface_addr(), apdev[0]['bssid'],
                     req_mode=0x01, dialog_token=8)
    # TSF Information
    subelems = struct.pack("<BBHH", 1, 4, 0, 100)
    # Condensed Country String
    subelems += struct.pack("<BBBB", 2, 2, 65, 66)
    # BSS Transition Candidate Preference
    subelems += struct.pack("<BBB", 3, 1, 100)
    # BSS Termination Duration
    subelems += struct.pack("<BBQH", 4, 10, 0, 10)
    # Bearing
    subelems += struct.pack("<BBHLH", 5, 8, 0, 0, 0)
    # Measurement Pilot Transmission
    subelems += struct.pack("<BBBBB", 66, 3, 0, 0, 0)
    # RM Enabled Capabilities
    subelems += struct.pack("<BBBBBBB", 70, 5, 0, 0, 0, 0, 0)
    # Multiple BSSID
    subelems += struct.pack("<BBBB", 71, 2, 0, 0)
    # The subelement block is appended twice, so the element length counts
    # it twice as well.
    req['payload'] += struct.pack("<BB6BLBBB", 52, 13 + len(subelems) * 2,
                                  1, 2, 3, 4, 5, 6,
                                  0, 81, 1, 7) + subelems + subelems
    hapd.mgmt_tx(req)
    resp = rx_bss_tm_resp(hapd, expect_dialog=8, expect_status=1)
  320. def test_wnm_bss_keep_alive(dev, apdev):
  321. """WNM keep-alive"""
  322. params = { "ssid": "test-wnm",
  323. "ap_max_inactivity": "1" }
  324. hostapd.add_ap(apdev[0]['ifname'], params)
  325. dev[0].connect("test-wnm", key_mgmt="NONE", scan_freq="2412")
  326. time.sleep(2)