test_dbus_old.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711
  1. # wpa_supplicant D-Bus old interface tests
  2. # Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import gobject
  7. import logging
  8. logger = logging.getLogger()
  9. try:
  10. import dbus
  11. dbus_imported = True
  12. except ImportError:
  13. dbus_imported = False
  14. import hostapd
  15. from utils import HwsimSkip
  16. from test_dbus import TestDbus, start_ap
  17. WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
  18. WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
  19. WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
  20. WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
  21. WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"
  22. def prepare_dbus(dev):
  23. if not dbus_imported:
  24. raise HwsimSkip("No dbus module available")
  25. try:
  26. from dbus.mainloop.glib import DBusGMainLoop
  27. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  28. bus = dbus.SystemBus()
  29. wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
  30. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  31. path = wpas.getInterface(dev.ifname)
  32. if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  33. return (bus,wpas_obj,path,if_obj)
  34. except Exception, e:
  35. raise HwsimSkip("Could not connect to D-Bus: %s" % e)
  36. class TestDbusOldWps(TestDbus):
  37. def __init__(self, bus):
  38. TestDbus.__init__(self, bus)
  39. self.event_ok = False
  40. def __enter__(self):
  41. gobject.timeout_add(1, self.run_wps)
  42. gobject.timeout_add(15000, self.timeout)
  43. self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
  44. self.loop.run()
  45. return self
  46. def wpsCred(self, cred):
  47. logger.debug("wpsCred: " + str(cred))
  48. self.event_ok = True
  49. self.loop.quit()
  50. def success(self):
  51. return self.event_ok
  52. def test_dbus_old(dev, apdev):
  53. """The old D-Bus interface"""
  54. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  55. res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
  56. logger.debug("capabilities(): " + str(res))
  57. if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
  58. raise Exception("Unexpected capabilities")
  59. res2 = if_obj.capabilities(dbus.Boolean(True),
  60. dbus_interface=WPAS_DBUS_OLD_IFACE)
  61. logger.debug("capabilities(strict): " + str(res2))
  62. res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
  63. logger.debug("State: " + res)
  64. res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
  65. if res != 0:
  66. raise Exception("Unexpected scanning: " + str(res))
  67. if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)
  68. for t in [ dbus.UInt32(123), "foo" ]:
  69. try:
  70. if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  71. raise Exception("Invalid setAPScan() accepted")
  72. except dbus.exceptions.DBusException, e:
  73. if "InvalidOptions" not in str(e):
  74. raise Exception("Unexpected error message for invalid setAPScan: " + str(e))
  75. for p in [ path + "/Networks/12345",
  76. path + "/Networks/foo" ]:
  77. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  78. try:
  79. obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  80. raise Exception("Invalid disable() accepted")
  81. except dbus.exceptions.DBusException, e:
  82. if "InvalidNetwork" not in str(e):
  83. raise Exception("Unexpected error message for invalid disable: " + str(e))
  84. for p in [ path + "/BSSIDs/foo",
  85. path + "/BSSIDs/001122334455"]:
  86. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  87. try:
  88. obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
  89. raise Exception("Invalid properties() accepted")
  90. except dbus.exceptions.DBusException, e:
  91. if "InvalidBSSID" not in str(e):
  92. raise Exception("Unexpected error message for invalid properties: " + str(e))
  93. def test_dbus_old_scan(dev, apdev):
  94. """The old D-Bus interface - scanning"""
  95. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  96. hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
  97. params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
  98. params['wpa'] = '3'
  99. hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)
  100. class TestDbusScan(TestDbus):
  101. def __init__(self, bus):
  102. TestDbus.__init__(self, bus)
  103. self.scan_completed = False
  104. def __enter__(self):
  105. gobject.timeout_add(1, self.run_scan)
  106. gobject.timeout_add(7000, self.timeout)
  107. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  108. "ScanResultsAvailable")
  109. self.loop.run()
  110. return self
  111. def scanDone(self):
  112. logger.debug("scanDone")
  113. self.scan_completed = True
  114. self.loop.quit()
  115. def run_scan(self, *args):
  116. logger.debug("run_scan")
  117. if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
  118. raise Exception("Failed to trigger scan")
  119. return False
  120. def success(self):
  121. return self.scan_completed
  122. with TestDbusScan(bus) as t:
  123. if not t.success():
  124. raise Exception("Expected signals not seen")
  125. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  126. if len(res) != 2:
  127. raise Exception("Unexpected number of scan results: " + str(res))
  128. for i in range(2):
  129. logger.debug("Scan result BSS path: " + res[i])
  130. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  131. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  132. byte_arrays=True)
  133. logger.debug("BSS: " + str(bss))
  134. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
  135. try:
  136. bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
  137. raise Exception("Unknown BSSID method accepted")
  138. except Exception, e:
  139. logger.debug("Unknown BSSID method exception: " + str(e))
  140. if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
  141. raise Exception("Failed to issue flush(0)")
  142. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  143. if len(res) != 0:
  144. raise Exception("Unexpected BSS entry after flush")
  145. if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
  146. raise Exception("Failed to issue flush(1)")
  147. try:
  148. if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  149. raise Exception("Invalid flush arguments accepted")
  150. except dbus.exceptions.DBusException, e:
  151. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  152. raise Exception("Unexpected error message for invalid flush: " + str(e))
  153. try:
  154. bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  155. byte_arrays=True)
  156. except dbus.exceptions.DBusException, e:
  157. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
  158. raise Exception("Unexpected error message for invalid BSS: " + str(e))
  159. def test_dbus_old_debug(dev, apdev):
  160. """The old D-Bus interface - debug"""
  161. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  162. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  163. try:
  164. wpas.setDebugParams(123)
  165. raise Exception("Invalid setDebugParams accepted")
  166. except dbus.exceptions.DBusException, e:
  167. if "InvalidOptions" not in str(e):
  168. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  169. try:
  170. wpas.setDebugParams(123, True, True)
  171. raise Exception("Invalid setDebugParams accepted")
  172. except dbus.exceptions.DBusException, e:
  173. if "InvalidOptions" not in str(e):
  174. raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))
  175. wpas.setDebugParams(1, True, True)
  176. dev[0].request("LOG_LEVEL MSGDUMP")
  177. def test_dbus_old_smartcard(dev, apdev):
  178. """The old D-Bus interface - smartcard"""
  179. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  180. params = dbus.Dictionary(signature='sv')
  181. if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)
  182. params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
  183. 'pkcs11_engine_path': "foobar2",
  184. 'pkcs11_module_path': "foobar3",
  185. 'foo': 'bar' },
  186. signature='sv')
  187. params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
  188. 'foo': 'bar' },
  189. signature='sv')
  190. params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
  191. 'foo2': 'bar' },
  192. signature='sv')
  193. params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
  194. 'foo3': 'bar' },
  195. signature='sv')
  196. tests = [ 1, params, params2, params3, params4 ]
  197. for t in tests:
  198. try:
  199. if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  200. raise Exception("Invalid setSmartcardModules accepted: " + str(t))
  201. except dbus.exceptions.DBusException, e:
  202. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  203. raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))
  204. def test_dbus_old_interface(dev, apdev):
  205. """The old D-Bus interface - interface get/add/remove"""
  206. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  207. wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
  208. tests = [ (123, "InvalidOptions"),
  209. ("foo", "InvalidInterface") ]
  210. for (ifname,err) in tests:
  211. try:
  212. wpas.getInterface(ifname)
  213. raise Exception("Invalid getInterface accepted")
  214. except dbus.exceptions.DBusException, e:
  215. if err not in str(e):
  216. raise Exception("Unexpected error message for invalid getInterface: " + str(e))
  217. params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
  218. wpas.addInterface("lo", params)
  219. path = wpas.getInterface("lo")
  220. logger.debug("New interface path: " + str(path))
  221. wpas.removeInterface(path)
  222. try:
  223. wpas.removeInterface(path)
  224. raise Exception("Invalid removeInterface() accepted")
  225. except dbus.exceptions.DBusException, e:
  226. if "InvalidInterface" not in str(e):
  227. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  228. params1 = dbus.Dictionary({ 'driver': 'foo',
  229. 'driver-params': 'foo',
  230. 'config-file': 'foo',
  231. 'bridge-ifname': 'foo' },
  232. signature='sv')
  233. params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
  234. tests = [ (123, None, "InvalidOptions"),
  235. ("", None, "InvalidOptions"),
  236. ("foo", None, "AddError"),
  237. ("foo", params1, "AddError"),
  238. ("foo", params2, "InvalidOptions"),
  239. ("foo", 1234, "InvalidOptions"),
  240. (dev[0].ifname, None, "ExistsError" ) ]
  241. for (ifname,params,err) in tests:
  242. try:
  243. if params is None:
  244. wpas.addInterface(ifname)
  245. else:
  246. wpas.addInterface(ifname, params)
  247. raise Exception("Invalid addInterface accepted: " + str(params))
  248. except dbus.exceptions.DBusException, e:
  249. if err not in str(e):
  250. raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))
  251. try:
  252. wpas.removeInterface(123)
  253. raise Exception("Invalid removeInterface accepted")
  254. except dbus.exceptions.DBusException, e:
  255. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  256. raise Exception("Unexpected error message for invalid removeInterface: " + str(e))
  257. def test_dbus_old_blob(dev, apdev):
  258. """The old D-Bus interface - blob operations"""
  259. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  260. param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
  261. param2 = dbus.Dictionary({ 'blob3': "foo" })
  262. param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
  263. signature='sv')
  264. tests = [ (1, "InvalidOptions"),
  265. (param1, "InvalidOptions"),
  266. (param2, "InvalidOptions"),
  267. (param3, "InvalidOptions") ]
  268. for (arg,err) in tests:
  269. try:
  270. if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  271. raise Exception("Invalid setBlobs() accepted: " + str(arg))
  272. except dbus.exceptions.DBusException, e:
  273. logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
  274. if err not in str(e):
  275. raise Exception("Unexpected error message for invalid setBlobs: " + str(e))
  276. tests = [ (["foo"], "RemoveError: Error removing blob"),
  277. ([""], "RemoveError: Invalid blob name"),
  278. ([1], "InvalidOptions"),
  279. ("foo", "InvalidOptions") ]
  280. for (arg,err) in tests:
  281. try:
  282. if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  283. raise Exception("Invalid removeBlobs() accepted: " + str(arg))
  284. except dbus.exceptions.DBusException, e:
  285. logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
  286. if err not in str(e):
  287. raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))
  288. blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
  289. 'blob2': dbus.ByteArray([ 1, 2 ]) },
  290. signature='sv')
  291. if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
  292. if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)
  293. def test_dbus_old_connect(dev, apdev):
  294. """The old D-Bus interface - add a network and connect"""
  295. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  296. ssid = "test-wpa2-psk"
  297. passphrase = 'qwertyuiop'
  298. params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
  299. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  300. for p in [ "/no/where/to/be/found",
  301. path + "/Networks/12345",
  302. path + "/Networks/foo",
  303. "/fi/epitest/hostap/WPASupplicant/Interfaces",
  304. "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
  305. obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
  306. try:
  307. if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
  308. raise Exception("Invalid removeNetwork accepted: " + p)
  309. except dbus.exceptions.DBusException, e:
  310. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  311. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  312. try:
  313. if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
  314. raise Exception("Invalid removeNetwork accepted")
  315. except dbus.exceptions.DBusException, e:
  316. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  317. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  318. try:
  319. if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
  320. raise Exception("Invalid removeNetwork accepted")
  321. except dbus.exceptions.DBusException, e:
  322. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  323. raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))
  324. tests = [ (path, "InvalidNetwork"),
  325. (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
  326. "InvalidInterface"),
  327. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
  328. "InvalidNetwork"),
  329. (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
  330. "InvalidNetwork"),
  331. (1, "InvalidOptions") ]
  332. for t,err in tests:
  333. try:
  334. if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
  335. raise Exception("Invalid selectNetwork accepted: " + str(t))
  336. except dbus.exceptions.DBusException, e:
  337. if err not in str(e):
  338. raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))
  339. npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  340. if not npath.startswith(WPAS_DBUS_OLD_PATH):
  341. raise Exception("Unexpected addNetwork result: " + path)
  342. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
  343. tests = [ 123,
  344. dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
  345. for t in tests:
  346. try:
  347. netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  348. raise Exception("Invalid set() accepted: " + str(t))
  349. except dbus.exceptions.DBusException, e:
  350. if "InvalidOptions" not in str(e):
  351. raise Exception("Unexpected error message for invalid set: " + str(e))
  352. params = dbus.Dictionary({ 'ssid': ssid,
  353. 'key_mgmt': 'WPA-PSK',
  354. 'psk': passphrase,
  355. 'identity': dbus.ByteArray([ 1, 2 ]),
  356. 'priority': dbus.Int32(0),
  357. 'scan_freq': dbus.UInt32(2412) },
  358. signature='sv')
  359. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  360. if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)
  361. class TestDbusConnect(TestDbus):
  362. def __init__(self, bus):
  363. TestDbus.__init__(self, bus)
  364. self.state = 0
  365. def __enter__(self):
  366. gobject.timeout_add(1, self.run_connect)
  367. gobject.timeout_add(15000, self.timeout)
  368. self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
  369. "ScanResultsAvailable")
  370. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  371. "StateChange")
  372. self.loop.run()
  373. return self
  374. def scanDone(self):
  375. logger.debug("scanDone")
  376. def stateChange(self, new, old):
  377. logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
  378. if new == "COMPLETED":
  379. if self.state == 0:
  380. self.state = 1
  381. self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  382. elif self.state == 2:
  383. self.state = 3
  384. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  385. elif self.state == 4:
  386. self.state = 5
  387. if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
  388. elif self.state == 6:
  389. self.state = 7
  390. if_obj.removeNetwork(self.path,
  391. dbus_interface=WPAS_DBUS_OLD_IFACE)
  392. try:
  393. if_obj.removeNetwork(self.path,
  394. dbus_interface=WPAS_DBUS_OLD_IFACE)
  395. raise Exception("Invalid removeNetwork accepted")
  396. except dbus.exceptions.DBusException, e:
  397. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
  398. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  399. self.loop.quit()
  400. elif new == "DISCONNECTED":
  401. if self.state == 1:
  402. self.state = 2
  403. self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  404. elif self.state == 3:
  405. self.state = 4
  406. if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  407. elif self.state == 5:
  408. self.state = 6
  409. if_obj.selectNetwork(self.path,
  410. dbus_interface=WPAS_DBUS_OLD_IFACE)
  411. def run_connect(self, *args):
  412. logger.debug("run_connect")
  413. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  414. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  415. netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  416. params = dbus.Dictionary({ 'ssid': ssid,
  417. 'key_mgmt': 'WPA-PSK',
  418. 'psk': passphrase,
  419. 'scan_freq': 2412 },
  420. signature='sv')
  421. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  422. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  423. self.path = path
  424. self.netw_obj = netw_obj
  425. return False
  426. def success(self):
  427. return self.state == 7
  428. with TestDbusConnect(bus) as t:
  429. if not t.success():
  430. raise Exception("Expected signals not seen")
  431. if len(dev[0].list_networks()) != 0:
  432. raise Exception("Unexpected network")
  433. def test_dbus_old_connect_eap(dev, apdev):
  434. """The old D-Bus interface - add an EAP network and connect"""
  435. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  436. ssid = "test-wpa2-eap"
  437. params = hostapd.wpa2_eap_params(ssid=ssid)
  438. hapd = hostapd.add_ap(apdev[0]['ifname'], params)
  439. class TestDbusConnect(TestDbus):
  440. def __init__(self, bus):
  441. TestDbus.__init__(self, bus)
  442. self.connected = False
  443. self.certification_received = False
  444. def __enter__(self):
  445. gobject.timeout_add(1, self.run_connect)
  446. gobject.timeout_add(15000, self.timeout)
  447. self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
  448. "StateChange")
  449. self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
  450. "Certification")
  451. self.loop.run()
  452. return self
  453. def stateChange(self, new, old):
  454. logger.debug("stateChange: %s --> %s" % (old, new))
  455. if new == "COMPLETED":
  456. self.connected = True
  457. self.loop.quit()
  458. def certification(self, depth, subject, hash, cert_hex):
  459. logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
  460. self.certification_received = True
  461. def run_connect(self, *args):
  462. logger.debug("run_connect")
  463. path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
  464. netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
  465. params = dbus.Dictionary({ 'ssid': ssid,
  466. 'key_mgmt': 'WPA-EAP',
  467. 'eap': 'TTLS',
  468. 'anonymous_identity': 'ttls',
  469. 'identity': 'pap user',
  470. 'ca_cert': 'auth_serv/ca.pem',
  471. 'phase2': 'auth=PAP',
  472. 'password': 'password',
  473. 'scan_freq': 2412 },
  474. signature='sv')
  475. netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
  476. netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
  477. self.path = path
  478. self.netw_obj = netw_obj
  479. return False
  480. def success(self):
  481. return self.connected and self.certification_received
  482. with TestDbusConnect(bus) as t:
  483. if not t.success():
  484. raise Exception("Expected signals not seen")
  485. def test_dbus_old_wps_pbc(dev, apdev):
  486. """The old D-Bus interface and WPS/PBC"""
  487. try:
  488. return _test_dbus_old_wps_pbc(dev, apdev)
  489. finally:
  490. dev[0].request("SET wps_cred_processing 0")
  491. def _test_dbus_old_wps_pbc(dev, apdev):
  492. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  493. hapd = start_ap(apdev[0])
  494. hapd.request("WPS_PBC")
  495. bssid = apdev[0]['bssid']
  496. dev[0].scan_for_bss(bssid, freq="2412")
  497. dev[0].request("SET wps_cred_processing 2")
  498. for arg in [ 123, "123" ]:
  499. try:
  500. if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
  501. raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
  502. except dbus.exceptions.DBusException, e:
  503. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  504. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  505. class TestDbusWps(TestDbusOldWps):
  506. def __init__(self, bus, pbc_param):
  507. TestDbusOldWps.__init__(self, bus)
  508. self.pbc_param = pbc_param
  509. def run_wps(self, *args):
  510. logger.debug("run_wps: pbc_param=" + self.pbc_param)
  511. if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
  512. return False
  513. with TestDbusWps(bus, "any") as t:
  514. if not t.success():
  515. raise Exception("Expected signals not seen")
  516. res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
  517. if len(res) != 1:
  518. raise Exception("Unexpected number of scan results: " + str(res))
  519. for i in range(1):
  520. logger.debug("Scan result BSS path: " + res[i])
  521. bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[i])
  522. bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
  523. byte_arrays=True)
  524. logger.debug("BSS: " + str(bss))
  525. dev[0].wait_connected(timeout=10)
  526. dev[0].request("DISCONNECT")
  527. dev[0].wait_disconnected(timeout=10)
  528. dev[0].request("FLUSH")
  529. hapd.request("WPS_PBC")
  530. dev[0].scan_for_bss(bssid, freq="2412")
  531. with TestDbusWps(bus, bssid) as t:
  532. if not t.success():
  533. raise Exception("Expected signals not seen")
  534. dev[0].wait_connected(timeout=10)
  535. dev[0].request("DISCONNECT")
  536. dev[0].wait_disconnected(timeout=10)
  537. hapd.disable()
  538. dev[0].flush_scan_cache()
  539. def test_dbus_old_wps_pin(dev, apdev):
  540. """The old D-Bus interface and WPS/PIN"""
  541. try:
  542. return _test_dbus_old_wps_pin(dev, apdev)
  543. finally:
  544. dev[0].request("SET wps_cred_processing 0")
  545. def _test_dbus_old_wps_pin(dev, apdev):
  546. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  547. hapd = start_ap(apdev[0])
  548. hapd.request("WPS_PIN any 12345670")
  549. bssid = apdev[0]['bssid']
  550. dev[0].scan_for_bss(bssid, freq="2412")
  551. dev[0].request("SET wps_cred_processing 2")
  552. for arg in [ (123, "12345670"),
  553. ("123", "12345670") ]:
  554. try:
  555. if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  556. raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
  557. except dbus.exceptions.DBusException, e:
  558. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  559. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  560. class TestDbusWps(TestDbusOldWps):
  561. def __init__(self, bus, bssid, pin):
  562. TestDbusOldWps.__init__(self, bus)
  563. self.bssid = bssid
  564. self.pin = pin
  565. def run_wps(self, *args):
  566. logger.debug("run_wps %s %s" % (self.bssid, self.pin))
  567. pin = if_obj.wpsPin(self.bssid, self.pin,
  568. dbus_interface=WPAS_DBUS_OLD_IFACE)
  569. if len(self.pin) == 0:
  570. h = hostapd.Hostapd(apdev[0]['ifname'])
  571. h.request("WPS_PIN any " + pin)
  572. return False
  573. with TestDbusWps(bus, bssid, "12345670") as t:
  574. if not t.success():
  575. raise Exception("Expected signals not seen")
  576. dev[0].wait_connected(timeout=10)
  577. dev[0].request("DISCONNECT")
  578. dev[0].wait_disconnected(timeout=10)
  579. dev[0].request("FLUSH")
  580. dev[0].scan_for_bss(bssid, freq="2412")
  581. with TestDbusWps(bus, "any", "") as t:
  582. if not t.success():
  583. raise Exception("Expected signals not seen")
  584. def test_dbus_old_wps_reg(dev, apdev):
  585. """The old D-Bus interface and WPS/Registar"""
  586. try:
  587. return _test_dbus_old_wps_reg(dev, apdev)
  588. finally:
  589. dev[0].request("SET wps_cred_processing 0")
  590. def _test_dbus_old_wps_reg(dev, apdev):
  591. (bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
  592. hapd = start_ap(apdev[0])
  593. bssid = apdev[0]['bssid']
  594. dev[0].scan_for_bss(bssid, freq="2412")
  595. dev[0].request("SET wps_cred_processing 2")
  596. for arg in [ (123, "12345670"),
  597. ("123", "12345670") ]:
  598. try:
  599. if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
  600. raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
  601. except dbus.exceptions.DBusException, e:
  602. if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
  603. raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))
  604. class TestDbusWps(TestDbusOldWps):
  605. def run_wps(self, *args):
  606. logger.debug("run_wps")
  607. if_obj.wpsReg(bssid, "12345670", dbus_interface=WPAS_DBUS_OLD_IFACE)
  608. return False
  609. with TestDbusWps(bus) as t:
  610. if not t.success():
  611. raise Exception("Expected signals not seen")
  612. dev[0].wait_connected(timeout=10)