hwsim_utils.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # hwsim testing utilities
  2. # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import os
  7. import subprocess
  8. import time
  9. import logging
  10. logger = logging.getLogger()
  11. from wpasupplicant import WpaSupplicant
  12. def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
  13. ifname1=None, ifname2=None, config=True, timeout=5):
  14. addr1 = dev1.own_addr()
  15. if not dev1group and isinstance(dev1, WpaSupplicant):
  16. addr1 = dev1.get_driver_status_field('addr')
  17. addr2 = dev2.own_addr()
  18. if not dev2group and isinstance(dev2, WpaSupplicant):
  19. addr2 = dev2.get_driver_status_field('addr')
  20. dev1.dump_monitor()
  21. dev2.dump_monitor()
  22. try:
  23. if config:
  24. cmd = "DATA_TEST_CONFIG 1"
  25. if ifname1:
  26. cmd = cmd + " ifname=" + ifname1
  27. if dev1group:
  28. res = dev1.group_request(cmd)
  29. else:
  30. res = dev1.request(cmd)
  31. if "OK" not in res:
  32. raise Exception("Failed to enable data test functionality")
  33. cmd = "DATA_TEST_CONFIG 1"
  34. if ifname2:
  35. cmd = cmd + " ifname=" + ifname2
  36. if dev2group:
  37. res = dev2.group_request(cmd)
  38. else:
  39. res = dev2.request(cmd)
  40. if "OK" not in res:
  41. raise Exception("Failed to enable data test functionality")
  42. cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
  43. if dev1group:
  44. dev1.group_request(cmd)
  45. else:
  46. dev1.request(cmd)
  47. if dev2group:
  48. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
  49. else:
  50. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
  51. if ev is None:
  52. raise Exception("dev1->dev2 unicast data delivery failed")
  53. if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
  54. raise Exception("Unexpected dev1->dev2 unicast data result")
  55. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
  56. if dev1group:
  57. dev1.group_request(cmd)
  58. else:
  59. dev1.request(cmd)
  60. if dev2group:
  61. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
  62. else:
  63. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=timeout)
  64. if ev is None:
  65. raise Exception("dev1->dev2 broadcast data delivery failed")
  66. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
  67. raise Exception("Unexpected dev1->dev2 broadcast data result")
  68. cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
  69. if dev2group:
  70. dev2.group_request(cmd)
  71. else:
  72. dev2.request(cmd)
  73. if dev1group:
  74. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
  75. else:
  76. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
  77. if ev is None:
  78. raise Exception("dev2->dev1 unicast data delivery failed")
  79. if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
  80. raise Exception("Unexpected dev2->dev1 unicast data result")
  81. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
  82. if dev2group:
  83. dev2.group_request(cmd)
  84. else:
  85. dev2.request(cmd)
  86. if dev1group:
  87. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
  88. else:
  89. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=timeout)
  90. if ev is None:
  91. raise Exception("dev2->dev1 broadcast data delivery failed")
  92. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
  93. raise Exception("Unexpected dev2->dev1 broadcast data result")
  94. finally:
  95. if config:
  96. if dev1group:
  97. dev1.group_request("DATA_TEST_CONFIG 0")
  98. else:
  99. dev1.request("DATA_TEST_CONFIG 0")
  100. if dev2group:
  101. dev2.group_request("DATA_TEST_CONFIG 0")
  102. else:
  103. dev2.request("DATA_TEST_CONFIG 0")
  104. def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
  105. dev1group=False, dev2group=False,
  106. ifname1=None, ifname2=None, config=True, timeout=5):
  107. if dscp:
  108. tos = dscp << 2
  109. if not tos:
  110. tos = 0
  111. success = False
  112. last_err = None
  113. for i in range(0, max_tries):
  114. try:
  115. run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
  116. ifname1, ifname2, config=config,
  117. timeout=timeout)
  118. success = True
  119. break
  120. except Exception, e:
  121. last_err = e
  122. if i + 1 < max_tries:
  123. time.sleep(1)
  124. if not success:
  125. raise Exception(last_err)
  126. def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
  127. max_tries=1):
  128. test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
  129. max_tries=max_tries)
  130. def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
  131. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
  132. def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
  133. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
  134. def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
  135. test_connectivity(dev1, dev2, dscp, tos)
  136. (PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = range(4)
  137. def set_powersave(dev, val):
  138. phy = dev.get_driver_status_field("phyname")
  139. psf = open('/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy, 'w')
  140. psf.write('%d\n' % val)
  141. psf.close()