Browse Source

krackattack: code cleanup and better comments

Mathy 7 years ago
parent
commit
5764131e50
1 changed files with 43 additions and 26 deletions
  1. 43 26
      krackattack/krack-test-client.py

+ 43 - 26
krackattack/krack-test-client.py

@@ -7,8 +7,6 @@ from datetime import datetime
 from wpaspy import Ctrl
 from Cryptodome.Cipher import AES
 
-# TODO: Clean up code
-
 USAGE = """{name} - Tool to test Key Reinstallation Attacks against clients
 
 To test wheter a client is vulnerable to Key Reinstallation Attack against
@@ -108,7 +106,7 @@ the 4-way handshake or group key handshake, take the following steps:
 # Future work:
 # - Detect if the client reinstalls an all-zero encryption key (wpa_supplicant v2.4 and 2.5)
 # - Ability to test the group key handshake against specific clients only
-# - Individual test to see if the client accepts replayed broadcast traffic (without key reinstallation)
+# - Individual test to see if the client accepts replayed broadcast traffic (without performing key reinstallation)
 
 # After how many seconds a new message 3, or new group key message 1, is sent.
 HANDSHAKE_TRANSMIT_INTERVAL = 2
@@ -227,6 +225,7 @@ class IvInfo():
 		self.time = p.time
 
 	def is_reused(self, p):
+		"""Check if frame p reuses an IV and is not a retransmitted frame"""
 		iv = dot11_get_iv(p)
 		seq = dot11_get_seqnum(p)
 		return self.iv == iv and self.seq != seq and p.time >= self.time + 1
@@ -240,11 +239,11 @@ class ClientState():
 		self.TK = None
 		self.vuln_4way = ClientState.UNKNOWN
 		self.vuln_group = ClientState.UNKNOWN
-		# FIXME: Own variable for group handshake result?
+		# FIXME: Separate variable for group handshake result?
 
-		self.ivs = dict() # key is the IV value
-		self.encdata_prev = None
-		self.encdata_intervals = 0
+		self.ivs = dict() # maps IV values to IvInfo objects
+		self.pairkey_sent_time_prev_iv = None
+		self.pairkey_intervals_no_iv_reuse = 0
 
 		self.groupkey_reset()
 		self.groupkey_grouphs = test_group_hs
@@ -265,7 +264,7 @@ class ClientState():
 			# Clear old replies and messages from the hostapd control interface
 			while hostapd_ctrl.pending():
 				hostapd_ctrl.recv()
-			# Contact our modified Hostapd instrance to request the pairwise key
+			# Contact our modified Hostapd instance to request the pairwise key
 			response = hostapd_ctrl.request("GET_TK " + self.mac)
 			if not "FAIL" in response:
 				self.TK = response.strip().decode("hex")
@@ -300,7 +299,7 @@ class ClientState():
 		self.ivs[iv] = IvInfo(p)
 
 	def is_iv_reused(self, p):
-		"""Returns True if this is an *observed* IV reuse"""
+		"""Returns True if this is an *observed* IV reuse and not just a retransmission"""
 		iv = dot11_get_iv(p)
 		return iv in self.ivs and self.ivs[iv].is_reused(p)
 
@@ -311,7 +310,9 @@ class ClientState():
 		return iv > max(self.ivs.keys())
 
 	def check_pairwise_reinstall(self, p):
-		# If this is gaurenteed to be IV reuse, mark the client as vulnerable
+		"""Inspect whether the IV is reused, or whether the client seem to be patched"""
+
+		# If this is gaurenteed IV reuse (and not just a benign retransmission), mark the client as vulnerable
 		if self.is_iv_reused(p):
 			if self.vuln_4way != ClientState.VULNERABLE:
 				iv = dot11_get_iv(p)
@@ -324,26 +325,31 @@ class ClientState():
 		elif self.vuln_4way == ClientState.UNKNOWN and self.is_new_iv(p):
 			# Save how many intervals we received a data packet without IV reset. Use twice the
 			# transmission interval of message 3, in case one message 3 is lost due to noise.
-			if self.encdata_prev is None:
-				self.encdata_prev = p.time
-			elif self.encdata_prev + 2 * HANDSHAKE_TRANSMIT_INTERVAL + 1 <= p.time:
-				self.encdata_intervals += 1
-				self.encdata_prev = p.time
+			if self.pairkey_sent_time_prev_iv is None:
+				self.pairkey_sent_time_prev_iv = p.time
+			elif self.pairkey_sent_time_prev_iv + 2 * HANDSHAKE_TRANSMIT_INTERVAL + 1 <= p.time:
+				self.pairkey_intervals_no_iv_reuse += 1
+				self.pairkey_sent_time_prev_iv = p.time
 				log(DEBUG, "%s: no pairwise IV resets seem to have occured for one interval" % self.mac)
 
-			# If several reset attempts did not appear to reset the IV, the client is likely patched.
-			# Wait for enough reset attempts to occur and test, to avoid giving the wrong result.
-			if self.encdata_intervals >= 5 and self.vuln_4way == ClientState.UNKNOWN:
+			# If during several intervals all IV reset attempts failed, the client is likely patched.
+			# We wait for enough such intervals to occur, to avoid getting a wrong result.
+			if self.pairkey_intervals_no_iv_reuse >= 5 and self.vuln_4way == ClientState.UNKNOWN:
 				self.vuln_4way = ClientState.PATCHED
 				log(INFO, "%s: client DOESN'T seem vulnerable to pairwise key reinstallation in the 4-way handshake." % self.mac, color="green")
 
 	def groupkey_handle_canary(self, p):
+		"""Handle replies to the replayed ARP broadcast request (which reuses an IV)"""
+
+		# Must be testing this client, and must not be a benign retransmission
 		if not self.groupkey_state in [ClientState.STARTED, ClientState.GOT_CANARY]: return
 		if self.groupkey_prev_canary_time + 1 > p.time: return
 
 		self.groupkey_num_canaries += 1
-		log(DEBUG, "%s: received broadcast ARP replay number %d\n" % (self.mac, self.groupkey_num_canaries))
+		log(DEBUG, "%s: received %d replies to the replayed broadcast ARP requests\n" % (self.mac, self.groupkey_num_canaries))
 
+		# We wait for several replies before marking the client as vulnerable, because
+		# the first few broadcast ARP requests still use a valid (not yet used) IV.
 		if self.groupkey_num_canaries >= 5:
 			assert self.vuln_group != ClientState.VULNERABLE
 			log(INFO, "%s: Received %d unique replies to replayed broadcast ARP requests. Client is vulnerable to group" \
@@ -353,26 +359,30 @@ class ClientState():
 			self.vuln_group = ClientState.VULNERABLE
 			self.groupkey_state = ClientState.FINISHED
 
+		# Remember that we got a reply this interval (see groupkey_track_request to detect patched clients)
 		else:
 			self.groupkey_state = ClientState.GOT_CANARY
 
 		self.groupkey_prev_canary_time = p.time
 
 	def groupkey_track_request(self):
+		"""Track when we went broadcast ARP requests, and determine if a client seems patched"""
+
 		if self.vuln_group != ClientState.UNKNOWN: return
 		hstype = "group key" if self.groupkey_grouphs else "4-way"
 
+		# Show a message when we started with testing the client
 		if self.groupkey_state == ClientState.IDLE:
 			log(STATUS, "%s: client has IP address -> testing for group key reinstallation in the %s handshake" % (self.mac, hstype))
 			self.groupkey_state = ClientState.STARTED
 
 		if self.groupkey_requests_sent == 3:
-			# Seems like the client DID reinstall the group key in this interval
+			# We sent three broadcast ARP requests, and at least one got a reply. Indication that client is vulnerable.
 			if self.groupkey_state == ClientState.GOT_CANARY:
 				log(DEBUG, "%s: got a reply to broadcast ARP during this interval" % self.mac)
 				self.groupkey_state = ClientState.STARTED
 
-			# Seems like the client DIDN'T reinstall the group key in this interval
+			# We sent three broadcast ARP requests, and didn't get a reply to any. Indication that client is patched.
 			elif self.groupkey_state == ClientState.STARTED:
 				self.groupkey_patched_intervals += 1
 				log(DEBUG, "%s: no group IV resets seem to have occured for %d interval(s)" % (self.mac, self.groupkey_patched_intervals))
@@ -380,7 +390,7 @@ class ClientState():
 
 			self.groupkey_requests_sent = 0
 
-		# If the client appears secure for several intervals, it's likely patched
+		# If the client appears secure for several intervals (see above), it's likely patched
 		if self.groupkey_patched_intervals >= 5 and self.vuln_group == ClientState.UNKNOWN:
 			log(INFO, "%s: client DOESN'T seem vulnerable to group key reinstallation in the %s handshake." % (self.mac, hstype), color="green")
 			self.vuln_group = ClientState.PATCHED
@@ -420,6 +430,8 @@ class KRAckAttackClient():
 			log(DEBUG, "%s: Removing ClientState object" % clientmac)
 
 	def handle_replay(self, p):
+		"""Replayed frames (caused by a pairwise key reinstallation) are rejected by the kernel.
+		This process these frames manually so we can still test reinstallations of the group key."""
 		if not Dot11WEP in p: return
 
 		# Reconstruct Ethernet header
@@ -452,16 +464,19 @@ class KRAckAttackClient():
 		if p == None: return
 		if p.type == 1: return
 
-		# Note: here we cannot verify that the NIC is indeed reusing IVs when sending the
-		# broadcast ARP requests, because it may override them in the firmware/hardware.
+		# Note: we cannot verify that the NIC is indeed reusing IVs when sending the broadcast
+		# ARP requests, because it may override them in the firmware/hardware (some Atheros
+		# Wi-Fi NICs do no properly reset the Tx group key IV when using hardware encryption).
 
 		# The first bit in FCfield is set if the frames is "to-DS"
 		clientmac, apmac = (p.addr1, p.addr2) if (p.FCfield & 2) != 0 else (p.addr2, p.addr1)
 		if apmac != self.apmac: return None
 
+		# Reset info about disconnected clients
 		if Dot11Deauth in p or Dot11Disas in p:
 			self.reset_client_info(clientmac)
 
+		# Inspect encrypt frames for IV reuse & handle replayed frames rejected by the kernel
 		elif p.addr1 == self.apmac and Dot11WEP in p:
 			if not clientmac in self.clients:
 				self.clients[clientmac] = ClientState(clientmac, test_group_hs=self.test_grouphs)
@@ -531,23 +546,25 @@ class KRAckAttackClient():
 		# Configure gateway IP: reply to ARP and ping requests
 		subprocess.check_output(["ifconfig", self.nic_iface, "192.168.100.254"])
 
+		# Use a dedicated IP address for our broadcast ARP requests and replies
 		self.group_ip = self.dhcp.pool.pop()
 		self.group_arp = ARP_sock(sock=self.sock_eth, IP_addr=self.group_ip, ARP_addr=self.apmac)
 
-		# Inform hostapd that we are testing the group key, if applicalbe
+		# If applicable, inform hostapd that we are testing the group key handshake
 		if test_grouphs:
 			self.hostapd_ctrl.request("START_GROUP_TESTS")
 			self.test_grouphs = True
 
 		log(STATUS, "Ready. Connect to this Access Point to start the tests. Make sure the client requests an IP using DHCP!", color="green")
 
-		# Monitor the virtual monitor interface of the AP and perform the needed actions
+		# Monitor both the normal interface and virtual monitor interface of the AP
 		self.next_arp = time.time() + 1
 		while True:
 			sel = select.select([self.sock_mon, self.sock_eth], [], [], 1)
 			if self.sock_mon in sel[0]: self.handle_mon_rx()
 			if self.sock_eth in sel[0]: self.handle_eth_rx()
 
+			# Periodically send the replayed broadcast ARP requests to test for group key reinstallations
 			if time.time() > self.next_arp:
 				self.next_arp = time.time() + HANDSHAKE_TRANSMIT_INTERVAL
 				for client in self.clients.values():