Browse Source

tests: Initial mechanism for RADIUS protocol testing

This brings in a minimal pyrad-based RADIUS server to allow various
protocol tests to be run. For now, the server is not adding
Message-Authenticator, so that error case is checked. Additional tests
can be added in the future.

Signed-off-by: Jouni Malinen <j@w1.fi>
Jouni Malinen 10 years ago
parent
commit
8f614cd7a7
1 changed files with 65 additions and 0 deletions
  1. 65 0
      tests/hwsim/test_radius.py

+ 65 - 0
tests/hwsim/test_radius.py

@@ -6,7 +6,9 @@
 
 import logging
 logger = logging.getLogger()
+import select
 import subprocess
+import threading
 import time
 
 import hostapd
@@ -590,3 +592,66 @@ def test_radius_failover(dev, apdev):
     req_e = int(as_mib_end['radiusAccServTotalRequests'])
     if req_e <= req_s:
         raise Exception("Unexpected RADIUS server acct MIB value")
+
+def run_pyrad_server(srv, stop_event):
+    srv.RunWithStop(stop_event)
+
+def test_radius_protocol(dev, apdev):
+    """RADIUS Authentication protocol tests with a fake server"""
+    try:
+        import pyrad.server
+        import pyrad.packet
+        import pyrad.dictionary
+    except ImportError:
+        return "skip"
+
+    class TestServer(pyrad.server.Server):
+        def _HandleAuthPacket(self, pkt):
+            pyrad.server.Server._HandleAuthPacket(self, pkt)
+            logger.info("Received authentication request")
+            reply = self.CreateReplyPacket(pkt)
+            reply.code = pyrad.packet.AccessAccept
+            # TODO: Add attributes. For now, this works as a test case for
+            # missing Message-Authenticator.
+            self.SendReplyPacket(pkt.fd, reply)
+
+        def RunWithStop(self, stop_event):
+            self._poll = select.poll()
+            self._fdmap = {}
+            self._PrepareSockets()
+
+            while not stop_event.is_set():
+                for (fd, event) in self._poll.poll(1000):
+                    if event == select.POLLIN:
+                        try:
+                            fdo = self._fdmap[fd]
+                            self._ProcessInput(fdo)
+                        except ServerPacketError as err:
+                            logger.info("pyrad server dropping packet: " + str(err))
+                        except pyrad.packet.PacketError as err:
+                            logger.info("pyrad server received invalid packet: " + str(err))
+                    else:
+                        logger.error("Unexpected event in pyrad server main loop")
+
+    srv = TestServer(dict=pyrad.dictionary.Dictionary("dictionary.radius"),
+                     authport=18138, acctport=18139)
+    srv.hosts["127.0.0.1"] = pyrad.server.RemoteHost("127.0.0.1",
+                                                     "radius",
+                                                     "localhost")
+    srv.BindToAddress("")
+    t_stop = threading.Event()
+    t = threading.Thread(target=run_pyrad_server, args=(srv, t_stop))
+    t.start()
+
+    try:
+        params = hostapd.wpa2_eap_params(ssid="radius-test")
+        params['auth_server_port'] = "18138"
+        hapd = hostapd.add_ap(apdev[0]['ifname'], params)
+        connect(dev[0], "radius-test", wait_connect=False)
+        ev = dev[0].wait_event(["CTRL-EVENT-EAP-STARTED"], timeout=15)
+        if ev is None:
+            raise Exception("Timeout on EAP start")
+        time.sleep(1)
+    finally:
+        t_stop.set()
+        t.join()