Browse Source

krackattacks: rename variables and clean up output

Mathy Vanhoef 7 years ago
parent
commit
a04905e325
1 changed files with 63 additions and 70 deletions
  1. 63 70
      krackattack/krack-test-client.py

+ 63 - 70
krackattack/krack-test-client.py

@@ -14,17 +14,12 @@ from libwifi import *
 import sys, socket, struct, time, subprocess, atexit, select, os.path
 from wpaspy import Ctrl
 
-# FIXME:
+# Futute work:
 # - If the client installs an all-zero key, we cannot reliably test the group key handshake
-# - We should test decryption using an all-zero key, and warn if this seems to succeed
-
-# Future work:
-# - Detect if the client reinstalls an all-zero encryption key (wpa_supplicant v2.4 and 2.5)
-# - Ability to test the group key handshake against specific clients only
-# - Individual test to see if the client accepts replayed broadcast traffic (without performing key reinstallation)
+# - Automatically execute all relevant tests in order
+# - Force client to request a new IP address when connecting
 
 # After how many seconds a new message 3, or new group key message 1, is sent.
-# This value must match the one in `../src/ap/wpa_auth.c` (same variable name).
 HANDSHAKE_TRANSMIT_INTERVAL = 2
 
 #### Utility Commands ####
@@ -45,8 +40,7 @@ def hostapd_command(hostapd_ctrl, cmd):
 #### Main Testing Code ####
 
 class TestOptions():
-	### XXX should be grouphs instead of groupkey
-	ReplayBroadcast, ReplayUnicast, Fourway, Groupkey = range(4)
+	ReplayBroadcast, ReplayUnicast, Fourway, Grouphs = range(4)
 	TptkNone, TptkReplay, TptkRand = range(3)
 
 	def __init__(self, variant=Fourway):
@@ -55,7 +49,7 @@ class TestOptions():
 		# Additional options for Fourway tests
 		self.tptk = TestOptions.TptkNone
 
-		# Extra option for Fourway and GroupKey tests
+		# Extra option for Fourway and Grouphs tests
 		self.gtkinit = False
 
 class ClientState():
@@ -67,21 +61,20 @@ class ClientState():
 		self.options = options
 		self.TK = None
 		self.vuln_4way = ClientState.UNKNOWN
-		self.vuln_group = ClientState.UNKNOWN
-		# FIXME: Separate variable for group handshake result?
+		self.vuln_bcast = ClientState.UNKNOWN
 
 		self.ivs = IvCollection()
 		self.pairkey_sent_time_prev_iv = None
 		self.pairkey_intervals_no_iv_reuse = 0
 
-		self.groupkey_reset()
+		self.broadcast_reset()
 
-	def groupkey_reset(self):
-		self.groupkey_state = ClientState.IDLE
-		self.groupkey_prev_canary_time = 0
-		self.groupkey_num_canaries = 0
-		self.groupkey_requests_sent = 0
-		self.groupkey_patched_intervals = -1 # -1 because the first broadcast ARP requests are still valid
+	def broadcast_reset(self):
+		self.broadcast_state = ClientState.IDLE
+		self.broadcast_prev_canary_time = 0
+		self.broadcast_num_canaries = 0
+		self.broadcast_requests_sent = 0
+		self.broadcast_patched_intervals = -1 # -1 because the first broadcast ARP requests are still valid
 
 	# TODO: Put in libwifi?
 	def get_encryption_key(self, hostapd_ctrl):
@@ -161,14 +154,15 @@ class ClientState():
 			iv = dot11_get_iv(p)
 			seq = dot11_get_seqnum(p)
 			log(INFO, ("%s: usage of all-zero key detected (IV=%d, seq=%d). " +
-				"Client (re)installs an all-zero key in the 4-way handshake (this is very bad)") % (self.mac, iv, seq), color="green")
+				"Client (re)installs an all-zero key in the 4-way handshake (this is very bad).") % (self.mac, iv, seq), color="green")
 			log(WARNING, "%s: !!! Other tests are unreliable due to all-zero key usage, please fix this vulnerability first !!!" % self.mac)
 		self.vuln_4way = ClientState.VULNERABLE
 
 
 	def broadcast_print_patched(self):
-		if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey]:
-			hstype = "group key" if self.options.variant == TestOptions.Groupkey else "4-way"
+		if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs]:
+			# TODO: Mention which variant of the 4-way handshake test was used
+			hstype = "group key" if self.options.variant == TestOptions.Grouphs else "4-way"
 			if self.options.gtkinit:
 				log(INFO, "%s: Client installs the group key in the %s handshake with the given replay counter (this is good)" % (self.mac, hstype), color="green")
 			else:
@@ -177,64 +171,63 @@ class ClientState():
 			log(INFO, "%s: Client DOESN'T accept replayed broadcast frames (this is good)" % self.mac, color="green")
 
 	def broadcast_print_vulnerable(self):
-		if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey]:
-			hstype = "group key" if self.options.variant == TestOptions.Groupkey else "4-way"
+		if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs]:
+			hstype = "group key" if self.options.variant == TestOptions.Grouphs else "4-way"
 			if self.options.gtkinit:
-				log(INFO, "%s: Client installs the group key in the %s handshake with a zero replay counter (this is bad)" % (self.mac, hstype), color="green")
+				log(INFO, "%s: Client always installs the group key in the %s handshake with a zero replay counter (this is bad)." % (self.mac, hstype), color="green")
 			else:
-				log(INFO, "%s: Client reinstalls the group key in the %s handshake (this is bad)" % (self.mac, hstype), color="green")
-			log(INFO, "                    Or client accepts replayed broadcast frames (%d replies to replayed broadcast ARPs)" % \
-				self.groupkey_num_canaries, color="green")
+				log(INFO, "%s: Client reinstalls the group key in the %s handshake (this is bad)." % (self.mac, hstype), color="green")
+			log(INFO, "                   Or client accepts replayed broadcast frames (see --replay-broadcast).", color="green")
 		if self.options.variant == TestOptions.ReplayBroadcast:
-			log(INFO, "%s: Client accepts replayed broadcast frames (this is bad)" % self.mac, color="green")
-			log(INFO, "                    Fix this before testing for group key (re)installations!", color="green")
+			log(INFO, "%s: Client accepts replayed broadcast frames (this is bad)." % self.mac, color="green")
+			log(INFO, "                   Fix this before testing for group key (re)installations!", color="green")
 
 	def broadcast_process_reply(self, p):
 		"""Handle replies to the replayed ARP broadcast request (which reuses an IV)"""
 
 		# Must be testing this client, and must not be a benign retransmission
-		if not self.groupkey_state in [ClientState.STARTED, ClientState.GOT_CANARY]: return
-		if self.groupkey_prev_canary_time + 1 > p.time: return
+		if not self.broadcast_state in [ClientState.STARTED, ClientState.GOT_CANARY]: return
+		if self.broadcast_prev_canary_time + 1 > p.time: return
 
-		self.groupkey_num_canaries += 1
-		log(DEBUG, "%s: received %d replies to the replayed broadcast ARP requests" % (self.mac, self.groupkey_num_canaries))
+		self.broadcast_num_canaries += 1
+		log(DEBUG, "%s: received %d replies to the replayed broadcast ARP requests" % (self.mac, self.broadcast_num_canaries))
 
 		# We wait for several replies before marking the client as vulnerable, because
 		# the first few broadcast ARP requests still use a valid (not yet used) IV.
-		if self.groupkey_num_canaries >= 5:
-			assert self.vuln_group != ClientState.VULNERABLE
-			self.vuln_group = ClientState.VULNERABLE
-			self.groupkey_state = ClientState.FINISHED
+		if self.broadcast_num_canaries >= 5:
+			assert self.vuln_bcast != ClientState.VULNERABLE
+			self.vuln_bcast = ClientState.VULNERABLE
+			self.broadcast_state = ClientState.FINISHED
 			self.broadcast_print_vulnerable()
 		# Remember that we got a reply this interval (see broadcast_check_replies to detect patched clients)
 		else:
-			self.groupkey_state = ClientState.GOT_CANARY
+			self.broadcast_state = ClientState.GOT_CANARY
 
-		self.groupkey_prev_canary_time = p.time
+		self.broadcast_prev_canary_time = p.time
 
 	def broadcast_check_replies(self):
 		"""Track when we send broadcast ARP requests, and determine if a client seems patched"""
-		if self.groupkey_state == ClientState.IDLE:
+		if self.broadcast_state == ClientState.IDLE:
 			return
 
-		if self.groupkey_requests_sent == 4:
+		if self.broadcast_requests_sent == 4:
 			# We sent four broadcast ARP requests, and got at least one got a reply. This indicates the client is vulnerable.
-			if self.groupkey_state == ClientState.GOT_CANARY:
+			if self.broadcast_state == ClientState.GOT_CANARY:
 				log(DEBUG, "%s: got a reply to broadcast ARPs during this interval" % self.mac)
-				self.groupkey_state = ClientState.STARTED
+				self.broadcast_state = ClientState.STARTED
 
 			# We sent four broadcast ARP requests, and didn't get a reply to any. This indicates the client is patched.
-			elif self.groupkey_state == ClientState.STARTED:
-				self.groupkey_patched_intervals += 1
+			elif self.broadcast_state == ClientState.STARTED:
+				self.broadcast_patched_intervals += 1
 				log(DEBUG, "%s: didn't get reply received to broadcast ARPs during this interval" % self.mac)
-				self.groupkey_state = ClientState.STARTED
+				self.broadcast_state = ClientState.STARTED
 
-			self.groupkey_requests_sent = 0
+			self.broadcast_requests_sent = 0
 
 		# If the client appears secure for several intervals (see above), it's likely patched
-		if self.groupkey_patched_intervals >= 5 and self.vuln_group == ClientState.UNKNOWN:
-			self.vuln_group = ClientState.PATCHED
-			self.groupkey_state = ClientState.FINISHED
+		if self.broadcast_patched_intervals >= 5 and self.vuln_bcast == ClientState.UNKNOWN:
+			self.vuln_bcast = ClientState.PATCHED
+			self.broadcast_state = ClientState.FINISHED
 			self.broadcast_print_patched()
 
 class KRAckAttackClient():
@@ -266,8 +259,8 @@ class KRAckAttackClient():
 		self.hostapd_ctrl = None
 
 		self.dhcp = None
-		self.group_ip = None
-		self.group_arp = None
+		self.broadcast_sender_ip = None
+		self.broadcast_arp_sock = None
 
 		self.clients = dict()
 
@@ -345,13 +338,13 @@ class KRAckAttackClient():
 
 	def process_eth_rx(self, p):
 		self.dhcp.reply(p)
-		self.group_arp.reply(p)
+		self.broadcast_arp_sock.reply(p)
 
 		clientmac = p[Ether].src
 		if not clientmac in self.clients: return
 		client = self.clients[clientmac]
 
-		if ARP in p and p[ARP].pdst == self.group_ip:
+		if ARP in p and p[ARP].pdst == self.broadcast_sender_ip:
 			client.broadcast_process_reply(p)
 
 	def handle_eth_rx(self):
@@ -363,23 +356,23 @@ class KRAckAttackClient():
 		clientip = self.dhcp.leases[client.mac]
 
 		# Print a message when we start testing the client --- XXX this should be in the client?
-		if client.groupkey_state == ClientState.IDLE:
-			hstype = "group key" if self.options.variant == TestOptions.Groupkey else "4-way"
-			log(STATUS, "%s: client has IP address -> testing for group key reinstallation in the %s handshake" % (client.mac, hstype))
-			client.groupkey_state = ClientState.STARTED
+		if client.broadcast_state == ClientState.IDLE:
+			hstype = "group key" if self.options.variant == TestOptions.Grouphs else "4-way"
+			log(STATUS, "%s: client has IP address -> now sending replayed broadcast ARP packets" % client.mac)
+			client.broadcast_state = ClientState.STARTED
 
 		# Send a new handshake message when testing the group key handshake
-		if self.options.variant == TestOptions.Groupkey:
+		if self.options.variant == TestOptions.Grouphs:
 			cmd = "RESEND_GROUP_M1 " + client.mac
 			cmd += "maxrsc" if self.options.gtkinit else ""
 			hostapd_command(self.hostapd_ctrl, cmd)
 
 		# Send a replayed broadcast ARP request to the client
-		request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(op=1, hwsrc=self.apmac, psrc=self.group_ip, pdst=clientip)
+		request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(op=1, hwsrc=self.apmac, psrc=self.broadcast_sender_ip, pdst=clientip)
 		self.sock_eth.send(request)
-		client.groupkey_requests_sent += 1
+		client.broadcast_requests_sent += 1
 		log(INFO, "%s: sending broadcast ARP to %s from %s (sent %d ARPs this interval)" % (client.mac,
-			clientip, self.group_ip, client.groupkey_requests_sent))
+			clientip, self.broadcast_sender_ip, client.broadcast_requests_sent))
 
 	def configure_interfaces(self):
 		log(STATUS, "Note: disable Wi-Fi in network manager & disable hardware encryption. Both may interfere with this script.")
@@ -438,8 +431,8 @@ class KRAckAttackClient():
 		subprocess.check_output(["ifconfig", self.nic_iface, "192.168.100.254"])
 
 		# Use a dedicated IP address for our broadcast ARP requests and replies
-		self.group_ip = self.dhcp.pool.pop()
-		self.group_arp = ARP_sock(sock=self.sock_eth, IP_addr=self.group_ip, ARP_addr=self.apmac)
+		self.broadcast_sender_ip = self.dhcp.pool.pop()
+		self.broadcast_arp_sock = ARP_sock(sock=self.sock_eth, IP_addr=self.broadcast_sender_ip, ARP_addr=self.apmac)
 
 		log(STATUS, "Ready. Connect to this Access Point to start the tests. Make sure the client requests an IP using DHCP!", color="green")
 
@@ -455,7 +448,7 @@ class KRAckAttackClient():
 				# When testing if the replay counter of the group key is properly installed, always install
 				# a new group key. Otherwise KRACK patches might interfere with this test.
 				# Otherwise just reset the replay counter of the current group key.
-				if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey] and self.options.gtkinit:
+				if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs] and self.options.gtkinit:
 					hostapd_command(self.hostapd_ctrl, "RENEW_GTK")
 				else:
 					hostapd_command(self.hostapd_ctrl, "RESET_PN FF:FF:FF:FF:FF:FF")
@@ -475,12 +468,12 @@ class KRAckAttackClient():
 
 					# 2. Test if broadcast ARP request are accepted by the client. Keep injecting even
 					#    to PATCHED clients (just to be sure they keep rejecting replayed frames).
-					if self.options.variant in [TestOptions.Fourway, TestOptions.Groupkey, TestOptions.ReplayBroadcast]:
+					if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs, TestOptions.ReplayBroadcast]:
 						# 2a. Check if we got replies to previous requests (and determine if vulnerable)
 						client.broadcast_check_replies()
 
 						# 2b. Send new broadcast ARP requests (and handshake messages if needed)
-						if client.vuln_group != ClientState.VULNERABLE and client.mac in self.dhcp.leases:
+						if client.vuln_bcast != ClientState.VULNERABLE and client.mac in self.dhcp.leases:
 							self.broadcast_send_request(client)
 
 
@@ -555,7 +548,7 @@ if __name__ == "__main__":
 	elif replay_unicast:
 		options.variant = TestOptions.ReplayUnicast
 	elif groupkey:
-		options.variant = TestOptions.Groupkey
+		options.variant = TestOptions.Grouphs
 	else:
 		options.variant = TestOptions.Fourway