diff --git a/pcap_anonymize/layers/mac.py b/pcap_anonymize/layers/mac.py index 6b7557f77ebd113800978218b771eb5a8bdd579d..f03836d9405c5d5d2916f3d9528608e10c30bb5c 100644 --- a/pcap_anonymize/layers/mac.py +++ b/pcap_anonymize/layers/mac.py @@ -16,8 +16,10 @@ DHCP_CLIENT_ID_TYPE_ETH = 1 # Special, well-known MAC addresses special_macs = [ - "00:00:00:00:00:00", # Default - "ff:ff:ff:ff:ff:ff" # Broadcast + "00:00:00:00:00:00", # Default + b"\x00\x00\x00\x00\x00\x00", # Default, as bytes + "ff:ff:ff:ff:ff:ff", # Broadcast + b"\xff\xff\xff\xff\xff\xff" # Broadcast, as bytes ] @@ -47,34 +49,56 @@ def mac_bytes_to_str(mac: bytes) -> str: return ":".join(f"{byte:02x}" for byte in mac) -def get_ig_bit(mac: str) -> int: +def _get_first_byte(mac: str | bytes) -> int: + """ + Get the first byte of a MAC address. + + Args: + mac (str | bytes): MAC address to get the first byte from + Returns: + int: first byte of the MAC address + Raises: + TypeError: if the MAC address is of an unsupported type + """ + # Dispatch based on the type of the MAC address + if isinstance(mac, str): + return int(mac.split(":")[0], BASE_HEX) + elif isinstance(mac, bytes): + return int(mac[0]) + else: + raise TypeError(f"Unsupported type for MAC address: {type(mac)}") + + +def get_ig_bit(mac: str | bytes) -> int: """ Get the I/G bit of a given MAC address. Args: - mac (str): MAC address to get the I/G bit from + mac (str | bytes): MAC address to get the I/G bit from Returns: int: 8-bit integer with the I/G bit set to its corresponding value, and all other bits set to 0 + Raises: + TypeError: if the MAC address is of an unsupported type """ - first_byte = int(mac.split(":")[0], BASE_HEX) ig_mask = 0b00000001 - return first_byte & ig_mask + return _get_first_byte(mac) & ig_mask -def get_ul_bit(mac: str) -> int: +def get_ul_bit(mac: str | bytes) -> int: """ Get the U/L bit of a given MAC address. Args: - mac (str): MAC address to get the U/L bit from + mac (str | bytes): MAC address to get the U/L bit from Returns: int: 8-bit integer with the U/L bit set to its corresponding value, and all other bits set to 0 + Raises: + TypeError: if the MAC address is of an unsupported type """ - first_byte = int(mac.split(":")[0], BASE_HEX) ul_mask = 0b00000010 - return first_byte & ul_mask + return _get_first_byte(mac) & ul_mask def anonymize_mac(mac: str) -> str: @@ -90,8 +114,7 @@ def anonymize_mac(mac: str) -> str: if mac in special_macs: return mac - ## Classic MAC address - mac_split = mac.split(":") + ### Classic MAC address ## I/G bit: first byte, least-significant bit # I/G bit = 0 ==> Unicast address @@ -110,14 +133,16 @@ def anonymize_mac(mac: str) -> str: ul_bit = get_ul_bit(mac) is_local = bool(ul_bit) # True ==> LAA, False ==> UAA + mac_bytes = mac if isinstance(mac, bytes) else mac_str_to_bytes(mac) + ## Locally administered address if is_local: bit_mask = ig_bit | ul_bit # Compute SHA-256 hash of the MAC address mac_sha256 = sha256() - for byte in mac_split: - mac_sha256.update(int(byte, BASE_HEX).to_bytes(1, BYTE_ORDER)) + for byte in mac_bytes: + mac_sha256.update(byte.to_bytes(1, BYTE_ORDER)) digest = mac_sha256.digest() first_byte = (digest[0] & 0b11111100) | bit_mask # Keep I/G and U/L bits @@ -128,13 +153,13 @@ def anonymize_mac(mac: str) -> str: # Compute SHA-256 hash based on the three least-significant bytes mac_sha256 = sha256() - for byte in mac_split[3:]: - mac_sha256.update(int(byte, BASE_HEX).to_bytes(1, BYTE_ORDER)) + for byte in mac_bytes[3:]: + mac_sha256.update(byte.to_bytes(1, BYTE_ORDER)) digest = mac_sha256.digest() # Keep OUI and anonymize the rest return ( - ':'.join(mac_split[:3]) + # Keep OUI + ':'.join(f"{byte:02x}" for byte in mac_bytes[:3]) + # Keep OUI ':' + ':'.join(f"{digest[i]:02x}" for i in range(0, 3)) # Hashed last 3 bytes ) @@ -179,7 +204,7 @@ def anonymize_dhcp(dhcp: BOOTP) -> BOOTP: """ # Anonymize client hardware address chaddr = mac_bytes_to_str(dhcp.getfieldval("chaddr")[0:6]) - dhcp.setfieldval("chaddr", anonymize_mac(chaddr)) + dhcp.setfieldval("chaddr", mac_str_to_bytes(anonymize_mac(chaddr))) # Check if BOOTP layer contains DHCP options options = dhcp.getfieldval("options") diff --git a/test/test_mac.py b/test/test_mac.py index d8b8b4c1757a71062e4b73598463aeefce62deba..71795d7fb529775a13e3795d989a4693ec8bde6b 100644 --- a/test/test_mac.py +++ b/test/test_mac.py @@ -44,20 +44,32 @@ def test_mac_bytes_to_str() -> None: def test_get_ig_bit() -> None: """ - Test the function `get_ig_bit`. + Test the function `get_ig_bit`, + both with string and bytes representations of MAC addresses. """ - assert get_ig_bit("00:00:00:00:00:00") == 0b00000000 - assert get_ig_bit("01:00:00:00:00:00") == 0b00000001 - assert get_ig_bit("12:34:56:78:9a:bc") == 0b00000000 + # String representation + assert get_ig_bit(mac_multicast) == 0b00000001 + assert get_ig_bit(mac_laa) == 0b00000000 + assert get_ig_bit(mac_uaa) == 0b00000000 + # Bytes representation + assert get_ig_bit(mac_multicast_bytes) == 0b00000001 + assert get_ig_bit(mac_laa_bytes) == 0b00000000 + assert get_ig_bit(mac_uaa_bytes) == 0b00000000 def test_get_ul_bit() -> None: """ - Test the function `get_ul_bit`. + Test the function `get_ul_bit`, + both with string and bytes representations of MAC addresses. """ - assert get_ul_bit("00:00:00:00:00:00") == 0b00000000 - assert get_ul_bit("02:00:00:00:00:00") == 0b00000010 - assert get_ul_bit("12:34:56:78:9a:bc") == 0b00000010 + # String representation + assert get_ul_bit(mac_multicast) == 0b00000000 + assert get_ul_bit(mac_laa) == 0b00000010 + assert get_ul_bit(mac_uaa) == 0b00000000 + # Bytes representation + assert get_ul_bit(mac_multicast_bytes) == 0b00000000 + assert get_ul_bit(mac_laa_bytes) == 0b00000010 + assert get_ul_bit(mac_uaa_bytes) == 0b00000000 def test_anonymize_mac_multicast() -> None: @@ -67,6 +79,7 @@ def test_anonymize_mac_multicast() -> None: The MAC address should not be anonymized. """ assert anonymize_mac(mac_multicast) == mac_multicast + assert anonymize_mac(mac_multicast_bytes) == mac_multicast_bytes def test_anonymize_mac_laa() -> None: @@ -77,10 +90,14 @@ def test_anonymize_mac_laa() -> None: """ mac_laa_anon = anonymize_mac(mac_laa) assert mac_laa_anon != mac_laa - # Verify I/G and U/L bits assert get_ig_bit(mac_laa) == get_ig_bit(mac_laa_anon) assert get_ul_bit(mac_laa) == get_ul_bit(mac_laa_anon) + mac_laa_bytes_anon = mac_str_to_bytes(anonymize_mac(mac_laa_bytes)) + assert mac_laa_bytes_anon != mac_laa_bytes + assert get_ig_bit(mac_laa_bytes) == get_ig_bit(mac_laa_bytes_anon) + assert get_ul_bit(mac_laa_bytes) == get_ul_bit(mac_laa_bytes_anon) + def test_anonymize_mac_uaa() -> None: """ @@ -93,6 +110,10 @@ def test_anonymize_mac_uaa() -> None: assert mac_uaa_anon.startswith(mac_uaa[:8]) # Vendor's OUI is kept assert mac_uaa_anon[10:] != mac_uaa[10:] # Last 3 bytes are anonymized + mac_uaa_bytes_anon = mac_str_to_bytes(anonymize_mac(mac_uaa_bytes)) + assert mac_uaa_bytes_anon[:3] == mac_uaa_bytes[:3] # Vendor's OUI is kept + assert mac_uaa_bytes_anon[3:] != mac_uaa_bytes[3:] # Last 3 bytes are anonymized + def test_anonymize_ether_multicast() -> None: """ @@ -177,6 +198,26 @@ def test_anonymize_arp_uaa() -> None: # Test the function `anonymize_dhcp`, # with multicast addresses. # """ -# dhcp = BOOTP(chaddr=mac_multicast) +# dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_multicast)) +# anonymize_dhcp(dhcp) +# assert dhcp.chaddr == mac_multicast_bytes + + +# def test_anonymize_dhcp_laa() -> None: +# """ +# Test the function `anonymize_dhcp`, +# with locally administered addresses. +# """ +# dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_multicast)) +# anonymize_dhcp(dhcp) +# assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_multicast_bytes) + + +# def test_anonymize_dhcp_multicast() -> None: +# """ +# Test the function `anonymize_dhcp`, +# with multicast addresses. +# """ +# dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_multicast)) # anonymize_dhcp(dhcp) -# assert dhcp.chaddr == mac_multicast +# assert dhcp.chaddr == mac_multicast_bytes