utils.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # Testing utilities
  2. # Copyright (c) 2013-2015, Jouni Malinen <j@w1.fi>
  3. #
  4. # This software may be distributed under the terms of the BSD license.
  5. # See README for more details.
  6. import binascii
  7. import os
  8. import struct
  9. import time
  10. import remotehost
  11. def get_ifnames():
  12. ifnames = []
  13. with open("/proc/net/dev", "r") as f:
  14. lines = f.readlines()
  15. for l in lines:
  16. val = l.split(':', 1)
  17. if len(val) == 2:
  18. ifnames.append(val[0].strip(' '))
  19. return ifnames
  20. class HwsimSkip(Exception):
  21. def __init__(self, reason):
  22. self.reason = reason
  23. def __str__(self):
  24. return self.reason
  25. class alloc_fail(object):
  26. def __init__(self, dev, count, funcs):
  27. self._dev = dev
  28. self._count = count
  29. self._funcs = funcs
  30. def __enter__(self):
  31. cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs)
  32. if "OK" not in self._dev.request(cmd):
  33. raise HwsimSkip("TEST_ALLOC_FAIL not supported")
  34. def __exit__(self, type, value, traceback):
  35. if type is None:
  36. if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs:
  37. raise Exception("Allocation failure did not trigger")
  38. class fail_test(object):
  39. def __init__(self, dev, count, funcs):
  40. self._dev = dev
  41. self._count = count
  42. self._funcs = funcs
  43. def __enter__(self):
  44. cmd = "TEST_FAIL %d:%s" % (self._count, self._funcs)
  45. if "OK" not in self._dev.request(cmd):
  46. raise HwsimSkip("TEST_FAIL not supported")
  47. def __exit__(self, type, value, traceback):
  48. if type is None:
  49. if self._dev.request("GET_FAIL") != "0:%s" % self._funcs:
  50. raise Exception("Test failure did not trigger")
  51. def wait_fail_trigger(dev, cmd, note="Failure not triggered", max_iter=40):
  52. for i in range(0, max_iter):
  53. if dev.request(cmd).startswith("0:"):
  54. break
  55. if i == max_iter - 1:
  56. raise Exception(note)
  57. time.sleep(0.05)
  58. def require_under_vm():
  59. with open('/proc/1/cmdline', 'r') as f:
  60. cmd = f.read()
  61. if "inside.sh" not in cmd:
  62. raise HwsimSkip("Not running under VM")
  63. def iface_is_in_bridge(bridge, ifname):
  64. fname = "/sys/class/net/"+ifname+"/brport/bridge"
  65. if not os.path.exists(fname):
  66. return False
  67. if not os.path.islink(fname):
  68. return False
  69. truebridge = os.path.basename(os.readlink(fname))
  70. if bridge == truebridge:
  71. return True
  72. return False
  73. def skip_with_fips(dev, reason="Not supported in FIPS mode"):
  74. res = dev.get_capability("fips")
  75. if res and 'FIPS' in res:
  76. raise HwsimSkip(reason)
  77. def get_phy(ap, ifname=None):
  78. phy = "phy3"
  79. try:
  80. hostname = ap['hostname']
  81. except:
  82. hostname = None
  83. host = remotehost.Host(hostname)
  84. if ifname == None:
  85. ifname = ap['ifname']
  86. status, buf = host.execute(["iw", "dev", ifname, "info"])
  87. if status != 0:
  88. raise Exception("iw " + ifname + " info failed")
  89. lines = buf.split("\n")
  90. for line in lines:
  91. if "wiphy" in line:
  92. words = line.split()
  93. phy = "phy" + words[1]
  94. break
  95. return phy
  96. def parse_ie(buf):
  97. ret = {}
  98. data = binascii.unhexlify(buf)
  99. while len(data) >= 2:
  100. ie,elen = struct.unpack('BB', data[0:2])
  101. data = data[2:]
  102. if elen > len(data):
  103. break
  104. ret[ie] = data[0:elen]
  105. data = data[elen:]
  106. return ret