Newer
Older
from hashlib import sha256
from scapy.layers.dhcp import BOOTP, DHCP
# Numeric base used when parsing hexadecimal MAC address octets
BASE_HEX = 16
# Byte order used for int <-> bytes conversions
BYTE_ORDER = "big"

# DHCP-related constants
DHCP_MAGIC_COOKIE = 0x63825363
DHCP_OPTION_CLIENT_ID = "client_id"
DHCP_CLIENT_ID_TYPE_ETH = 1

# Special, well-known MAC addresses.
# Each address is listed both as a string and as bytes, so that
# membership tests work for either representation.
special_macs = [
    "00:00:00:00:00:00",          # Default
    b"\x00\x00\x00\x00\x00\x00",  # Default, as bytes
    "ff:ff:ff:ff:ff:ff",          # Broadcast
    b"\xff\xff\xff\xff\xff\xff",  # Broadcast, as bytes
]
def mac_str_to_bytes(mac: str) -> bytes:
    """
    Convert a MAC address string representation
    to a bytes object.

    Octets may be separated by colons ("aa:bb:cc:dd:ee:ff")
    or by hyphens ("aa-bb-cc-dd-ee-ff").

    Args:
        mac (str): MAC address to convert
    Returns:
        bytes: MAC address as a bytes object
    Raises:
        ValueError: if the string, once separators are removed,
            is not valid hexadecimal
    """
    # Strip both supported separators, then decode the remaining hex digits
    hex_digits = mac.replace(":", "").replace("-", "")
    return bytes.fromhex(hex_digits)
def mac_bytes_to_str(mac: bytes) -> str:
    """
    Render a MAC address held as raw bytes
    in colon-separated hexadecimal notation.

    Args:
        mac (bytes): MAC address to convert
    Returns:
        str: MAC address as a string, each byte shown as
            two lowercase hexadecimal digits
    """
    octets = [format(octet, "02x") for octet in mac]
    return ":".join(octets)
def _get_first_byte(mac: str | bytes) -> int:
"""
Get the first byte of a MAC address.
Args:
mac (str | bytes): MAC address to get the first byte from
Returns:
int: first byte of the MAC address
Raises:
TypeError: if the MAC address is of an unsupported type
"""
# Dispatch based on the type of the MAC address
if isinstance(mac, str):
return int(mac.split(":")[0], BASE_HEX)
elif isinstance(mac, bytes):
return int(mac[0])
else:
raise TypeError(f"Unsupported type for MAC address: {type(mac)}")
def get_ig_bit(mac: str | bytes) -> int:
    """
    Get the I/G bit of a given MAC address.

    Args:
        mac (str | bytes): MAC address to get the I/G bit from
    Returns:
        int: 8-bit integer with the I/G bit set to its corresponding value,
            and all other bits set to 0
    Raises:
        TypeError: if the MAC address is of an unsupported type
    """
    # I/G bit: first byte, least-significant bit
    ig_mask = 0b00000001
    return _get_first_byte(mac) & ig_mask
def get_ul_bit(mac: str | bytes) -> int:
    """
    Get the U/L bit of a given MAC address.

    Args:
        mac (str | bytes): MAC address to get the U/L bit from
    Returns:
        int: 8-bit integer with the U/L bit set to its corresponding value,
            and all other bits set to 0
    Raises:
        TypeError: if the MAC address is of an unsupported type
    """
    # U/L bit: first byte, second least-significant bit
    ul_mask = 0b00000010
    return _get_first_byte(mac) & ul_mask
def anonymize_mac(mac: str | bytes) -> str:
    """
    Anonymize a given MAC address.

    - Special addresses (listed in `special_macs`) and multicast addresses
      are not anonymized.
    - Locally administered addresses are replaced by the first six bytes of
      the SHA-256 hash of the whole address, with the original I/G and U/L
      bits restored in the first byte.
    - Universally administered addresses keep their OUI (first three bytes);
      the last three bytes are replaced by the first three bytes of the
      SHA-256 hash of the original last three bytes.

    Args:
        mac (str | bytes): MAC address to anonymize
    Returns:
        str: anonymized MAC address, in colon-separated notation.
            A string input that is not anonymized is returned as given.
    Raises:
        TypeError: if the MAC address is of an unsupported type
    """
    # Special MAC address: do not anonymize.
    # Bytes input is converted so the return value is always a string,
    # as in the multicast case below.
    if mac in special_macs:
        return mac_bytes_to_str(mac) if isinstance(mac, bytes) else mac

    ## I/G bit: first byte, least-significant bit
    # I/G bit = 0 ==> Unicast address
    # I/G bit = 1 ==> Multicast address
    ig_bit = get_ig_bit(mac)
    is_multicast = bool(ig_bit)  # True ==> Multicast, False ==> Unicast

    # Multicast address:
    # do not anonymize
    if is_multicast:
        return mac_bytes_to_str(mac) if isinstance(mac, bytes) else mac

    ## U/L bit: first byte, second least-significant bit
    # U/L bit = 0 ==> Universally administered address (UAA)
    # U/L bit = 1 ==> Locally administered address (LAA)
    ul_bit = get_ul_bit(mac)
    is_local = bool(ul_bit)  # True ==> LAA, False ==> UAA

    mac_bytes = mac if isinstance(mac, bytes) else mac_str_to_bytes(mac)

    ## Locally administered address
    if is_local:
        # Compute SHA-256 hash of the MAC address
        digest = sha256(mac_bytes).digest()
        # Keep I/G and U/L bits
        bit_mask = ig_bit | ul_bit
        first_byte = (digest[0] & 0b11111100) | bit_mask
        return mac_bytes_to_str(bytes([first_byte]) + digest[1:6])

    ## Universally administered address
    # Compute SHA-256 hash based on the three least-significant bytes
    digest = sha256(mac_bytes[3:]).digest()
    # Keep OUI and anonymize the rest; formatting all six bytes at once
    # guarantees a separator between the OUI and the hashed part
    return mac_bytes_to_str(mac_bytes[:3] + digest[:3])
def anonymize_ether(ether: Ether) -> Ether:
    """
    Anonymize the MAC addresses of a packet's Ether layer, in place.

    Args:
        ether (scapy.Ether): Ether layer to anonymize
    Returns:
        scapy.Ether: the same Ether layer, with anonymized
            source and destination addresses
    """
    # Source first, then destination
    for field in ("src", "dst"):
        original_mac = ether.getfieldval(field)
        ether.setfieldval(field, anonymize_mac(original_mac))
    return ether
def anonymize_arp(arp: ARP) -> ARP:
    """
    Anonymize the hardware addresses of a packet's ARP layer, in place.

    Args:
        arp (scapy.ARP): ARP layer to anonymize
    Returns:
        scapy.ARP: the same ARP layer, with anonymized
            source and destination hardware addresses
    """
    # Source hardware address first, then destination
    for field in ("hwsrc", "hwdst"):
        original_mac = arp.getfieldval(field)
        arp.setfieldval(field, anonymize_mac(original_mac))
    return arp
def anonymize_dhcp(dhcp: BOOTP) -> BOOTP:
    """
    Anonymize a packet's DHCP layer MAC addresses.

    The client hardware address (`chaddr`) of the BOOTP layer is always
    anonymized. If the BOOTP layer carries DHCP options, the Client
    Identifier option is anonymized as well when it holds an Ethernet
    address.

    Args:
        dhcp (scapy.BOOTP): DHCP layer to anonymize
    Returns:
        scapy.BOOTP: anonymized DHCP layer (the BOOTP layer given as
            argument, modified in place)
    Raises:
        AttributeError: if `dhcp` is None
    """
    # Anonymize client hardware address
    chaddr = mac_bytes_to_str(dhcp.getfieldval("chaddr")[0:6])
    dhcp.setfieldval("chaddr", mac_str_to_bytes(anonymize_mac(chaddr)))

    # Check if BOOTP layer contains DHCP options
    options = dhcp.getfieldval("options")
    cookie = int.from_bytes(options[:4], BYTE_ORDER)
    if cookie != DHCP_MAGIC_COOKIE:
        return dhcp

    # BOOTP layer contains DHCP options.
    # Use a separate name for the DHCP layer, so that the BOOTP layer
    # is what gets returned in every case.
    dhcp_layer = dhcp.getlayer(DHCP)
    if dhcp_layer is None or dhcp_layer.options is None:
        return dhcp

    # Anonymize Client Identifier option
    for i, option in enumerate(dhcp_layer.options):
        # Option is not of format (code, value), skip
        if len(option) != 2:
            continue
        code, value = option
        # Only handle a well-formed Ethernet client identifier:
        # one type byte followed by a 6-byte MAC address
        is_eth_client_id = (
            code == DHCP_OPTION_CLIENT_ID
            and isinstance(value, bytes)
            and len(value) >= 7
            and value[0] == DHCP_CLIENT_ID_TYPE_ETH
        )
        if is_eth_client_id:
            # Convert to string first, as done for chaddr, so that the
            # result of anonymize_mac is always a string
            client_mac = mac_bytes_to_str(value[1:7])
            mac_anon = mac_str_to_bytes(anonymize_mac(client_mac))
            dhcp_layer.options[i] = (code, value[:1] + mac_anon)
            break

    return dhcp
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def anonymize_pkt_macs(packet) -> None:
    """
    Anonymize a packet's MAC addresses.

    The Ethernet, ARP and BOOTP/DHCP layers are processed in turn,
    each on a best-effort basis: a failure in one layer does not
    prevent the following layers from being processed.

    Args:
        packet: scapy packet to anonymize
    """
    # Ethernet
    # An AttributeError is expected when the layer is absent
    # (the anonymization function is then called with None)
    try:
        anonymize_ether(packet.getlayer(Ether))
    except AttributeError:
        pass

    # ARP
    # Any error is ignored here, but KeyboardInterrupt and SystemExit
    # are deliberately left to propagate
    try:
        anonymize_arp(packet.getlayer(ARP))
    except Exception:
        pass

    # DHCP
    try:
        anonymize_dhcp(packet.getlayer(BOOTP))
    except AttributeError:
        pass