test_dbus_old.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. # wpa_supplicant D-Bus old interface tests
  2. # Copyright (c) 2014, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import gobject
  7. import logging
  8. logger = logging.getLogger()
  9. import hostapd
  10. from test_dbus import TestDbus, start_ap
  11. WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
  12. WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
  13. WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
  14. WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
  15. WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
  16. def prepare_dbus(dev):
  17. try:
  18. import dbus
  19. from dbus.mainloop.glib import DBusGMainLoop
  20. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  21. bus = dbus.SystemBus()
  22. wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
  23. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  24. path = wpas.getInterface(dev.ifname)
  25. if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  26. return (dbus,bus,wpas_obj,path,if_obj)
  27. except Exception, e:
  28. logger.info("No D-Bus support available: " + str(e))
  29. raise Exception("hwsim-SKIP")
  30. class TestDbusOldWps(TestDbus):
  31. def __init__(self, bus):
  32. TestDbus.__init__(self, bus)
  33. self.event_ok = False
  34. def __enter__(self):
  35. gobject.timeout_add(1, self.run_wps)
  36. gobject.timeout_add(15000, self.timeout)
  37. self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
  38. self.loop.run()
  39. return self
  40. def wpsCred(self, cred):
  41. logger.debug("wpsCred: " + str(cred))
  42. self.event_ok = True
  43. self.loop.quit()
  44. def success(self):
  45. return self.event_ok
  46. def test_dbus_old(dev, apdev):
  47. """The old D-Bus interface"""
  48. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  49. res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
  50. logger.debug("capabilities(): " + str(res))
  51. if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
  52. raise Exception("Unexpected capabilities")
  53. res2 = if_obj.capabilities(dbus.Boolean(True),
  54. dbus_interface=WPAS_DBUS_OLD_IFACE)
  55. logger.debug("capabilities(strict): " + str(res2))
  56. res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
  57. logger.debug("State: " + res)
  58. res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
  59. if res != 0:
  60. raise Exception("Unexpected scanning: " + str(res))
  61. if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
  62. for t in [ dbus.UInt32(123), "foo" ]:
  63. try:
  64. if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  65. raise Exception("Invalid setAPScan() accepted")
  66. except dbus.exceptions.DBusException, e:
  67. if "InvalidOptions" not in str(e):
  68. raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
  69. for p in [ path + "/Networks/12345",
  70. path + "/Networks/foo" ]:
  71. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  72. try:
  73. obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  74. raise Exception("Invalid disable() accepted")
  75. except dbus.exceptions.DBusException, e:
  76. if "InvalidNetwork" not in str(e):
  77. raise Exception("Unexpected error message for invalid disable: " + str(e))
  78. for p in [ path + "/BSSIDs/foo",
  79. path + "/BSSIDs/001122334455"]:
  80. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  81. try:
  82. obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
  83. raise Exception("Invalid properties() accepted")
  84. except dbus.exceptions.DBusException, e:
  85. if "InvalidBSSID" not in str(e):
  86. raise Exception("Unexpected error message for invalid properties: " + str(e))
  87. def test_dbus_old_scan(dev, apdev):
  88. """The old D-Bus interface - scanning"""
  89. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  90. hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
  91. params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
  92. params['wpa'] = '3'
  93. hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
  94. class TestDbusScan(TestDbus):
  95. def __init__(self, bus):
  96. TestDbus.__init__(self, bus)
  97. self.scan_completed = False
  98. def __enter__(self):
  99. gobject.timeout_add(1, self.run_scan)
  100. gobject.timeout_add(7000, self.timeout)
  101. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  102. "ScanResultsAvailable")
  103. self.loop.run()
  104. return self
  105. def scanDone(self):
  106. logger.debug("scanDone")
  107. self.scan_completed = True
  108. self.loop.quit()
  109. def run_scan(self, *args):
  110. logger.debug("run_scan")
  111. if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
  112. raise Exception("Failed to trigger scan")
  113. return False
  114. def success(self):
  115. return self.scan_completed
  116. with TestDbusScan(bus) as t:
  117. if not t.success():
  118. raise Exception("Expected signals not seen")
  119. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  120. if len(res) != 2:
  121. raise Exception("Unexpected number of scan results: " + str(res))
  122. for i in range(2):
  123. logger.debug("Scan result BSS path: " + res[i])
  124. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  125. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  126. byte_arrays=True)
  127. logger.debug("BSS: " + str(bss))
  128. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
  129. try:
  130. bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
  131. raise Exception("Unknown BSSID method accepted")
  132. except Exception, e:
  133. logger.debug("Unknown BSSID method exception: " + str(e))
  134. if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
  135. raise Exception("Failed to issue flush(0)")
  136. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  137. if len(res) != 0:
  138. raise Exception("Unexpected BSS entry after flush")
  139. if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
  140. raise Exception("Failed to issue flush(1)")
  141. try:
  142. if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  143. raise Exception("Invalid flush arguments accepted")
  144. except dbus.exceptions.DBusException, e:
  145. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  146. raise Exception("Unexpected error message for invalid flush: " + str(e))
  147. try:
  148. bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  149. byte_arrays=True)
  150. except dbus.exceptions.DBusException, e:
  151. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
  152. raise Exception("Unexpected error message for invalid BSS: " + str(e))
  153. def test_dbus_old_debug(dev, apdev):
  154. """The old D-Bus interface - debug"""
  155. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  156. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  157. try:
  158. wpas.setDebugParams(123)
  159. raise Exception("Invalid setDebugParams accepted")
  160. except dbus.exceptions.DBusException, e:
  161. if "InvalidOptions" not in str(e):
  162. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  163. try:
  164. wpas.setDebugParams(123, True, True)
  165. raise Exception("Invalid setDebugParams accepted")
  166. except dbus.exceptions.DBusException, e:
  167. if "InvalidOptions" not in str(e):
  168. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  169. wpas.setDebugParams(1, True, True)
  170. dev[0].request("LOG_LEVEL MSGDUMP")
  171. def test_dbus_old_smartcard(dev, apdev):
  172. """The old D-Bus interface - smartcard"""
  173. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  174. params = dbus.Dictionary(signature='sv')
  175. if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
  176. params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
  177. 'pkcs11_engine_path': "foobar2",
  178. 'pkcs11_module_path': "foobar3",
  179. 'foo': 'bar' },
  180. signature='sv')
  181. params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
  182. 'foo': 'bar' },
  183. signature='sv')
  184. params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
  185. 'foo2': 'bar' },
  186. signature='sv')
  187. params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
  188. 'foo3': 'bar' },
  189. signature='sv')
  190. tests = [ 1, params, params2, params3, params4 ]
  191. for t in tests:
  192. try:
  193. if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  194. raise Exception("Invalid setSmartcardModules accepted: " + str(t))
  195. except dbus.exceptions.DBusException, e:
  196. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  197. raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
  198. def test_dbus_old_interface(dev, apdev):
  199. """The old D-Bus interface - interface get/add/remove"""
  200. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  201. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  202. tests = [ (123, "InvalidOptions"),
  203. ("foo", "InvalidInterface") ]
  204. for (ifname,err) in tests:
  205. try:
  206. wpas.getInterface(ifname)
  207. raise Exception("Invalid getInterface accepted")
  208. except dbus.exceptions.DBusException, e:
  209. if err not in str(e):
  210. raise Exception("Unexpected error message for invalid getInterface: " + str(e))
  211. params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
  212. wpas.addInterface("lo", params)
  213. path = wpas.getInterface("lo")
  214. logger.debug("New interface path: " + str(path))
  215. wpas.removeInterface(path)
  216. try:
  217. wpas.removeInterface(path)
  218. raise Exception("Invalid removeInterface() accepted")
  219. except dbus.exceptions.DBusException, e:
  220. if "InvalidInterface" not in str(e):
  221. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  222. params1 = dbus.Dictionary({ 'driver': 'foo',
  223. 'driver-params': 'foo',
  224. 'config-file': 'foo',
  225. 'bridge-ifname': 'foo' },
  226. signature='sv')
  227. params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
  228. tests = [ (123, None, "InvalidOptions"),
  229. ("", None, "InvalidOptions"),
  230. ("foo", None, "AddError"),
  231. ("foo", params1, "AddError"),
  232. ("foo", params2, "InvalidOptions"),
  233. ("foo", 1234, "InvalidOptions"),
  234. (dev[0].ifname, None, "ExistsError" ) ]
  235. for (ifname,params,err) in tests:
  236. try:
  237. if params is None:
  238. wpas.addInterface(ifname)
  239. else:
  240. wpas.addInterface(ifname, params)
  241. raise Exception("Invalid addInterface accepted: " + str(params))
  242. except dbus.exceptions.DBusException, e:
  243. if err not in str(e):
  244. raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
  245. try:
  246. wpas.removeInterface(123)
  247. raise Exception("Invalid removeInterface accepted")
  248. except dbus.exceptions.DBusException, e:
  249. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  250. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  251. def test_dbus_old_blob(dev, apdev):
  252. """The old D-Bus interface - blob operations"""
  253. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  254. param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
  255. param2 = dbus.Dictionary({ 'blob3': "foo" })
  256. param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
  257. signature='sv')
  258. tests = [ (1, "InvalidOptions"),
  259. (param1, "InvalidOptions"),
  260. (param2, "InvalidOptions"),
  261. (param3, "InvalidOptions") ]
  262. for (arg,err) in tests:
  263. try:
  264. if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  265. raise Exception("Invalid setBlobs() accepted: " + str(arg))
  266. except dbus.exceptions.DBusException, e:
  267. logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
  268. if err not in str(e):
  269. raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
  270. tests = [ (["foo"], "RemoveError: Error removing blob"),
  271. ([""], "RemoveError: Invalid blob name"),
  272. ([1], "InvalidOptions"),
  273. ("foo", "InvalidOptions") ]
  274. for (arg,err) in tests:
  275. try:
  276. if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  277. raise Exception("Invalid removeBlobs() accepted: " + str(arg))
  278. except dbus.exceptions.DBusException, e:
  279. logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
  280. if err not in str(e):
  281. raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
  282. blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
  283. 'blob2': dbus.ByteArray([ 1, 2 ]) },
  284. signature='sv')
  285. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  286. if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
  287. def test_dbus_old_connect(dev, apdev):
  288. """The old D-Bus interface - add a network and connect"""
  289. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  290. ssid = "test-wpa2-psk"
  291. passphrase = 'qwertyuiop'
  292. params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
  293. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  294. for p in [ "/no/where/to/be/found",
  295. path + "/Networks/12345",
  296. path + "/Networks/foo",
  297. "/fi/epitest/hostap/WPASupplicant/Interfaces",
  298. "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
  299. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  300. try:
  301. if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
  302. raise Exception("Invalid removeNetwork accepted: " + p)
  303. except dbus.exceptions.DBusException, e:
  304. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  305. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  306. try:
  307. if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  308. raise Exception("Invalid removeNetwork accepted")
  309. except dbus.exceptions.DBusException, e:
  310. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  311. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  312. try:
  313. if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
  314. raise Exception("Invalid removeNetwork accepted")
  315. except dbus.exceptions.DBusException, e:
  316. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  317. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  318. tests = [ (path, "InvalidNetwork"),
  319. (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
  320. "InvalidInterface"),
  321. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
  322. "InvalidNetwork"),
  323. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
  324. "InvalidNetwork"),
  325. (1, "InvalidOptions") ]
  326. for t,err in tests:
  327. try:
  328. if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  329. raise Exception("Invalid selectNetwork accepted: " + str(t))
  330. except dbus.exceptions.DBusException, e:
  331. if err not in str(e):
  332. raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
  333. npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  334. if not npath.startswith(WPAS_DBUS_OLD_PATH):
  335. raise Exception("Unexpected addNetwork result: " + path)
  336. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
  337. tests = [ 123,
  338. dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
  339. for t in tests:
  340. try:
  341. netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  342. raise Exception("Invalid set() accepted: " + str(t))
  343. except dbus.exceptions.DBusException, e:
  344. if "InvalidOptions" not in str(e):
  345. raise Exception("Unexpected error message for invalid set: " + str(e))
  346. params = dbus.Dictionary({ 'ssid': ssid,
  347. 'key_mgmt': 'WPA-PSK',
  348. 'psk': passphrase,
  349. 'identity': dbus.ByteArray([ 1, 2 ]),
  350. 'priority': dbus.Int32(0),
  351. 'scan_freq': dbus.UInt32(2412) },
  352. signature='sv')
  353. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  354. if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
  355. class TestDbusConnect(TestDbus):
  356. def __init__(self, bus):
  357. TestDbus.__init__(self, bus)
  358. self.state = 0
  359. def __enter__(self):
  360. gobject.timeout_add(1, self.run_connect)
  361. gobject.timeout_add(15000, self.timeout)
  362. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  363. "ScanResultsAvailable")
  364. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  365. "StateChange")
  366. self.loop.run()
  367. return self
  368. def scanDone(self):
  369. logger.debug("scanDone")
  370. def stateChange(self, new, old):
  371. logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
  372. if new == "COMPLETED":
  373. if self.state == 0:
  374. self.state = 1
  375. self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  376. elif self.state == 2:
  377. self.state = 3
  378. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  379. elif self.state == 4:
  380. self.state = 5
  381. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  382. elif self.state == 6:
  383. self.state = 7
  384. if_obj.removeNetwork(self.path,
  385. dbus_interface=WPAS_DBUS_OLD_IFACE)
  386. try:
  387. if_obj.removeNetwork(self.path,
  388. dbus_interface=WPAS_DBUS_OLD_IFACE)
  389. raise Exception("Invalid removeNetwork accepted")
  390. except dbus.exceptions.DBusException, e:
  391. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  392. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  393. self.loop.quit()
  394. elif new == "DISCONNECTED":
  395. if self.state == 1:
  396. self.state = 2
  397. self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  398. elif self.state == 3:
  399. self.state = 4
  400. if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  401. elif self.state == 5:
  402. self.state = 6
  403. if_obj.selectNetwork(self.path,
  404. dbus_interface=WPAS_DBUS_OLD_IFACE)
  405. def run_connect(self, *args):
  406. logger.debug("run_connect")
  407. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  408. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  409. netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  410. params = dbus.Dictionary({ 'ssid': ssid,
  411. 'key_mgmt': 'WPA-PSK',
  412. 'psk': passphrase,
  413. 'scan_freq': 2412 },
  414. signature='sv')
  415. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  416. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  417. self.path = path
  418. self.netw_obj = netw_obj
  419. return False
  420. def success(self):
  421. return self.state == 7
  422. with TestDbusConnect(bus) as t:
  423. if not t.success():
  424. raise Exception("Expected signals not seen")
  425. if len(dev[0].list_networks()) != 0:
  426. raise Exception("Unexpected network")
  427. def test_dbus_old_connect_eap(dev, apdev):
  428. """The old D-Bus interface - add an EAP network and connect"""
  429. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  430. ssid = "test-wpa2-eap"
  431. params = hostapd.wpa2_eap_params(ssid=ssid)
  432. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  433. class TestDbusConnect(TestDbus):
  434. def __init__(self, bus):
  435. TestDbus.__init__(self, bus)
  436. self.connected = False
  437. self.certification_received = False
  438. def __enter__(self):
  439. gobject.timeout_add(1, self.run_connect)
  440. gobject.timeout_add(15000, self.timeout)
  441. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  442. "StateChange")
  443. self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
  444. "Certification")
  445. self.loop.run()
  446. return self
  447. def stateChange(self, new, old):
  448. logger.debug("stateChange: %s --> %s" % (old, new))
  449. if new == "COMPLETED":
  450. self.connected = True
  451. self.loop.quit()
  452. def certification(self, depth, subject, hash, cert_hex):
  453. logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
  454. self.certification_received = True
  455. def run_connect(self, *args):
  456. logger.debug("run_connect")
  457. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  458. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  459. params = dbus.Dictionary({ 'ssid': ssid,
  460. 'key_mgmt': 'WPA-EAP',
  461. 'eap': 'TTLS',
  462. 'anonymous_identity': 'ttls',
  463. 'identity': 'pap user',
  464. 'ca_cert': 'auth_serv/ca.pem',
  465. 'phase2': 'auth=PAP',
  466. 'password': 'password',
  467. 'scan_freq': 2412 },
  468. signature='sv')
  469. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  470. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  471. self.path = path
  472. self.netw_obj = netw_obj
  473. return False
  474. def success(self):
  475. return self.connected and self.certification_received
  476. with TestDbusConnect(bus) as t:
  477. if not t.success():
  478. raise Exception("Expected signals not seen")
  479. def test_dbus_old_wps_pbc(dev, apdev):
  480. """The old D-Bus interface and WPS/PBC"""
  481. try:
  482. return _test_dbus_old_wps_pbc(dev, apdev)
  483. finally:
  484. dev[0].request("SET wps_cred_processing 0")
  485. def _test_dbus_old_wps_pbc(dev, apdev):
  486. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  487. hapd = start_ap(apdev[0])
  488. hapd.request("WPS_PBC")
  489. bssid = apdev[0]['bssid']
  490. dev[0].scan_for_bss(bssid, freq="2412")
  491. dev[0].request("SET wps_cred_processing 2")
  492. for arg in [ 123, "123" ]:
  493. try:
  494. if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  495. raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
  496. except dbus.exceptions.DBusException, e:
  497. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  498. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  499. class TestDbusWps(TestDbusOldWps):
  500. def __init__(self, bus, pbc_param):
  501. TestDbusOldWps.__init__(self, bus)
  502. self.pbc_param = pbc_param
  503. def run_wps(self, *args):
  504. logger.debug("run_wps: pbc_param=" + self.pbc_param)
  505. if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
  506. return False
  507. with TestDbusWps(bus, "any") as t:
  508. if not t.success():
  509. raise Exception("Expected signals not seen")
  510. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  511. if len(res) != 1:
  512. raise Exception("Unexpected number of scan results: " + str(res))
  513. for i in range(1):
  514. logger.debug("Scan result BSS path: " + res[i])
  515. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  516. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  517. byte_arrays=True)
  518. logger.debug("BSS: " + str(bss))
  519. dev[0].wait_connected(timeout=10)
  520. dev[0].request("DISCONNECT")
  521. dev[0].wait_disconnected(timeout=10)
  522. dev[0].request("FLUSH")
  523. hapd.request("WPS_PBC")
  524. dev[0].scan_for_bss(bssid, freq="2412")
  525. with TestDbusWps(bus, bssid) as t:
  526. if not t.success():
  527. raise Exception("Expected signals not seen")
  528. dev[0].wait_connected(timeout=10)
  529. dev[0].request("DISCONNECT")
  530. dev[0].wait_disconnected(timeout=10)
  531. hapd.disable()
  532. dev[0].flush_scan_cache()
  533. def test_dbus_old_wps_pin(dev, apdev):
  534. """The old D-Bus interface and WPS/PIN"""
  535. try:
  536. return _test_dbus_old_wps_pin(dev, apdev)
  537. finally:
  538. dev[0].request("SET wps_cred_processing 0")
  539. def _test_dbus_old_wps_pin(dev, apdev):
  540. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  541. hapd = start_ap(apdev[0])
  542. hapd.request("WPS_PIN any 12345670")
  543. bssid = apdev[0]['bssid']
  544. dev[0].scan_for_bss(bssid, freq="2412")
  545. dev[0].request("SET wps_cred_processing 2")
  546. for arg in [ (123, "12345670"),
  547. ("123", "12345670") ]:
  548. try:
  549. if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  550. raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
  551. except dbus.exceptions.DBusException, e:
  552. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  553. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  554. class TestDbusWps(TestDbusOldWps):
  555. def __init__(self, bus, bssid, pin):
  556. TestDbusOldWps.__init__(self, bus)
  557. self.bssid = bssid
  558. self.pin = pin
  559. def run_wps(self, *args):
  560. logger.debug("run_wps %s %s" % (self.bssid, self.pin))
  561. pin = if_obj.wpsPin(self.bssid, self.pin,
  562. dbus_interface=WPAS_DBUS_OLD_IFACE)
  563. if len(self.pin) == 0:
  564. h = hostapd.Hostapd(apdev[0]['ifname'])
  565. h.request("WPS_PIN any " + pin)
  566. return False
  567. with TestDbusWps(bus, bssid, "12345670") as t:
  568. if not t.success():
  569. raise Exception("Expected signals not seen")
  570. dev[0].wait_connected(timeout=10)
  571. dev[0].request("DISCONNECT")
  572. dev[0].wait_disconnected(timeout=10)
  573. dev[0].request("FLUSH")
  574. dev[0].scan_for_bss(bssid, freq="2412")
  575. with TestDbusWps(bus, "any", "") as t:
  576. if not t.success():
  577. raise Exception("Expected signals not seen")
  578. def test_dbus_old_wps_reg(dev, apdev):
  579. """The old D-Bus interface and WPS/Registar"""
  580. try:
  581. return _test_dbus_old_wps_reg(dev, apdev)
  582. finally:
  583. dev[0].request("SET wps_cred_processing 0")
  584. def _test_dbus_old_wps_reg(dev, apdev):
  585. (dbus,bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  586. hapd = start_ap(apdev[0])
  587. bssid = apdev[0]['bssid']
  588. dev[0].scan_for_bss(bssid, freq="2412")
  589. dev[0].request("SET wps_cred_processing 2")
  590. for arg in [ (123, "12345670"),
  591. ("123", "12345670") ]:
  592. try:
  593. if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  594. raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
  595. except dbus.exceptions.DBusException, e:
  596. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  597. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  598. class TestDbusWps(TestDbusOldWps):
  599. def run_wps(self, *args):
  600. logger.debug("run_wps")
  601. if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
  602. return False
  603. with TestDbusWps(bus) as t:
  604. if not t.success():
  605. raise Exception("Expected signals not seen")
  606. dev[0].wait_connected(timeout=10)