libwifi.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. # Copyright (c) 2017, Mathy Vanhoef <Mathy.Vanhoef@cs.kuleuven.be>
  2. #
  3. # This code may be distributed under the terms of the BSD license.
  4. # See README for more details.
  5. from scapy.all import *
  6. from Cryptodome.Cipher import AES
  7. from datetime import datetime
  8. #### Basic output and logging functionality ####
  9. ALL, DEBUG, INFO, STATUS, WARNING, ERROR = range(6)
  10. COLORCODES = { "gray" : "\033[0;37m",
  11. "green" : "\033[0;32m",
  12. "orange": "\033[0;33m",
  13. "red" : "\033[0;31m" }
  14. global_log_level = INFO
  15. def log(level, msg, color=None, showtime=True):
  16. if level < global_log_level: return
  17. if level == DEBUG and color is None: color="gray"
  18. if level == WARNING and color is None: color="orange"
  19. if level == ERROR and color is None: color="red"
  20. print (datetime.now().strftime('[%H:%M:%S] ') if showtime else " "*11) + COLORCODES.get(color, "") + msg + "\033[1;0m"
#### Backwards compatibility with older scapy ####
  22. if not "Dot11FCS" in locals():
  23. class Dot11FCS():
  24. pass
  25. if not "Dot11Encrypted" in locals():
  26. class Dot11Encrypted():
  27. pass
  28. class Dot11CCMP():
  29. pass
  30. class Dot11TKIP():
  31. pass
  32. #### Packet Processing Functions ####
  33. class DHCP_sock(DHCP_am):
  34. def __init__(self, **kwargs):
  35. self.sock = kwargs.pop("sock")
  36. self.server_ip = kwargs["gw"]
  37. super(DHCP_sock, self).__init__(**kwargs)
  38. def make_reply(self, req):
  39. rep = super(DHCP_sock, self).make_reply(req)
  40. # Fix scapy bug: set broadcast IP if required
  41. if rep is not None and BOOTP in req and IP in rep:
  42. if req[BOOTP].flags & 0x8000 != 0 and req[BOOTP].giaddr == '0.0.0.0' and req[BOOTP].ciaddr == '0.0.0.0':
  43. rep[IP].dst = "255.255.255.255"
  44. # Explicitly set source IP if requested
  45. if not self.server_ip is None:
  46. rep[IP].src = self.server_ip
  47. return rep
  48. def send_reply(self, reply):
  49. self.sock.send(reply, **self.optsend)
  50. def print_reply(self, req, reply):
  51. log(STATUS, "%s: DHCP reply %s to %s" % (reply.getlayer(Ether).dst, reply.getlayer(BOOTP).yiaddr, reply.dst), color="green")
  52. def remove_client(self, clientmac):
  53. clientip = self.leases[clientmac]
  54. self.pool.append(clientip)
  55. del self.leases[clientmac]
  56. class ARP_sock(ARP_am):
  57. def __init__(self, **kwargs):
  58. self.sock = kwargs.pop("sock")
  59. super(ARP_am, self).__init__(**kwargs)
  60. def send_reply(self, reply):
  61. self.sock.send(reply, **self.optsend)
  62. def print_reply(self, req, reply):
  63. log(STATUS, "%s: ARP: %s ==> %s on %s" % (reply.getlayer(Ether).dst, req.summary(), reply.summary(), self.iff))
  64. #### Packet Processing Functions ####
  65. class MitmSocket(L2Socket):
  66. def __init__(self, **kwargs):
  67. super(MitmSocket, self).__init__(**kwargs)
  68. def send(self, p):
  69. # Hack: set the More Data flag so we can detect injected frames (and so clients stay awake longer)
  70. p.FCfield |= 0x20
  71. L2Socket.send(self, RadioTap()/p)
  72. def _strip_fcs(self, p):
  73. # Older scapy can't handle the optional Frame Check Sequence (FCS) field automatically
  74. if p[RadioTap].present & 2 != 0 and not Dot11FCS in p:
  75. rawframe = str(p[RadioTap])
  76. pos = 8
  77. while ord(rawframe[pos - 1]) & 0x80 != 0: pos += 4
  78. # If the TSFT field is present, it must be 8-bytes aligned
  79. if p[RadioTap].present & 1 != 0:
  80. pos += (8 - (pos % 8))
  81. pos += 8
  82. # Remove FCS if present
  83. if ord(rawframe[pos]) & 0x10 != 0:
  84. return Dot11(str(p[Dot11])[:-4])
  85. return p[Dot11]
  86. def recv(self, x=MTU):
  87. p = L2Socket.recv(self, x)
  88. if p == None or not (Dot11 in p or Dot11FCS in p):
  89. return None
  90. # Hack: ignore frames that we just injected and are echoed back by the kernel
  91. if p.FCfield & 0x20 != 0:
  92. return None
  93. # Strip the FCS if present, and drop the RadioTap header
  94. if Dot11FCS in p:
  95. return p
  96. else:
  97. return self._strip_fcs(p)
  98. def close(self):
  99. super(MitmSocket, self).close()
  100. def dot11_get_seqnum(p):
  101. return p.SC >> 4
  102. def dot11_is_encrypted_data(p):
  103. # All these different cases are explicitly tested to handle older scapy versions
  104. return (p.FCfield & 0x40) or Dot11CCMP in p or Dot11TKIP in p or Dot11WEP in p or Dot11Encrypted in p
  105. def payload_to_iv(payload):
  106. iv0 = payload[0]
  107. iv1 = payload[1]
  108. wepdata = payload[4:8]
  109. # FIXME: Only CCMP is supported (TKIP uses a different IV structure)
  110. return ord(iv0) + (ord(iv1) << 8) + (struct.unpack(">I", wepdata)[0] << 16)
  111. def dot11_get_iv(p):
  112. """
  113. Assume it's a CCMP frame. Old scapy can't handle Extended IVs.
  114. This code only works for CCMP frames.
  115. """
  116. if Dot11CCMP in p or Dot11TKIP in p or Dot11Encrypted in p:
  117. # Scapy uses a heuristic to differentiate CCMP/TKIP and this may be wrong.
  118. # So even when we get a Dot11TKIP frame, we should treat it like a Dot11CCMP frame.
  119. payload = str(p[Dot11Encrypted])
  120. return payload_to_iv(payload)
  121. elif Dot11WEP in p:
  122. wep = p[Dot11WEP]
  123. if wep.keyid & 32:
  124. # FIXME: Only CCMP is supported (TKIP uses a different IV structure)
  125. return ord(wep.iv[0]) + (ord(wep.iv[1]) << 8) + (struct.unpack(">I", wep.wepdata[:4])[0] << 16)
  126. else:
  127. return ord(wep.iv[0]) + (ord(wep.iv[1]) << 8) + (ord(wep.iv[2]) << 16)
  128. elif p.FCfield & 0x40:
  129. return payload_to_iv(p[Raw].load)
  130. else:
  131. log(ERROR, "INTERNAL ERROR: Requested IV of plaintext frame")
  132. return 0
  133. def get_tlv_value(p, type):
  134. if not Dot11Elt in p: return None
  135. el = p[Dot11Elt]
  136. while isinstance(el, Dot11Elt):
  137. if el.ID == type:
  138. return el.info
  139. el = el.payload
  140. return None
  141. def dot11_get_priority(p):
  142. if not Dot11QoS in p: return 0
  143. return ord(str(p[Dot11QoS])[0])
  144. #### Crypto functions and util ####
  145. def get_ccmp_payload(p):
  146. if Dot11WEP in p:
  147. # Extract encrypted payload:
  148. # - Skip extended IV (4 bytes in total)
  149. # - Exclude first 4 bytes of the CCMP MIC (note that last 4 are saved in the WEP ICV field)
  150. return str(p.wepdata[4:-4])
  151. elif Dot11CCMP in p or Dot11TKIP in p or Dot11Encrypted in p:
  152. return p[Dot11Encrypted].data
  153. else:
  154. return p[Raw].load
  155. def decrypt_ccmp(p, key):
  156. payload = get_ccmp_payload(p)
  157. sendermac = p.addr2
  158. priority = dot11_get_priority(p)
  159. iv = dot11_get_iv(p)
  160. pn = struct.pack(">I", iv >> 16) + struct.pack(">H", iv & 0xFFFF)
  161. nonce = chr(priority) + sendermac.replace(':','').decode("hex") + pn
  162. cipher = AES.new(key, AES.MODE_CCM, nonce, mac_len=8)
  163. plaintext = cipher.decrypt(payload)
  164. return plaintext
  165. class IvInfo():
  166. def __init__(self, p):
  167. self.iv = dot11_get_iv(p)
  168. self.seq = dot11_get_seqnum(p)
  169. self.time = p.time
  170. def is_reused(self, p):
  171. """Return true if frame p reuses an IV and if p is not a retransmitted frame"""
  172. iv = dot11_get_iv(p)
  173. seq = dot11_get_seqnum(p)
  174. return self.iv == iv and self.seq != seq and p.time >= self.time + 1
  175. class IvCollection():
  176. def __init__(self):
  177. self.ivs = dict() # maps IV values to IvInfo objects
  178. def reset(self):
  179. self.ivs = dict()
  180. def track_used_iv(self, p):
  181. iv = dot11_get_iv(p)
  182. self.ivs[iv] = IvInfo(p)
  183. def is_iv_reused(self, p):
  184. """Returns True if this is an *observed* IV reuse and not just a retransmission"""
  185. iv = dot11_get_iv(p)
  186. return iv in self.ivs and self.ivs[iv].is_reused(p)
  187. def is_new_iv(self, p):
  188. """Returns True if the IV in this frame is higher than all previously observed ones"""
  189. iv = dot11_get_iv(p)
  190. if len(self.ivs) == 0: return True
  191. return iv > max(self.ivs.keys())