hwsim_utils.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
# hwsim testing utilities
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
  6. import os
  7. import time
  8. import logging
  9. logger = logging.getLogger()
  10. from wpasupplicant import WpaSupplicant
  11. def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
  12. ifname1=None, ifname2=None, config=True, timeout=5,
  13. multicast_to_unicast=False, broadcast=True):
  14. addr1 = dev1.own_addr()
  15. if not dev1group and isinstance(dev1, WpaSupplicant):
  16. addr1 = dev1.get_driver_status_field('addr')
  17. addr2 = dev2.own_addr()
  18. if not dev2group and isinstance(dev2, WpaSupplicant):
  19. addr2 = dev2.get_driver_status_field('addr')
  20. dev1.dump_monitor()
  21. dev2.dump_monitor()
  22. if dev1.hostname is None and dev2.hostname is None:
  23. broadcast_retry_c = 1
  24. else:
  25. broadcast_retry_c = 10
  26. try:
  27. if config:
  28. cmd = "DATA_TEST_CONFIG 1"
  29. if ifname1:
  30. cmd = cmd + " ifname=" + ifname1
  31. if dev1group:
  32. res = dev1.group_request(cmd)
  33. else:
  34. res = dev1.request(cmd)
  35. if "OK" not in res:
  36. raise Exception("Failed to enable data test functionality")
  37. cmd = "DATA_TEST_CONFIG 1"
  38. if ifname2:
  39. cmd = cmd + " ifname=" + ifname2
  40. if dev2group:
  41. res = dev2.group_request(cmd)
  42. else:
  43. res = dev2.request(cmd)
  44. if "OK" not in res:
  45. raise Exception("Failed to enable data test functionality")
  46. cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
  47. if dev1group:
  48. dev1.group_request(cmd)
  49. else:
  50. dev1.request(cmd)
  51. if dev2group:
  52. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
  53. else:
  54. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
  55. if ev is None:
  56. raise Exception("dev1->dev2 unicast data delivery failed")
  57. if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
  58. raise Exception("Unexpected dev1->dev2 unicast data result")
  59. if broadcast:
  60. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
  61. for i in xrange(broadcast_retry_c):
  62. try:
  63. if dev1group:
  64. dev1.group_request(cmd)
  65. else:
  66. dev1.request(cmd)
  67. if dev2group:
  68. ev = dev2.wait_group_event(["DATA-TEST-RX"],
  69. timeout=timeout)
  70. else:
  71. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
  72. if ev is None:
  73. raise Exception("dev1->dev2 broadcast data delivery failed")
  74. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
  75. raise Exception("Unexpected dev1->dev2 broadcast data result")
  76. break
  77. except Exception as e:
  78. if i == broadcast_retry_c - 1:
  79. raise
  80. cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
  81. if dev2group:
  82. dev2.group_request(cmd)
  83. else:
  84. dev2.request(cmd)
  85. if dev1group:
  86. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
  87. else:
  88. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
  89. if ev is None:
  90. raise Exception("dev2->dev1 unicast data delivery failed")
  91. if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
  92. raise Exception("Unexpected dev2->dev1 unicast data result")
  93. if broadcast:
  94. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
  95. for i in xrange(broadcast_retry_c):
  96. try:
  97. if dev2group:
  98. dev2.group_request(cmd)
  99. else:
  100. dev2.request(cmd)
  101. if dev1group:
  102. ev = dev1.wait_group_event(["DATA-TEST-RX"],
  103. timeout=timeout)
  104. else:
  105. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
  106. if ev is None:
  107. raise Exception("dev2->dev1 broadcast data delivery failed")
  108. if multicast_to_unicast:
  109. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) in ev:
  110. raise Exception("Unexpected dev2->dev1 broadcast data result: multicast to unicast conversion missing")
  111. if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
  112. raise Exception("Unexpected dev2->dev1 broadcast data result (multicast to unicast enabled)")
  113. else:
  114. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
  115. raise Exception("Unexpected dev2->dev1 broadcast data result")
  116. break
  117. except Exception as e:
  118. if i == broadcast_retry_c - 1:
  119. raise
  120. finally:
  121. if config:
  122. if dev1group:
  123. dev1.group_request("DATA_TEST_CONFIG 0")
  124. else:
  125. dev1.request("DATA_TEST_CONFIG 0")
  126. if dev2group:
  127. dev2.group_request("DATA_TEST_CONFIG 0")
  128. else:
  129. dev2.request("DATA_TEST_CONFIG 0")
  130. def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
  131. dev1group=False, dev2group=False,
  132. ifname1=None, ifname2=None, config=True, timeout=5,
  133. multicast_to_unicast=False, success_expected=True,
  134. broadcast=True):
  135. if dscp:
  136. tos = dscp << 2
  137. if not tos:
  138. tos = 0
  139. success = False
  140. last_err = None
  141. for i in range(0, max_tries):
  142. try:
  143. run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
  144. ifname1, ifname2, config=config,
  145. timeout=timeout,
  146. multicast_to_unicast=multicast_to_unicast,
  147. broadcast=broadcast)
  148. success = True
  149. break
  150. except Exception, e:
  151. last_err = e
  152. if i + 1 < max_tries:
  153. time.sleep(1)
  154. if success_expected and not success:
  155. raise Exception(last_err)
  156. if not success_expected and success:
  157. raise Exception("Unexpected connectivity detected")
  158. def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
  159. max_tries=1, timeout=5):
  160. test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
  161. max_tries=max_tries, timeout=timeout)
  162. def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
  163. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
  164. def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
  165. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
  166. def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
  167. test_connectivity(dev1, dev2, dscp, tos)
  168. (PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = range(4)
  169. def set_powersave(dev, val):
  170. phy = dev.get_driver_status_field("phyname")
  171. fname = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy
  172. data = '%d' % val
  173. (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
  174. if res != 0:
  175. raise Exception("Failed to set power save for device")
  176. def set_group_map(dev, val):
  177. phy = dev.get_driver_status_field("phyname")
  178. fname = '/sys/kernel/debug/ieee80211/%s/hwsim/group' % phy
  179. data = '%d' % val
  180. (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
  181. if res != 0:
  182. raise Exception("Failed to set group map for %s" % phy)