wlantest.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # Python class for controlling wlantest
  2. # Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import os
  7. import time
  8. import subprocess
  9. import logging
  10. import wpaspy
  11. logger = logging.getLogger()
  12. class Wlantest:
  13. def __init__(self):
  14. if os.path.isfile('../../wlantest/wlantest_cli'):
  15. self.wlantest_cli = '../../wlantest/wlantest_cli'
  16. else:
  17. self.wlantest_cli = 'wlantest_cli'
  18. def flush(self):
  19. res = subprocess.check_output([self.wlantest_cli, "flush"])
  20. if "FAIL" in res:
  21. raise Exception("wlantest_cli flush failed")
  22. def relog(self):
  23. res = subprocess.check_output([self.wlantest_cli, "relog"])
  24. if "FAIL" in res:
  25. raise Exception("wlantest_cli relog failed")
  26. def add_passphrase(self, passphrase):
  27. res = subprocess.check_output([self.wlantest_cli, "add_passphrase",
  28. passphrase])
  29. if "FAIL" in res:
  30. raise Exception("wlantest_cli add_passphrase failed")
  31. def add_wepkey(self, key):
  32. res = subprocess.check_output([self.wlantest_cli, "add_wepkey", key])
  33. if "FAIL" in res:
  34. raise Exception("wlantest_cli add_key failed")
  35. def info_bss(self, field, bssid):
  36. res = subprocess.check_output([self.wlantest_cli, "info_bss",
  37. field, bssid])
  38. if "FAIL" in res:
  39. raise Exception("Could not get BSS info from wlantest for " + bssid)
  40. return res
  41. def get_bss_counter(self, field, bssid):
  42. try:
  43. res = subprocess.check_output([self.wlantest_cli, "get_bss_counter",
  44. field, bssid]);
  45. except Exception, e:
  46. return 0
  47. if "FAIL" in res:
  48. return 0
  49. return int(res)
  50. def clear_bss_counters(self, bssid):
  51. subprocess.call([self.wlantest_cli, "clear_bss_counters", bssid],
  52. stdout=open('/dev/null', 'w'));
  53. def info_sta(self, field, bssid, addr):
  54. res = subprocess.check_output([self.wlantest_cli, "info_sta",
  55. field, bssid, addr])
  56. if "FAIL" in res:
  57. raise Exception("Could not get STA info from wlantest for " + addr)
  58. return res
  59. def get_sta_counter(self, field, bssid, addr):
  60. res = subprocess.check_output([self.wlantest_cli, "get_sta_counter",
  61. field, bssid, addr]);
  62. if "FAIL" in res:
  63. raise Exception("wlantest_cli command failed")
  64. return int(res)
  65. def clear_sta_counters(self, bssid, addr):
  66. res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters",
  67. bssid, addr]);
  68. if "FAIL" in res:
  69. raise Exception("wlantest_cli command failed")
  70. def tdls_clear(self, bssid, addr1, addr2):
  71. res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters",
  72. bssid, addr1, addr2]);
  73. def get_tdls_counter(self, field, bssid, addr1, addr2):
  74. res = subprocess.check_output([self.wlantest_cli, "get_tdls_counter",
  75. field, bssid, addr1, addr2]);
  76. if "FAIL" in res:
  77. raise Exception("wlantest_cli command failed")
  78. return int(res)
  79. def require_ap_pmf_mandatory(self, bssid):
  80. res = self.info_bss("rsn_capab", bssid)
  81. if "MFPR" not in res:
  82. raise Exception("AP did not require PMF")
  83. if "MFPC" not in res:
  84. raise Exception("AP did not enable PMF")
  85. res = self.info_bss("key_mgmt", bssid)
  86. if "PSK-SHA256" not in res:
  87. raise Exception("AP did not enable SHA256-based AKM for PMF")
  88. def require_ap_pmf_optional(self, bssid):
  89. res = self.info_bss("rsn_capab", bssid)
  90. if "MFPR" in res:
  91. raise Exception("AP required PMF")
  92. if "MFPC" not in res:
  93. raise Exception("AP did not enable PMF")
  94. def require_ap_no_pmf(self, bssid):
  95. res = self.info_bss("rsn_capab", bssid)
  96. if "MFPR" in res:
  97. raise Exception("AP required PMF")
  98. if "MFPC" in res:
  99. raise Exception("AP enabled PMF")
  100. def require_sta_pmf_mandatory(self, bssid, addr):
  101. res = self.info_sta("rsn_capab", bssid, addr)
  102. if "MFPR" not in res:
  103. raise Exception("STA did not require PMF")
  104. if "MFPC" not in res:
  105. raise Exception("STA did not enable PMF")
  106. def require_sta_pmf(self, bssid, addr):
  107. res = self.info_sta("rsn_capab", bssid, addr)
  108. if "MFPC" not in res:
  109. raise Exception("STA did not enable PMF")
  110. def require_sta_no_pmf(self, bssid, addr):
  111. res = self.info_sta("rsn_capab", bssid, addr)
  112. if "MFPC" in res:
  113. raise Exception("STA enabled PMF")
  114. def require_sta_key_mgmt(self, bssid, addr, key_mgmt):
  115. res = self.info_sta("key_mgmt", bssid, addr)
  116. if key_mgmt not in res:
  117. raise Exception("Unexpected STA key_mgmt")
  118. def get_tx_tid(self, bssid, addr, tid):
  119. res = subprocess.check_output([self.wlantest_cli, "get_tx_tid",
  120. bssid, addr, str(tid)]);
  121. if "FAIL" in res:
  122. raise Exception("wlantest_cli command failed")
  123. return int(res)
  124. def get_rx_tid(self, bssid, addr, tid):
  125. res = subprocess.check_output([self.wlantest_cli, "get_rx_tid",
  126. bssid, addr, str(tid)]);
  127. if "FAIL" in res:
  128. raise Exception("wlantest_cli command failed")
  129. return int(res)
  130. def get_tid_counters(self, bssid, addr):
  131. tx = {}
  132. rx = {}
  133. for tid in range(0, 17):
  134. tx[tid] = self.get_tx_tid(bssid, addr, tid)
  135. rx[tid] = self.get_rx_tid(bssid, addr, tid)
  136. return [ tx, rx ]