hwsim_utils.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # hwsim testing utilities
  2. # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import os
  7. import subprocess
  8. import time
  9. import logging
  10. logger = logging.getLogger()
  11. from wpasupplicant import WpaSupplicant
  12. def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
  13. ifname1=None, ifname2=None):
  14. addr1 = dev1.own_addr()
  15. if not dev1group and isinstance(dev1, WpaSupplicant):
  16. addr1 = dev1.get_driver_status_field('addr')
  17. addr2 = dev2.own_addr()
  18. if not dev2group and isinstance(dev2, WpaSupplicant):
  19. addr2 = dev2.get_driver_status_field('addr')
  20. dev1.dump_monitor()
  21. dev2.dump_monitor()
  22. try:
  23. cmd = "DATA_TEST_CONFIG 1"
  24. if ifname1:
  25. cmd = cmd + " ifname=" + ifname1
  26. if dev1group:
  27. res = dev1.group_request(cmd)
  28. else:
  29. res = dev1.request(cmd)
  30. if "OK" not in res:
  31. raise Exception("Failed to enable data test functionality")
  32. cmd = "DATA_TEST_CONFIG 1"
  33. if ifname2:
  34. cmd = cmd + " ifname=" + ifname2
  35. if dev2group:
  36. res = dev2.group_request(cmd)
  37. else:
  38. res = dev2.request(cmd)
  39. if "OK" not in res:
  40. raise Exception("Failed to enable data test functionality")
  41. cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
  42. if dev1group:
  43. dev1.group_request(cmd)
  44. else:
  45. dev1.request(cmd)
  46. if dev2group:
  47. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
  48. else:
  49. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
  50. if ev is None:
  51. raise Exception("dev1->dev2 unicast data delivery failed")
  52. if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
  53. raise Exception("Unexpected dev1->dev2 unicast data result")
  54. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
  55. if dev1group:
  56. dev1.group_request(cmd)
  57. else:
  58. dev1.request(cmd)
  59. if dev2group:
  60. ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
  61. else:
  62. ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
  63. if ev is None:
  64. raise Exception("dev1->dev2 broadcast data delivery failed")
  65. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
  66. raise Exception("Unexpected dev1->dev2 broadcast data result")
  67. cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
  68. if dev2group:
  69. dev2.group_request(cmd)
  70. else:
  71. dev2.request(cmd)
  72. if dev1group:
  73. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
  74. else:
  75. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
  76. if ev is None:
  77. raise Exception("dev2->dev1 unicast data delivery failed")
  78. if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
  79. raise Exception("Unexpected dev2->dev1 unicast data result")
  80. cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
  81. if dev2group:
  82. dev2.group_request(cmd)
  83. else:
  84. dev2.request(cmd)
  85. if dev1group:
  86. ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
  87. else:
  88. ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
  89. if ev is None:
  90. raise Exception("dev2->dev1 broadcast data delivery failed")
  91. if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
  92. raise Exception("Unexpected dev2->dev1 broadcast data result")
  93. finally:
  94. if dev1group:
  95. dev1.group_request("DATA_TEST_CONFIG 0")
  96. else:
  97. dev1.request("DATA_TEST_CONFIG 0")
  98. if dev2group:
  99. dev2.group_request("DATA_TEST_CONFIG 0")
  100. else:
  101. dev2.request("DATA_TEST_CONFIG 0")
  102. def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
  103. dev1group=False, dev2group=False,
  104. ifname1=None, ifname2=None):
  105. if dscp:
  106. tos = dscp << 2
  107. if not tos:
  108. tos = 0
  109. success = False
  110. last_err = None
  111. for i in range(0, max_tries):
  112. try:
  113. run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
  114. ifname1, ifname2)
  115. success = True
  116. break
  117. except Exception, e:
  118. last_err = e
  119. if i + 1 < max_tries:
  120. time.sleep(1)
  121. if not success:
  122. raise Exception(last_err)
  123. def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
  124. max_tries=1):
  125. test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
  126. max_tries=max_tries)
  127. def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
  128. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
  129. def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
  130. test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
  131. def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
  132. test_connectivity(dev1, dev2, dscp, tos)