libwifi.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. # Copyright (c) 2017, Mathy Vanhoef <Mathy.Vanhoef@cs.kuleuven.be>
  2. #
  3. # This code may be distributed under the terms of the BSD license.
  4. # See README for more details.
  5. from scapy.all import *
  6. from Cryptodome.Cipher import AES
  7. #### Basic output and logging functionality ####
  8. ALL, DEBUG, INFO, STATUS, WARNING, ERROR = range(6)
  9. COLORCODES = { "gray" : "\033[0;37m",
  10. "green" : "\033[0;32m",
  11. "orange": "\033[0;33m",
  12. "red" : "\033[0;31m" }
  13. global_log_level = INFO
  14. def log(level, msg, color=None, showtime=True):
  15. if level < global_log_level: return
  16. if level == DEBUG and color is None: color="gray"
  17. if level == WARNING and color is None: color="orange"
  18. if level == ERROR and color is None: color="red"
  19. print (datetime.now().strftime('[%H:%M:%S] ') if showtime else " "*11) + COLORCODES.get(color, "") + msg + "\033[1;0m"
  20. #### Packet Processing Functions ####
  21. class DHCP_sock(DHCP_am):
  22. def __init__(self, **kwargs):
  23. self.sock = kwargs.pop("sock")
  24. self.server_ip = kwargs["gw"]
  25. super(DHCP_sock, self).__init__(**kwargs)
  26. def make_reply(self, req):
  27. rep = super(DHCP_sock, self).make_reply(req)
  28. # Fix scapy bug: set broadcast IP if required
  29. if rep is not None and BOOTP in req and IP in rep:
  30. if req[BOOTP].flags & 0x8000 != 0 and req[BOOTP].giaddr == '0.0.0.0' and req[BOOTP].ciaddr == '0.0.0.0':
  31. rep[IP].dst = "255.255.255.255"
  32. # Explicitly set source IP if requested
  33. if not self.server_ip is None:
  34. rep[IP].src = self.server_ip
  35. return rep
  36. def send_reply(self, reply):
  37. self.sock.send(reply, **self.optsend)
  38. def print_reply(self, req, reply):
  39. log(STATUS, "%s: DHCP reply %s to %s" % (reply.getlayer(Ether).dst, reply.getlayer(BOOTP).yiaddr, reply.dst), color="green")
  40. def remove_client(self, clientmac):
  41. clientip = self.leases[clientmac]
  42. self.pool.append(clientip)
  43. del self.leases[clientmac]
  44. class ARP_sock(ARP_am):
  45. def __init__(self, **kwargs):
  46. self.sock = kwargs.pop("sock")
  47. super(ARP_am, self).__init__(**kwargs)
  48. def send_reply(self, reply):
  49. self.sock.send(reply, **self.optsend)
  50. def print_reply(self, req, reply):
  51. log(STATUS, "%s: ARP: %s ==> %s on %s" % (reply.getlayer(Ether).dst, req.summary(), reply.summary(), self.iff))
  52. #### Packet Processing Functions ####
  53. class MitmSocket(L2Socket):
  54. def __init__(self, **kwargs):
  55. super(MitmSocket, self).__init__(**kwargs)
  56. def send(self, p):
  57. # Hack: set the More Data flag so we can detect injected frames (and so clients stay awake longer)
  58. p[Dot11].FCfield |= 0x20
  59. L2Socket.send(self, RadioTap()/p)
  60. def _strip_fcs(self, p):
  61. # Scapy can't handle the optional Frame Check Sequence (FCS) field automatically
  62. if p[RadioTap].present & 2 != 0:
  63. rawframe = str(p[RadioTap])
  64. pos = 8
  65. while ord(rawframe[pos - 1]) & 0x80 != 0: pos += 4
  66. # If the TSFT field is present, it must be 8-bytes aligned
  67. if p[RadioTap].present & 1 != 0:
  68. pos += (8 - (pos % 8))
  69. pos += 8
  70. # Remove FCS if present
  71. if ord(rawframe[pos]) & 0x10 != 0:
  72. return Dot11(str(p[Dot11])[:-4])
  73. return p[Dot11]
  74. def recv(self, x=MTU):
  75. p = L2Socket.recv(self, x)
  76. if p == None or not Dot11 in p: return None
  77. # Hack: ignore frames that we just injected and are echoed back by the kernel
  78. if p[Dot11].FCfield & 0x20 != 0:
  79. return None
  80. # Strip the FCS if present, and drop the RadioTap header
  81. return self._strip_fcs(p)
  82. def close(self):
  83. super(MitmSocket, self).close()
  84. def dot11_get_seqnum(p):
  85. return p[Dot11].SC >> 4
  86. def dot11_get_iv(p):
  87. """Scapy can't handle Extended IVs, so do this properly ourselves (only works for CCMP)"""
  88. if Dot11WEP not in p:
  89. log(ERROR, "INTERNAL ERROR: Requested IV of plaintext frame")
  90. return 0
  91. wep = p[Dot11WEP]
  92. if wep.keyid & 32:
  93. # FIXME: Only CCMP is supported (TKIP uses a different IV structure)
  94. return ord(wep.iv[0]) + (ord(wep.iv[1]) << 8) + (struct.unpack(">I", wep.wepdata[:4])[0] << 16)
  95. else:
  96. return ord(wep.iv[0]) + (ord(wep.iv[1]) << 8) + (ord(wep.iv[2]) << 16)
  97. def get_tlv_value(p, type):
  98. if not Dot11Elt in p: return None
  99. el = p[Dot11Elt]
  100. while isinstance(el, Dot11Elt):
  101. if el.ID == type:
  102. return el.info
  103. el = el.payload
  104. return None
  105. def dot11_get_priority(p):
  106. if not Dot11QoS in p: return 0
  107. return ord(str(p[Dot11QoS])[0])
  108. #### Crypto functions and util ####
  109. def get_ccmp_payload(p):
  110. # Extract encrypted payload:
  111. # - Skip extended IV (4 bytes in total)
  112. # - Exclude first 4 bytes of the CCMP MIC (note that last 4 are saved in the WEP ICV field)
  113. return str(p.wepdata[4:-4])
  114. def decrypt_ccmp(p, key):
  115. payload = get_ccmp_payload(p)
  116. sendermac = p[Dot11].addr2
  117. priority = dot11_get_priority(p)
  118. iv = dot11_get_iv(p)
  119. pn = struct.pack(">I", iv >> 16) + struct.pack(">H", iv & 0xFFFF)
  120. nonce = chr(priority) + sendermac.replace(':','').decode("hex") + pn
  121. cipher = AES.new(key, AES.MODE_CCM, nonce, mac_len=8)
  122. plaintext = cipher.decrypt(payload)
  123. return plaintext
  124. class IvInfo():
  125. def __init__(self, p):
  126. self.iv = dot11_get_iv(p)
  127. self.seq = dot11_get_seqnum(p)
  128. self.time = p.time
  129. def is_reused(self, p):
  130. """Return true if frame p reuses an IV and if p is not a retransmitted frame"""
  131. iv = dot11_get_iv(p)
  132. seq = dot11_get_seqnum(p)
  133. return self.iv == iv and self.seq != seq and p.time >= self.time + 1
  134. class IvCollection():
  135. def __init__(self):
  136. self.ivs = dict() # maps IV values to IvInfo objects
  137. def reset(self):
  138. self.ivs = dict()
  139. def track_used_iv(self, p):
  140. iv = dot11_get_iv(p)
  141. self.ivs[iv] = IvInfo(p)
  142. def is_iv_reused(self, p):
  143. """Returns True if this is an *observed* IV reuse and not just a retransmission"""
  144. iv = dot11_get_iv(p)
  145. return iv in self.ivs and self.ivs[iv].is_reused(p)
  146. def is_new_iv(self, p):
  147. """Returns True if the IV in this frame is higher than all previously observed ones"""
  148. iv = dot11_get_iv(p)
  149. if len(self.ivs) == 0: return True
  150. return iv > max(self.ivs.keys())