# FST configuration tests
# Copyright (c) 2015, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
- import logging
- logger = logging.getLogger()
- import subprocess
- import time
- import os
- import signal
- import hostapd
- import wpasupplicant
- import utils
- import fst_test_common
class FstLauncherConfig:
    """Base configuration for the hostapd/wpa_supplicant instances launched
    by the FST config tests.

    Holds the interface name and the FST parameters. Subclasses are expected
    to override is_ap() and to_file(); the versions here only raise."""

    def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
        """Stores the interface name, FST group, FST priority and the
        optional LLT value."""
        self.iface = iface
        self.fst_group = fst_group
        self.fst_pri = fst_pri
        self.fst_llt = fst_llt  # None llt means no llt parameter will be set

    def ifname(self):
        """Returns the interface name given at construction"""
        return self.iface

    def is_ap(self):
        """Returns True if the configuration is for AP, otherwise - False"""
        # NotImplementedError is still an Exception, so callers that catch
        # Exception keep working.
        raise NotImplementedError("Virtual is_ap() called!")

    def to_file(self, pathname):
        """Creates configuration file to be used by FST config tests related
        hostapd/wpa_supplicant instances"""
        raise NotImplementedError("Virtual to_file() called!")
class FstLauncherConfigAP(FstLauncherConfig):
    """hostapd-side configuration used by the FST config tests.

    Adds SSID, hardware mode and channel to the base FST parameters."""

    def __init__(self, iface, ssid, mode, chan, fst_group, fst_pri,
                 fst_llt=None):
        """Stores the AP-specific settings on top of the base FST ones."""
        FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)
        self.ssid = ssid
        self.mode = mode
        self.chan = chan

    def is_ap(self):
        """This configuration always describes an AP."""
        return True

    def get_channel(self):
        """Returns the channel given at construction"""
        return self.chan

    def to_file(self, pathname):
        """Writes a hostapd configuration file to 'pathname' and logs its
        contents at debug level."""
        base_settings = (self.iface, self.ssid, self.chan, self.mode)
        with open(pathname, "w") as out:
            out.write("country_code=US\n"
                      "interface=%s\n"
                      "ctrl_interface=/var/run/hostapd\n"
                      "ssid=%s\n"
                      "channel=%s\n"
                      "hw_mode=%s\n"
                      "ieee80211n=1\n" % base_settings)
            # FST parameters are only emitted for a non-empty group name.
            if len(self.fst_group) > 0:
                out.write("fst_group_id=%s\n"
                          "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
                # NOTE(review): fst_llt is assumed to be nested under the
                # group check - confirm against the original indentation.
                if self.fst_llt is not None:
                    out.write("fst_llt=%s\n" % self.fst_llt)
        # Read the file back so the log shows exactly what was written.
        with open(pathname, "r") as written:
            logger.debug("wrote hostapd config file %s:\n%s" % (pathname,
                                                                written.read()))
class FstLauncherConfigSTA(FstLauncherConfig):
    """wpa_supplicant-side configuration used by the FST config tests."""

    def __init__(self, iface, fst_group, fst_pri, fst_llt=None):
        """Forwards all parameters to the base configuration."""
        FstLauncherConfig.__init__(self, iface, fst_group, fst_pri, fst_llt)

    def is_ap(self):
        """This configuration never describes an AP."""
        return False

    def to_file(self, pathname):
        """Writes a wpa_supplicant configuration file to 'pathname' and logs
        its contents at debug level."""
        with open(pathname, "w") as out:
            out.write("ctrl_interface=DIR=/var/run/wpa_supplicant\n"
                      "p2p_no_group_iface=1\n")
            # FST parameters are only emitted for a non-empty group name.
            if len(self.fst_group) > 0:
                out.write("fst_group_id=%s\n"
                          "fst_priority=%s\n" % (self.fst_group, self.fst_pri))
                # NOTE(review): fst_llt is assumed to be nested under the
                # group check - confirm against the original indentation.
                if self.fst_llt is not None:
                    out.write("fst_llt=%s\n" % self.fst_llt)
        # Read the file back so the log shows exactly what was written.
        with open(pathname, "r") as written:
            logger.debug("wrote wpa_supplicant config file %s:\n%s" % (pathname, written.read()))
class FstLauncher:
    """FstLauncher class is responsible for launching and cleaning up of FST
    config tests related hostapd/wpa_supplicant instances"""

    def __init__(self, logpath):
        """Prepares an empty launcher that keeps its config, PID and log
        files under 'logpath'.

        Raises utils.HwsimSkip (via test_is_supported) when FST is not
        supported."""
        self.logger = logging.getLogger()
        self.fst_logpath = logpath
        self.cfgs_to_run = []
        self.hapd_fst_global = '/var/run/hostapd-fst-global'
        self.wsup_fst_global = '/tmp/fststa'
        self.nof_aps = 0
        self.nof_stas = 0
        self.reg_ctrl = fst_test_common.HapdRegCtrl()
        self.test_is_supported()

    def __del__(self):
        # Tests that never call cleanup() explicitly rely on this to stop
        # the processes they launched.
        self.cleanup()

    @staticmethod
    def test_is_supported():
        """Raises utils.HwsimSkip unless both hostapd and wpa_supplicant
        answer OK to the FST-MANAGER IS_SUPPORTED test request"""
        h = hostapd.HostapdGlobal()
        resp = h.request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
        if not resp.startswith("OK"):
            raise utils.HwsimSkip("FST not supported")
        w = wpasupplicant.WpaSupplicant(global_iface='/tmp/wpas-wlan5')
        resp = w.global_request("FST-MANAGER TEST_REQUEST IS_SUPPORTED")
        if not resp.startswith("OK"):
            raise utils.HwsimSkip("FST not supported")

    def get_cfg_pathname(self, cfg):
        """Returns pathname of ifname based configuration file"""
        return self.fst_logpath + '/' + cfg.ifname() + '.conf'

    def add_cfg(self, cfg):
        """Adds configuration to be used for launching hostapd/wpa_supplicant
        instances. Adding the same configuration twice has no effect."""
        if cfg not in self.cfgs_to_run:
            self.cfgs_to_run.append(cfg)
            if cfg.is_ap():
                self.nof_aps += 1
            else:
                self.nof_stas += 1

    def remove_cfg(self, cfg):
        """Removes configuration previously added with add_cfg and deletes
        its configuration file if one exists"""
        if cfg in self.cfgs_to_run:
            self.cfgs_to_run.remove(cfg)
            if cfg.is_ap():
                self.nof_aps -= 1
            else:
                self.nof_stas -= 1
        # NOTE(review): the file removal is assumed to run regardless of
        # whether cfg was registered - confirm against the original
        # indentation.
        config_file = self.get_cfg_pathname(cfg)
        if os.path.exists(config_file):
            os.remove(config_file)

    def run_hostapd(self):
        """Launches hostapd with interfaces configured according to
        FstLauncherConfigAP configurations added.

        Returns the exit status of the hostapd launch command (0 on
        success). Raises Exception if no AP configuration was added."""
        if self.nof_aps == 0:
            raise Exception("No FST APs to start")
        pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
        mylogfile = self.fst_logpath + '/' + 'fst-hostapd'
        # Prefer an alternative binary placed under the log directory.
        prg = os.path.join(self.fst_logpath,
                           'alt-hostapd/hostapd/hostapd')
        if not os.path.exists(prg):
            prg = '../../hostapd/hostapd'
        cmd = [prg, '-B', '-dddt',
               '-P', pidfile, '-f', mylogfile, '-g', self.hapd_fst_global]
        for cfg in self.cfgs_to_run:
            if cfg.is_ap():
                cfgfile = self.get_cfg_pathname(cfg)
                cfg.to_file(cfgfile)
                cmd.append(cfgfile)
                self.reg_ctrl.add_ap(cfg.ifname(), cfg.get_channel())
        self.logger.debug("Starting fst hostapd: " + ' '.join(cmd))
        res = subprocess.call(cmd)
        self.logger.debug("fst hostapd start result: %d" % res)
        if res == 0:
            self.reg_ctrl.start()
        return res

    def run_wpa_supplicant(self):
        """Launches wpa_supplicant with interfaces configured according to
        FstLauncherConfigSTA configurations added.

        Returns the exit status of the wpa_supplicant launch command (0 on
        success). Raises Exception if no STA configuration was added."""
        if self.nof_stas == 0:
            raise Exception("No FST STAs to start")
        pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
        mylogfile = self.fst_logpath + '/' + 'fst-wpa_supplicant'
        # Prefer an alternative binary placed under the log directory.
        prg = os.path.join(self.fst_logpath,
                           'alt-wpa_supplicant/wpa_supplicant/wpa_supplicant')
        if not os.path.exists(prg):
            prg = '../../wpa_supplicant/wpa_supplicant'
        cmd = [prg, '-B', '-dddt',
               '-P' + pidfile, '-f', mylogfile, '-g', self.wsup_fst_global]
        sta_no = 0
        for cfg in self.cfgs_to_run:
            if not cfg.is_ap():
                cfgfile = self.get_cfg_pathname(cfg)
                cfg.to_file(cfgfile)
                cmd.append('-c' + cfgfile)
                cmd.append('-i' + cfg.ifname())
                cmd.append('-Dnl80211')
                if sta_no != self.nof_stas - 1:
                    cmd.append('-N')  # Next station configuration
                sta_no += 1
        self.logger.debug("Starting fst supplicant: " + ' '.join(cmd))
        res = subprocess.call(cmd)
        self.logger.debug("fst supplicant start result: %d" % res)
        return res

    def cleanup(self):
        """Terminates hostapd/wpa_supplicant processes previously launched with
        run_hostapd/run_wpa_supplicant"""
        # Only wait for a missing PID file when a process of that kind was
        # expected to be running.
        pidfile = self.fst_logpath + '/' + 'myhostapd.pid'
        self.kill_pid(pidfile, self.nof_aps > 0)
        pidfile = self.fst_logpath + '/' + 'mywpa_supplicant.pid'
        self.kill_pid(pidfile, self.nof_stas > 0)
        self.reg_ctrl.stop()
        while len(self.cfgs_to_run) != 0:
            self.remove_cfg(self.cfgs_to_run[0])

    def kill_pid(self, pidfile, try_again=False):
        """Kills process by PID file.

        Sends SIGTERM to the PID read from 'pidfile' and polls for up to
        about ten seconds until the process is gone. With try_again=True a
        missing PID file is re-checked once after one second. Nothing is
        signalled when the file never yields a positive PID."""
        if not os.path.exists(pidfile):
            if not try_again:
                return
            # It might take some time for the process to write the PID file,
            # so wait a bit longer before giving up.
            self.logger.info("kill_pid: pidfile %s does not exist - try again after a second" % pidfile)
            time.sleep(1)
            if not os.path.exists(pidfile):
                self.logger.info("kill_pid: pidfile %s does not exist - could not kill the process" % pidfile)
                return
        pid = -1
        try:
            for _ in range(3):
                with open(pidfile, 'r') as pf:
                    pidtxt = pf.read().strip()
                self.logger.debug("kill_pid: %s: '%s'" % (pidfile, pidtxt))
                try:
                    pid = int(pidtxt)
                    break
                except ValueError as e:
                    self.logger.debug("kill_pid: No valid PID found: %s" % str(e))
                    time.sleep(1)
            if pid <= 0:
                # os.kill() with pid 0 or -1 would signal the whole process
                # group or every process we may signal; never do that.
                self.logger.info("kill_pid: no valid PID in %s - not killing anything" % pidfile)
                return
            self.logger.debug("kill_pid %s --> pid %d" % (pidfile, pid))
            os.kill(pid, signal.SIGTERM)
            for _ in range(10):
                try:
                    # Poll the pid (Is the process still existing?)
                    os.kill(pid, 0)
                except OSError:
                    # No, already done
                    break
                # Wait and check again
                time.sleep(1)
        except Exception as e:
            self.logger.debug("Didn't stop the pid=%d. Was it stopped already? (%s)" % (pid, str(e)))
def parse_ies(iehex, el=-1):
    """Parses the information elements hex string 'iehex' in format
    "0a0b0c0d0e0f". If no 'el' defined just checks the IE string for integrity.
    If 'el' is defined returns the list of integer values of the specific IE
    (or empty list if the element is not in the string).

    Each element is laid out as one ID byte, one length byte and 'length'
    payload bytes. A malformed string (odd number of hex digits, truncated
    header or payload) is logged as an error and yields an empty list.
    When 'el' occurs more than once, the payload of the last occurrence is
    returned. Raises ValueError if 'iehex' contains non-hex characters."""
    # Root logger: the same object as the module-level 'logger'.
    log = logging.getLogger()

    if len(iehex) % 2 != 0:
        # Half a byte at the end cannot belong to any element.
        log.error("Bad IE string: " + iehex)
        return []

    iel = [int(iehex[i:i + 2], 16) for i in range(0, len(iehex), 2)]

    # Sanity check
    i = 0
    res = []
    # Require both the ID and the length byte before reading an element; a
    # lone trailing ID byte falls through to the integrity check below.
    while i + 1 < len(iel):
        log.debug("IE found: %x" % iel[i])
        length = iel[i + 1]
        if el != -1 and el == iel[i]:
            res = iel[i + 2:i + 2 + length]
        i += 2 + length

    # A well-formed string is consumed exactly.
    if i != len(iel):
        log.error("Bad IE string: " + iehex)
        res = []
    return res
def scan_and_get_bss(dev, frq):
    """Scans on frequency 'frq' with device 'dev' and returns whatever
    dev.get_bss('0') reports afterwards: the bss info dictionary ('ssid',
    'ie', 'flags', etc.) or None.

    Only the first BSS entry is looked at, so the function implies there is
    only one AP on the given channel. If that is not the case, it must be
    changed to call dev.get_bss() until the AP with the wanted [b]ssid is
    found."""
    dev.scan(freq=frq)
    first_bss = dev.get_bss('0')
    return first_bss
# AP configuration tests
def run_test_ap_configuration(apdev, test_params,
                              fst_group=fst_test_common.fst_test_def_group,
                              fst_pri=fst_test_common.fst_test_def_prio_high,
                              fst_llt=fst_test_common.fst_test_def_llt):
    """Starts FST hostapd with two APs and returns the launch result.

    The 1st AP always uses the default (known good) FST settings; the 2nd AP
    takes its FST group, priority and LLT from the parameters. The return
    value is that of FstLauncher.run_hostapd(): 0 - no errors discovered, an
    error otherwise. The function is used for simple "bad configuration"
    tests."""
    launcher = FstLauncher(test_params['logdir'])
    good_ap = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_goodconf', 'a',
                                  fst_test_common.fst_test_def_chan_a,
                                  fst_test_common.fst_test_def_group,
                                  fst_test_common.fst_test_def_prio_low,
                                  fst_test_common.fst_test_def_llt)
    tested_ap = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_badconf', 'b',
                                    fst_test_common.fst_test_def_chan_g,
                                    fst_group, fst_pri, fst_llt)
    for ap_cfg in (good_ap, tested_ap):
        launcher.add_cfg(ap_cfg)
    return launcher.run_hostapd()
def run_test_sta_configuration(test_params,
                               fst_group=fst_test_common.fst_test_def_group,
                               fst_pri=fst_test_common.fst_test_def_prio_high,
                               fst_llt=fst_test_common.fst_test_def_llt):
    """Starts FST wpa_supplicant with two STAs and returns the launch result.

    The 1st STA (wlan5) always uses the default FST settings; the 2nd STA
    (wlan6) takes its FST group, priority and LLT from the parameters. The
    return value is that of FstLauncher.run_wpa_supplicant(): 0 - no errors
    discovered, an error otherwise. The function is used for simple "bad
    configuration" tests."""
    launcher = FstLauncher(test_params['logdir'])
    good_sta = FstLauncherConfigSTA('wlan5',
                                    fst_test_common.fst_test_def_group,
                                    fst_test_common.fst_test_def_prio_low,
                                    fst_test_common.fst_test_def_llt)
    tested_sta = FstLauncherConfigSTA('wlan6', fst_group, fst_pri, fst_llt)
    for sta_cfg in (good_sta, tested_sta):
        launcher.add_cfg(sta_cfg)
    return launcher.run_wpa_supplicant()
def test_fst_ap_config_llt_neg(dev, apdev, test_params):
    """FST AP configuration negative LLT"""
    # A zero launch result means hostapd accepted the negative LLT.
    if run_test_ap_configuration(apdev, test_params, fst_llt='-1') == 0:
        raise Exception("hostapd started with a negative llt")
def test_fst_ap_config_llt_zero(dev, apdev, test_params):
    """FST AP configuration zero LLT"""
    # A zero launch result means hostapd accepted the zero LLT.
    if run_test_ap_configuration(apdev, test_params, fst_llt='0') == 0:
        raise Exception("hostapd started with a zero llt")
def test_fst_ap_config_llt_too_big(dev, apdev, test_params):
    """FST AP configuration LLT is too big"""
    oversized_llt = '4294967296'  # 0x100000000
    # A zero launch result means hostapd accepted the oversized LLT.
    if run_test_ap_configuration(apdev, test_params,
                                 fst_llt=oversized_llt) == 0:
        raise Exception("hostapd started with llt that is too big")
def test_fst_ap_config_llt_nan(dev, apdev, test_params):
    """FST AP configuration LLT is not a number"""
    # A zero launch result means hostapd accepted the non-numeric LLT.
    if run_test_ap_configuration(apdev, test_params, fst_llt='nan') == 0:
        raise Exception("hostapd started with llt not a number")
def test_fst_ap_config_pri_neg(dev, apdev, test_params):
    """FST AP configuration Priority negative"""
    # A zero launch result means hostapd accepted the negative priority.
    if run_test_ap_configuration(apdev, test_params, fst_pri='-1') == 0:
        raise Exception("hostapd started with a negative fst priority")
def test_fst_ap_config_pri_zero(dev, apdev, test_params):
    """FST AP configuration Priority zero"""
    # A zero launch result means hostapd accepted the zero priority.
    if run_test_ap_configuration(apdev, test_params, fst_pri='0') == 0:
        raise Exception("hostapd started with a zero fst priority")
def test_fst_ap_config_pri_large(dev, apdev, test_params):
    """FST AP configuration Priority too large"""
    # A zero launch result means hostapd accepted the oversized priority.
    if run_test_ap_configuration(apdev, test_params, fst_pri='256') == 0:
        raise Exception("hostapd started with too large fst priority")
def test_fst_ap_config_pri_nan(dev, apdev, test_params):
    """FST AP configuration Priority not a number"""
    # A zero launch result means hostapd accepted the non-numeric priority.
    if run_test_ap_configuration(apdev, test_params, fst_pri='nan') == 0:
        raise Exception("hostapd started with fst priority not a number")
def test_fst_ap_config_group_len(dev, apdev, test_params):
    """FST AP configuration Group max length"""
    overlong_group = 'fstg5678abcd34567'
    # A zero launch result means hostapd accepted the overlong group name.
    if run_test_ap_configuration(apdev, test_params,
                                 fst_group=overlong_group) == 0:
        raise Exception("hostapd started with fst_group length too big")
def test_fst_ap_config_good(dev, apdev, test_params):
    """FST AP configuration good parameters"""
    # With all-default FST parameters the launch must report success (0).
    if run_test_ap_configuration(apdev, test_params) != 0:
        raise Exception("hostapd didn't start with valid config parameters")
def test_fst_ap_config_default(dev, apdev, test_params):
    """FST AP configuration default parameters"""
    # fst_llt=None leaves the LLT unset for the second AP; the launch must
    # still report success (0).
    if run_test_ap_configuration(apdev, test_params, fst_llt=None) != 0:
        raise Exception("hostapd didn't start with valid config parameters")
# STA configuration tests
def test_fst_sta_config_llt_neg(dev, apdev, test_params):
    """FST STA configuration negative LLT"""
    # A zero launch result means wpa_supplicant accepted the negative LLT.
    if run_test_sta_configuration(test_params, fst_llt='-1') == 0:
        raise Exception("wpa_supplicant started with a negative llt")
def test_fst_sta_config_llt_zero(dev, apdev, test_params):
    """FST STA configuration zero LLT"""
    # A zero launch result means wpa_supplicant accepted the zero LLT.
    if run_test_sta_configuration(test_params, fst_llt='0') == 0:
        raise Exception("wpa_supplicant started with a zero llt")
def test_fst_sta_config_llt_large(dev, apdev, test_params):
    """FST STA configuration LLT is too large"""
    oversized_llt = '4294967296'  # 0x100000000
    # A zero launch result means wpa_supplicant accepted the oversized LLT.
    if run_test_sta_configuration(test_params, fst_llt=oversized_llt) == 0:
        raise Exception("wpa_supplicant started with llt that is too large")
def test_fst_sta_config_llt_nan(dev, apdev, test_params):
    """FST STA configuration LLT is not a number"""
    # A zero launch result means wpa_supplicant accepted the non-numeric LLT.
    if run_test_sta_configuration(test_params, fst_llt='nan') == 0:
        raise Exception("wpa_supplicant started with llt not a number")
def test_fst_sta_config_pri_neg(dev, apdev, test_params):
    """FST STA configuration Priority negative"""
    # A zero launch result means wpa_supplicant accepted the negative
    # priority.
    if run_test_sta_configuration(test_params, fst_pri='-1') == 0:
        raise Exception("wpa_supplicant started with a negative fst priority")
def test_fst_sta_config_pri_zero(dev, apdev, test_params):
    """FST STA configuration Priority zero"""
    # A zero launch result means wpa_supplicant accepted the zero priority.
    if run_test_sta_configuration(test_params, fst_pri='0') == 0:
        raise Exception("wpa_supplicant started with a zero fst priority")
def test_fst_sta_config_pri_big(dev, apdev, test_params):
    """FST STA configuration Priority too large"""
    # A zero launch result means wpa_supplicant accepted the oversized
    # priority.
    if run_test_sta_configuration(test_params, fst_pri='256') == 0:
        raise Exception("wpa_supplicant started with too large fst priority")
def test_fst_sta_config_pri_nan(dev, apdev, test_params):
    """FST STA configuration Priority not a number"""
    # A zero launch result means wpa_supplicant accepted the non-numeric
    # priority.
    if run_test_sta_configuration(test_params, fst_pri='nan') == 0:
        raise Exception("wpa_supplicant started with fst priority not a number")
def test_fst_sta_config_group_len(dev, apdev, test_params):
    """FST STA configuration Group max length"""
    overlong_group = 'fstg5678abcd34567'
    # A zero launch result means wpa_supplicant accepted the overlong group
    # name.
    if run_test_sta_configuration(test_params, fst_group=overlong_group) == 0:
        raise Exception("wpa_supplicant started with fst_group length too big")
def test_fst_sta_config_good(dev, apdev, test_params):
    """FST STA configuration good parameters"""
    # With all-default FST parameters the launch must report success (0).
    if run_test_sta_configuration(test_params) != 0:
        raise Exception("wpa_supplicant didn't start with valid config parameters")
def test_fst_sta_config_default(dev, apdev, test_params):
    """FST STA configuration default parameters"""
    # fst_llt=None leaves the LLT unset for the second STA; the launch must
    # still report success (0).
    if run_test_sta_configuration(test_params, fst_llt=None) != 0:
        raise Exception("wpa_supplicant didn't start with valid config parameters")
def test_fst_scan_mb(dev, apdev, test_params):
    """FST scan valid MB IE presence with normal start"""
    logdir = test_params['logdir']

    # Test valid MB IE in scan results: start two APs in the same FST group
    # and expect each of them to carry the 0x9e element, which this test
    # treats as the MB IE.
    fst_launcher = FstLauncher(logdir)
    ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
                              fst_test_common.fst_test_def_chan_a,
                              fst_test_common.fst_test_def_group,
                              fst_test_common.fst_test_def_prio_high)
    ap2 = FstLauncherConfigAP(apdev[1]['ifname'], 'fst_11g', 'b',
                              fst_test_common.fst_test_def_chan_g,
                              fst_test_common.fst_test_def_group,
                              fst_test_common.fst_test_def_prio_low)
    fst_launcher.add_cfg(ap1)
    fst_launcher.add_cfg(ap2)
    res = fst_launcher.run_hostapd()
    if res != 0:
        raise Exception("hostapd didn't start properly")

    mbie1 = []
    mbie2 = []
    try:
        # Scan 1st AP
        vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
        if vals1 is not None and 'ie' in vals1:
            mbie1 = parse_ies(vals1['ie'], 0x9e)
        # Scan 2nd AP
        vals2 = scan_and_get_bss(dev[2], fst_test_common.fst_test_def_freq_g)
        if vals2 is not None and 'ie' in vals2:
            mbie2 = parse_ies(vals2['ie'], 0x9e)
    finally:
        # Stop hostapd before judging the scan results.
        fst_launcher.cleanup()

    if len(mbie1) == 0:
        raise Exception("No MB IE created by 1st AP")
    if len(mbie2) == 0:
        raise Exception("No MB IE created by 2nd AP")
def test_fst_scan_nomb(dev, apdev, test_params):
    """FST scan no MB IE presence with 1 AP start"""
    logdir = test_params['logdir']

    # Test that no MB IE shows up in scan results when only a single AP is
    # started (0x9e is the element this test treats as the MB IE).
    fst_launcher = FstLauncher(logdir)
    ap1 = FstLauncherConfigAP(apdev[0]['ifname'], 'fst_11a', 'a',
                              fst_test_common.fst_test_def_chan_a,
                              fst_test_common.fst_test_def_group,
                              fst_test_common.fst_test_def_prio_high)
    fst_launcher.add_cfg(ap1)
    res = fst_launcher.run_hostapd()
    if res != 0:
        raise Exception("Hostapd didn't start properly")

    mbie1 = []
    try:
        # Pause before scanning, as the original test does.
        time.sleep(2)
        vals1 = scan_and_get_bss(dev[0], fst_test_common.fst_test_def_freq_a)
        if vals1 is not None and 'ie' in vals1:
            mbie1 = parse_ies(vals1['ie'], 0x9e)
    finally:
        # Stop hostapd before judging the scan results.
        fst_launcher.cleanup()

    if len(mbie1) != 0:
        raise Exception("MB IE exists with 1 AP")
|