|
@@ -9,6 +9,7 @@
|
|
|
import logging
|
|
|
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
|
|
|
from scapy.all import *
|
|
|
+import libwifi
|
|
|
from libwifi import *
|
|
|
import sys, socket, struct, time, subprocess, atexit, select, os.path
|
|
|
from wpaspy import Ctrl
|
|
@@ -37,12 +38,27 @@ def hostapd_command(hostapd_ctrl, cmd):
|
|
|
|
|
|
#### Main Testing Code ####
|
|
|
|
|
|
+class TestOptions():
|
|
|
+ ### XXX should be grouphs instead of groupkey
|
|
|
+ ReplayBroadcast, ReplayUnicast, Fourway, Groupkey = range(4)
|
|
|
+ TptkNone, TptkReplay, TptkRand = range(3)
|
|
|
+
|
|
|
+ def __init__(self, variant=Fourway):
|
|
|
+ self.variant = variant
|
|
|
+
|
|
|
+ # Additional options for Fourway tests
|
|
|
+ self.tptk = TestOptions.TptkNone
|
|
|
+
|
|
|
+ # Extra option for Fourway and GroupKey tests
|
|
|
+ self.gtkinit = False
|
|
|
+
|
|
|
class ClientState():
|
|
|
UNKNOWN, VULNERABLE, PATCHED = range(3)
|
|
|
IDLE, STARTED, GOT_CANARY, FINISHED = range(4)
|
|
|
|
|
|
- def __init__(self, clientmac, test_group_hs=False, test_tptk=False):
|
|
|
+ def __init__(self, clientmac, options):
|
|
|
self.mac = clientmac
|
|
|
+ self.options = options
|
|
|
self.TK = None
|
|
|
self.vuln_4way = ClientState.UNKNOWN
|
|
|
self.vuln_group = ClientState.UNKNOWN
|
|
@@ -51,10 +67,8 @@ class ClientState():
|
|
|
self.ivs = IvCollection()
|
|
|
self.pairkey_sent_time_prev_iv = None
|
|
|
self.pairkey_intervals_no_iv_reuse = 0
|
|
|
- self.pairkey_tptk = test_tptk
|
|
|
|
|
|
self.groupkey_reset()
|
|
|
- self.groupkey_grouphs = test_group_hs
|
|
|
|
|
|
def groupkey_reset(self):
|
|
|
self.groupkey_state = ClientState.IDLE
|
|
@@ -63,10 +77,6 @@ class ClientState():
|
|
|
self.groupkey_requests_sent = 0
|
|
|
self.groupkey_patched_intervals = -1 # -1 because the first broadcast ARP requests are still valid
|
|
|
|
|
|
- def start_grouphs_test():
|
|
|
- self.groupkey_reset()
|
|
|
- self.groupkey_grouphs = True
|
|
|
-
|
|
|
# TODO: Put in libwifi?
|
|
|
def get_encryption_key(self, hostapd_ctrl):
|
|
|
if self.TK is None:
|
|
@@ -114,7 +124,7 @@ class ClientState():
|
|
|
iv = dot11_get_iv(p)
|
|
|
seq = dot11_get_seqnum(p)
|
|
|
log(INFO, ("%s: IV reuse detected (IV=%d, seq=%d). " +
|
|
|
- "Client is vulnerable to pairwise key reinstallations in the 4-way handshake!") % (self.mac, iv, seq), color="green")
|
|
|
+ "Client reinstalls the pairwise key in the 4-way handshake (this is bad)") % (self.mac, iv, seq), color="green")
|
|
|
self.vuln_4way = ClientState.VULNERABLE
|
|
|
|
|
|
# If it's a higher IV than all previous ones, try to check if the client seems patched
|
|
@@ -134,13 +144,13 @@ class ClientState():
|
|
|
self.vuln_4way = ClientState.PATCHED
|
|
|
|
|
|
# Be sure to clarify *which* type of attack failed (to remind user to test others attacks as well)
|
|
|
- msg = "%s: client DOESN'T seem vulnerable to pairwise key reinstallation in the 4-way handshake"
|
|
|
- if self.pairkey_tptk == KRAckAttackClient.TPTK_NONE:
|
|
|
- msg += " (using standard attack)"
|
|
|
- elif self.pairkey_tptk == KRAckAttackClient.TPTK_REPLAY:
|
|
|
- msg += " (using TPTK attack)"
|
|
|
- elif self.pairkey_tptk == KRAckAttackClient.TPTK_RAND:
|
|
|
- msg += " (using TPTK-RAND attack)"
|
|
|
+ msg = "%s: client DOESN'T reinstall the pairwise key in the 4-way handshake (this is good)"
|
|
|
+ if self.options.tptk == TestOptions.TptkNone:
|
|
|
+ msg += " (used standard attack)"
|
|
|
+ elif self.options.tptk == TestOptions.TptkReplay:
|
|
|
+ msg += " (used TPTK attack)"
|
|
|
+ elif self.options.tptk == TestOptions.TptkRand:
|
|
|
+ msg += " (used TPTK-RAND attack)"
|
|
|
log(INFO, (msg + ".") % self.mac, color="green")
|
|
|
|
|
|
def mark_allzero_key(self, p):
|
|
@@ -148,11 +158,35 @@ class ClientState():
|
|
|
iv = dot11_get_iv(p)
|
|
|
seq = dot11_get_seqnum(p)
|
|
|
log(INFO, ("%s: usage of all-zero key detected (IV=%d, seq=%d). " +
|
|
|
- "Client is vulnerable to (re)installation of an all-zero key in the 4-way handshake!") % (self.mac, iv, seq), color="green")
|
|
|
- log(WARNING, "%s: !!! Other tests are unreliable due to all-zero key usage, please fix this first !!!" % self.mac)
|
|
|
+ "Client (re)installs an all-zero key in the 4-way handshake (this is very bad)") % (self.mac, iv, seq), color="green")
|
|
|
+ log(WARNING, "%s: !!! Other tests are unreliable due to all-zero key usage, please fix this vulnerability first !!!" % self.mac)
|
|
|
self.vuln_4way = ClientState.VULNERABLE
|
|
|
|
|
|
- def groupkey_handle_canary(self, p):
|
|
|
+
|
|
|
+ def broadcast_print_patched(self):
|
|
|
+ if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey]:
|
|
|
+ hstype = "group key" if self.options.variant == TestOptions.Groupkey else "4-way"
|
|
|
+ if self.options.gtkinit:
|
|
|
+ log(INFO, "%s: Client installs the group key in the %s handshake with the given replay counter (this is good)" % (self.mac, hstype), color="green")
|
|
|
+ else:
|
|
|
+ log(INFO, "%s: Client DOESN'T reinstall the group key in the %s handshake (this is good)" % (self.mac, hstype), color="green")
|
|
|
+ if self.options.variant == TestOptions.ReplayBroadcast:
|
|
|
+ log(INFO, "%s: Client DOESN'T accept replayed broadcast frames (this is good)" % self.mac, color="green")
|
|
|
+
|
|
|
+ def broadcast_print_vulnerable(self):
|
|
|
+ if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey]:
|
|
|
+ hstype = "group key" if self.options.variant == TestOptions.Groupkey else "4-way"
|
|
|
+ if self.options.gtkinit:
|
|
|
+ log(INFO, "%s: Client installs the group key in the %s handshake with a zero replay counter (this is bad)" % (self.mac, hstype), color="green")
|
|
|
+ else:
|
|
|
+ log(INFO, "%s: Client reinstalls the group key in the %s handshake (this is bad)" % (self.mac, hstype), color="green")
|
|
|
+ log(INFO, " Or client accepts replayed broadcast frames (%d replies to replayed broadcast ARPs)" % \
|
|
|
+ self.groupkey_num_canaries, color="green")
|
|
|
+ if self.options.variant == TestOptions.ReplayBroadcast:
|
|
|
+ log(INFO, "%s: Client accepts replayed broadcast frames (this is bad)" % self.mac, color="green")
|
|
|
+ log(INFO, " Fix this before testing for group key (re)installations!", color="green")
|
|
|
+
|
|
|
+ def broadcast_process_reply(self, p):
|
|
|
"""Handle replies to the replayed ARP broadcast request (which reuses an IV)"""
|
|
|
|
|
|
# Must be testing this client, and must not be a benign retransmission
|
|
@@ -166,56 +200,41 @@ class ClientState():
|
|
|
# the first few broadcast ARP requests still use a valid (not yet used) IV.
|
|
|
if self.groupkey_num_canaries >= 5:
|
|
|
assert self.vuln_group != ClientState.VULNERABLE
|
|
|
- log(INFO, "%s: Received %d unique replies to replayed broadcast ARP requests. Client is vulnerable to group" \
|
|
|
- % (self.mac, self.groupkey_num_canaries), color="green")
|
|
|
- log(INFO, " key reinstallations in the %s handshake (or client accepts replayed broadcast frames)!" \
|
|
|
- % ("group key" if self.groupkey_grouphs else "4-way"), color="green")
|
|
|
self.vuln_group = ClientState.VULNERABLE
|
|
|
self.groupkey_state = ClientState.FINISHED
|
|
|
-
|
|
|
- # Remember that we got a reply this interval (see groupkey_track_request to detect patched clients)
|
|
|
+ self.broadcast_print_vulnerable()
|
|
|
+ # Remember that we got a reply this interval (see broadcast_check_replies to detect patched clients)
|
|
|
else:
|
|
|
self.groupkey_state = ClientState.GOT_CANARY
|
|
|
|
|
|
self.groupkey_prev_canary_time = p.time
|
|
|
|
|
|
- def groupkey_track_request(self):
|
|
|
+ def broadcast_check_replies(self):
|
|
|
"""Track when we send broadcast ARP requests, and determine if a client seems patched"""
|
|
|
-
|
|
|
- if self.vuln_group != ClientState.UNKNOWN: return
|
|
|
- hstype = "group key" if self.groupkey_grouphs else "4-way"
|
|
|
-
|
|
|
- # Show a message when we started with testing the client
|
|
|
if self.groupkey_state == ClientState.IDLE:
|
|
|
- log(STATUS, "%s: client has IP address -> testing for group key reinstallation in the %s handshake" % (self.mac, hstype))
|
|
|
- self.groupkey_state = ClientState.STARTED
|
|
|
+ return
|
|
|
|
|
|
if self.groupkey_requests_sent == 4:
|
|
|
# We sent four broadcast ARP requests, and got at least one got a reply. This indicates the client is vulnerable.
|
|
|
if self.groupkey_state == ClientState.GOT_CANARY:
|
|
|
- log(DEBUG, "%s: got a reply to broadcast ARP during this interval" % self.mac)
|
|
|
+ log(DEBUG, "%s: got a reply to broadcast ARPs during this interval" % self.mac)
|
|
|
self.groupkey_state = ClientState.STARTED
|
|
|
|
|
|
# We sent four broadcast ARP requests, and didn't get a reply to any. This indicates the client is patched.
|
|
|
elif self.groupkey_state == ClientState.STARTED:
|
|
|
self.groupkey_patched_intervals += 1
|
|
|
- log(DEBUG, "%s: no group IV resets seem to have occured for %d interval(s)" % (self.mac, self.groupkey_patched_intervals))
|
|
|
+ log(DEBUG, "%s: didn't get reply received to broadcast ARPs during this interval" % self.mac)
|
|
|
self.groupkey_state = ClientState.STARTED
|
|
|
|
|
|
self.groupkey_requests_sent = 0
|
|
|
|
|
|
# If the client appears secure for several intervals (see above), it's likely patched
|
|
|
if self.groupkey_patched_intervals >= 5 and self.vuln_group == ClientState.UNKNOWN:
|
|
|
- log(INFO, "%s: client DOESN'T seem vulnerable to group key reinstallation in the %s handshake." % (self.mac, hstype), color="green")
|
|
|
self.vuln_group = ClientState.PATCHED
|
|
|
self.groupkey_state = ClientState.FINISHED
|
|
|
-
|
|
|
- self.groupkey_requests_sent += 1
|
|
|
- log(DEBUG, "%s: sent %d broadcasts ARPs this interval" % (self.mac, self.groupkey_requests_sent))
|
|
|
+ self.broadcast_print_patched()
|
|
|
|
|
|
class KRAckAttackClient():
|
|
|
- TPTK_NONE, TPTK_REPLAY, TPTK_RAND = range(3)
|
|
|
-
|
|
|
def __init__(self):
|
|
|
# Parse hostapd.conf
|
|
|
self.script_path = os.path.dirname(os.path.realpath(__file__))
|
|
@@ -231,8 +250,7 @@ class KRAckAttackClient():
|
|
|
# Set other variables
|
|
|
self.nic_iface = interface
|
|
|
self.nic_mon = interface + "mon"
|
|
|
- self.test_grouphs = False
|
|
|
- self.test_tptk = KRAckAttackClient.TPTK_NONE
|
|
|
+ self.options = None
|
|
|
try:
|
|
|
self.apmac = scapy.arch.get_if_hwaddr(interface)
|
|
|
except:
|
|
@@ -308,7 +326,7 @@ class KRAckAttackClient():
|
|
|
# Inspect encrypt frames for IV reuse & handle replayed frames rejected by the kernel
|
|
|
elif p.addr1 == self.apmac and Dot11WEP in p:
|
|
|
if not clientmac in self.clients:
|
|
|
- self.clients[clientmac] = ClientState(clientmac, test_group_hs=self.test_grouphs, test_tptk=self.test_tptk)
|
|
|
+ self.clients[clientmac] = ClientState(clientmac, options=options)
|
|
|
client = self.clients[clientmac]
|
|
|
|
|
|
iv = dot11_get_iv(p)
|
|
@@ -316,7 +334,7 @@ class KRAckAttackClient():
|
|
|
|
|
|
if decrypt_ccmp(p, "\x00" * 16).startswith("\xAA\xAA\x03\x00\x00\x00"):
|
|
|
client.mark_allzero_key(p)
|
|
|
- if not self.test_grouphs:
|
|
|
+ if self.options.variant == TestOptions.Fourway:
|
|
|
client.check_pairwise_reinstall(p)
|
|
|
if client.is_iv_reused(p):
|
|
|
self.handle_replay(p)
|
|
@@ -331,13 +349,35 @@ class KRAckAttackClient():
|
|
|
client = self.clients[clientmac]
|
|
|
|
|
|
if ARP in p and p[ARP].pdst == self.group_ip:
|
|
|
- client.groupkey_handle_canary(p)
|
|
|
+ client.broadcast_process_reply(p)
|
|
|
|
|
|
def handle_eth_rx(self):
|
|
|
p = self.sock_eth.recv()
|
|
|
if p == None or not Ether in p: return
|
|
|
self.process_eth_rx(p)
|
|
|
|
|
|
+ def broadcast_send_request(self, client):
|
|
|
+ clientip = self.dhcp.leases[client.mac]
|
|
|
+
|
|
|
+ # Print a message when we start testing the client --- XXX this should be in the client?
|
|
|
+ if client.groupkey_state == ClientState.IDLE:
|
|
|
+ hstype = "group key" if self.options.variant == TestOptions.Groupkey else "4-way"
|
|
|
+ log(STATUS, "%s: client has IP address -> testing for group key reinstallation in the %s handshake" % (client.mac, hstype))
|
|
|
+ client.groupkey_state = ClientState.STARTED
|
|
|
+
|
|
|
+ # Send a new handshake message when testing the group key handshake
|
|
|
+ if self.options.variant == TestOptions.Groupkey:
|
|
|
+ cmd = "RESEND_GROUP_M1 " + client.mac
|
|
|
+ cmd += "maxrsc" if self.options.gtkinit else ""
|
|
|
+ hostapd_command(self.hostapd_ctrl, cmd)
|
|
|
+
|
|
|
+ # Send a replayed broadcast ARP request to the client
|
|
|
+ request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(op=1, hwsrc=self.apmac, psrc=self.group_ip, pdst=clientip)
|
|
|
+ self.sock_eth.send(request)
|
|
|
+ client.groupkey_requests_sent += 1
|
|
|
+ log(INFO, "%s: sending broadcast ARP to %s from %s (sent %d ARPs this interval)" % (client.mac,
|
|
|
+ clientip, self.group_ip, client.groupkey_requests_sent))
|
|
|
+
|
|
|
def configure_interfaces(self):
|
|
|
log(STATUS, "Note: disable Wi-Fi in network manager & disable hardware encryption. Both may interfere with this script.")
|
|
|
|
|
@@ -356,9 +396,9 @@ class KRAckAttackClient():
|
|
|
subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"])
|
|
|
subprocess.check_output(["ifconfig", self.nic_mon, "up"])
|
|
|
|
|
|
- def run(self, test_grouphs=False, test_tptk=False):
|
|
|
+ def run(self, options):
|
|
|
+ self.options = options
|
|
|
self.configure_interfaces()
|
|
|
- self.test_tptk = test_tptk
|
|
|
|
|
|
# Open the patched hostapd instance that carries out tests and let it start
|
|
|
log(STATUS, "Starting hostapd ...")
|
|
@@ -415,29 +455,25 @@ class KRAckAttackClient():
|
|
|
for client in self.clients.values():
|
|
|
# 1. Test the 4-way handshake. We rely on an encrypted message 4 as reply to detect pairwise
|
|
|
# key reinstallations reinstallations.
|
|
|
- if not self.test_grouphs and client.vuln_4way != ClientState.VULNERABLE:
|
|
|
+ if self.options.variant == TestOptions.Fourway and client.vuln_4way != ClientState.VULNERABLE:
|
|
|
# First inject a message 1 if requested using the TPTK option
|
|
|
- if self.test_tptk == KRAckAttackClient.TPTK_REPLAY:
|
|
|
+ if self.options.tptk == TestOptions.TptkReplay:
|
|
|
hostapd_command(self.hostapd_ctrl, "RESEND_M1 " + client.mac)
|
|
|
- elif self.test_tptk == KRAckAttackClient.TPTK_RAND:
|
|
|
+ elif self.options.tptk == TestOptions.TptkRand:
|
|
|
hostapd_command(self.hostapd_ctrl, "RESEND_M1 " + client.mac + " change-anonce")
|
|
|
|
|
|
- hostapd_command(self.hostapd_ctrl, "RESEND_M3 " + client.mac)
|
|
|
+ hostapd_command(self.hostapd_ctrl, "RESEND_M3 " + client.mac + ("maxrsc" if self.options.gtkinit else ""))
|
|
|
|
|
|
- # 2. Test reinstallation of the group key (in both the 4-way and group key handshake). Keep
|
|
|
- # injecting to PATCHED clients (just to be sure they keep rejecting replayed frames)
|
|
|
- if client.vuln_group != ClientState.VULNERABLE and client.mac in self.dhcp.leases:
|
|
|
- if self.test_grouphs:
|
|
|
- hostapd_command(self.hostapd_ctrl, "RESEND_GROUP_M1 " + client.mac)
|
|
|
+ # 2. Test if broadcast ARP request are accepted by the client. Keep injecting even
|
|
|
+ # to PATCHED clients (just to be sure they keep rejecting replayed frames).
|
|
|
+ if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey, TestOptions.ReplayBroadcast]:
|
|
|
+ # 2a. Check if we got replies to previous requests (and determine if vulnerable)
|
|
|
+ client.broadcast_check_replies()
|
|
|
|
|
|
- clientip = self.dhcp.leases[client.mac]
|
|
|
- client.groupkey_track_request()
|
|
|
+ # 2b. Send new broadcast ARP requests (and handshake messages if needed)
|
|
|
+ if client.vuln_group != ClientState.VULNERABLE and client.mac in self.dhcp.leases:
|
|
|
+ self.broadcast_send_request(client)
|
|
|
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- log(INFO, "%s: sending broadcast ARP to %s from %s" % (client.mac, clientip, self.group_ip))
|
|
|
- request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(op=1, hwsrc=self.apmac, psrc=self.group_ip, pdst=clientip)
|
|
|
- self.sock_eth.send(request)
|
|
|
|
|
|
def stop(self):
|
|
|
log(STATUS, "Closing hostapd and cleaning up ...")
|
|
@@ -491,23 +527,48 @@ def hostapd_read_config(config):
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
if "--help" in sys.argv or "-h" in sys.argv:
|
|
|
- print "See README.md for instructions on how to use this script"
|
|
|
+ print "\nSee README.md for usage instructions. Accepted parameters are"
|
|
|
+ print "\n\t" + "\n\t".join(["--replay-broadcast", "--group", "--tptk", "--tptk-rand", "--gtkinit", "--debug"]) + "\n"
|
|
|
quit(1)
|
|
|
|
|
|
- test_grouphs = argv_pop_argument("--group")
|
|
|
- test_tptk_replay = argv_pop_argument("--tptk")
|
|
|
- test_tptk_rand = argv_pop_argument("--tptk-rand")
|
|
|
- while argv_pop_argument("--debug"):
|
|
|
- global_log_level -= 1
|
|
|
+ options = TestOptions()
|
|
|
|
|
|
- test_tptk = KRAckAttackClient.TPTK_NONE
|
|
|
- if test_tptk_replay and test_tptk_rand:
|
|
|
- log(ERROR, "Please only specify --tptk or --tptk-rand")
|
|
|
- elif test_tptk_replay:
|
|
|
- test_tptk = KRAckAttackClient.TPTK_REPLAY
|
|
|
- elif test_tptk_rand:
|
|
|
- test_tptk = KRAckAttackClient.TPTK_RAND
|
|
|
+ # Parse the type of test variant to execute
|
|
|
+ replay_broadcast = argv_pop_argument("--replay-broadcast")
|
|
|
+ replay_unicast = argv_pop_argument("--replay-unicast")
|
|
|
+ groupkey = argv_pop_argument("--group")
|
|
|
+ fourway = argv_pop_argument("--fourway")
|
|
|
+ if replay_broadcast + replay_unicast + fourway + groupkey > 1:
|
|
|
+ print "You can only select one argument of out replay-broadcast, replay-unicast, fourway, and group"
|
|
|
+ quit(1)
|
|
|
+ if replay_broadcast:
|
|
|
+ options.variant = TestOptions.ReplayBroadcast
|
|
|
+ elif replay_unicast:
|
|
|
+ options.variant = TestOptions.ReplayUnicast
|
|
|
+ elif groupkey:
|
|
|
+ options.variant = TestOptions.Groupkey
|
|
|
+ else:
|
|
|
+ options.variant = TestOptions.Fourway
|
|
|
+
|
|
|
+ # Parse options for the 4-way handshake
|
|
|
+ tptk = argv_pop_argument("--tptk")
|
|
|
+ tptk_rand = argv_pop_argument("--tptk-rand")
|
|
|
+ if tptk + tptk_rand > 1:
|
|
|
+ print "You can only select one argument of out tptk and tptk-rand"
|
|
|
+ quit(1)
|
|
|
+ if tptk:
|
|
|
+ options.tptk = TestOptions.TptkReplay
|
|
|
+ elif tptk_rand:
|
|
|
+ options.tptk = TestOptions.TptkRand
|
|
|
+ else:
|
|
|
+ options.tptk = TestOptions.TptkNone
|
|
|
+
|
|
|
+ # Parse remaining options
|
|
|
+ options.gtkinit = argv_pop_argument("--gtkinit")
|
|
|
+ while argv_pop_argument("--debug"):
|
|
|
+ libwifi.global_log_level -= 1
|
|
|
|
|
|
+ # Now start the tests
|
|
|
attack = KRAckAttackClient()
|
|
|
atexit.register(cleanup)
|
|
|
- attack.run(test_grouphs=test_grouphs, test_tptk=test_tptk)
|
|
|
+ attack.run(options=options)
|