From 4c5c9d325dbd0462cc3d389cedf0b607c8c89bf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20De=20Keersmaeker?= <francois.dekeersmaeker@uclouvain.be> Date: Thu, 19 Dec 2024 12:29:17 +0100 Subject: [PATCH] App-layer anonymize + refactor --- pcap_anonymize/app_layer/__init__.py | 41 +++ pcap_anonymize/{layers => app_layer}/coap.py | 0 pcap_anonymize/{layers => app_layer}/http.py | 0 .../{layers => app_layer}/tplink.py | 0 pcap_anonymize/layers/__init__.py | 3 - pcap_anonymize/mac/__init__.py | 35 ++ pcap_anonymize/mac/arp.py | 16 + pcap_anonymize/mac/dhcp.py | 48 +++ pcap_anonymize/mac/ether.py | 16 + .../{layers/mac.py => mac/utils.py} | 110 +----- pcap_anonymize/pcap_anonymize.py | 6 +- test/app_layer/__init__.py | 3 + test/{ => app_layer}/test_coap.py | 2 +- test/{ => app_layer}/test_http.py | 2 +- test/{ => app_layer}/test_tplink.py | 2 +- test/mac/__init__.py | 3 + test/mac/test_arp.py | 72 ++++ test/mac/test_dhcp.py | 97 ++++++ test/mac/test_ether.py | 72 ++++ test/mac/test_utils.py | 110 ++++++ test/test_mac.py | 312 ------------------ 21 files changed, 524 insertions(+), 426 deletions(-) create mode 100644 pcap_anonymize/app_layer/__init__.py rename pcap_anonymize/{layers => app_layer}/coap.py (100%) rename pcap_anonymize/{layers => app_layer}/http.py (100%) rename pcap_anonymize/{layers => app_layer}/tplink.py (100%) delete mode 100644 pcap_anonymize/layers/__init__.py create mode 100644 pcap_anonymize/mac/__init__.py create mode 100644 pcap_anonymize/mac/arp.py create mode 100644 pcap_anonymize/mac/dhcp.py create mode 100644 pcap_anonymize/mac/ether.py rename pcap_anonymize/{layers/mac.py => mac/utils.py} (60%) create mode 100644 test/app_layer/__init__.py rename test/{ => app_layer}/test_coap.py (91%) rename test/{ => app_layer}/test_http.py (98%) rename test/{ => app_layer}/test_tplink.py (88%) create mode 100644 test/mac/__init__.py create mode 100644 test/mac/test_arp.py create mode 100644 test/mac/test_dhcp.py create mode 100644 test/mac/test_ether.py create mode 100644 test/mac/test_utils.py delete mode 100644 test/test_mac.py diff --git a/pcap_anonymize/app_layer/__init__.py b/pcap_anonymize/app_layer/__init__.py new file mode 100644 index 0000000..f5f408e --- /dev/null +++ b/pcap_anonymize/app_layer/__init__.py @@ -0,0 +1,41 @@ +# Scapy +from scapy.packet import Packet +from scapy.layers.inet import TCP +from scapy.contrib.coap import CoAP + +# Custom +from .http import get_http_layer, anonymize_http +from .coap import anonymize_coap +from .tplink import anonymize_tplink + + + +def anonymize_app_layer(packet: Packet) -> None: + """ + Anonymize a packet's application layer. + + Args: + packet (scapy.Packet): packet to anonymize + """ + # HTTP layer + try: + anonymize_http(get_http_layer(packet)) + except AttributeError: + pass + + # CoAP layer + try: + anonymize_coap(packet.getlayer(CoAP)) + except AttributeError: + pass + + # TP-Link Smart Home protocol layer + # (i.e. TCP port 9999) + try: + tcp = packet.getlayer(TCP) + sport = tcp.getfieldval("sport") + dport = tcp.getfieldval("dport") + if sport == 9999 or dport == 9999: + anonymize_tplink(packet) + except: + pass diff --git a/pcap_anonymize/layers/coap.py b/pcap_anonymize/app_layer/coap.py similarity index 100% rename from pcap_anonymize/layers/coap.py rename to pcap_anonymize/app_layer/coap.py diff --git a/pcap_anonymize/layers/http.py b/pcap_anonymize/app_layer/http.py similarity index 100% rename from pcap_anonymize/layers/http.py rename to pcap_anonymize/app_layer/http.py diff --git a/pcap_anonymize/layers/tplink.py b/pcap_anonymize/app_layer/tplink.py similarity index 100% rename from pcap_anonymize/layers/tplink.py rename to pcap_anonymize/app_layer/tplink.py diff --git a/pcap_anonymize/layers/__init__.py b/pcap_anonymize/layers/__init__.py deleted file mode 100644 index e25c399..0000000 --- a/pcap_anonymize/layers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Submodule `layers`. -""" diff --git a/pcap_anonymize/mac/__init__.py b/pcap_anonymize/mac/__init__.py new file mode 100644 index 0000000..5f569ac --- /dev/null +++ b/pcap_anonymize/mac/__init__.py @@ -0,0 +1,35 @@ +# Scapy +from scapy.packet import Packet +from scapy.layers.l2 import Ether, ARP +from scapy.layers.dhcp import BOOTP + +# Custom +from .ether import anonymize_ether +from .arp import anonymize_arp +from .dhcp import anonymize_dhcp + + +def anonymize_pkt_macs(packet: Packet) -> None: + """ + Anonymize a packet's MAC addresses. + + Args: + packet: scapy packet to anonymize + """ + # Ethernet + try: + anonymize_ether(packet.getlayer(Ether)) + except AttributeError: + pass + + # ARP + try: + anonymize_arp(packet.getlayer(ARP)) + except: + pass + + # DHCP + try: + anonymize_dhcp(packet.getlayer(BOOTP)) + except AttributeError: + pass diff --git a/pcap_anonymize/mac/arp.py b/pcap_anonymize/mac/arp.py new file mode 100644 index 0000000..f0f292c --- /dev/null +++ b/pcap_anonymize/mac/arp.py @@ -0,0 +1,16 @@ +from scapy.layers.l2 import ARP +from .utils import anonymize_mac + + +def anonymize_arp(arp: ARP) -> ARP: + """ + Anonymize a packet's ARP layer. + + Args: + packet (scapy.ARP): ARP layer to anonymize + Returns: + scapy.ARP: anonymized ARP layer + """ + arp.setfieldval("hwsrc", anonymize_mac(arp.getfieldval("hwsrc"))) + arp.setfieldval("hwdst", anonymize_mac(arp.getfieldval("hwdst"))) + return arp \ No newline at end of file diff --git a/pcap_anonymize/mac/dhcp.py b/pcap_anonymize/mac/dhcp.py new file mode 100644 index 0000000..c8e91b5 --- /dev/null +++ b/pcap_anonymize/mac/dhcp.py @@ -0,0 +1,48 @@ +from scapy.layers.dhcp import BOOTP, DHCP +from .utils import mac_bytes_to_str, mac_str_to_bytes, anonymize_mac + + +BYTE_ORDER = "big" +DHCP_MAGIC_COOKIE = 0x63825363 +DHCP_OPTION_CLIENT_ID = "client_id" +DHCP_CLIENT_ID_TYPE_ETH = 1 + + +def anonymize_dhcp(dhcp: BOOTP) -> BOOTP: + """ + Anonymize a packet's DHCP layer MAC addresses. + + Args: + dhcp (scapy.BOOTP): DHCP layer to anonymize + Returns: + scapy.BOOTP: anonymized DHCP layer + """ + # Anonymize client hardware address + chaddr = mac_bytes_to_str(dhcp.getfieldval("chaddr")[0:6]) + dhcp.setfieldval("chaddr", mac_str_to_bytes(anonymize_mac(chaddr))) + + # Check if BOOTP layer contains DHCP options + options = dhcp.getfieldval("options") + cookie = int.from_bytes(options[:4], BYTE_ORDER) + if cookie != DHCP_MAGIC_COOKIE: + return dhcp + + # BOOTP layer contains DHCP options + # Anonymize Client Identifier option + dhcp = dhcp.getlayer(DHCP) + + if dhcp is None or dhcp.options is None: + return dhcp + + for i, option in enumerate(dhcp.options): + # Option is not of format (code, value), skip + if len(option) != 2: + continue + + code, value = option + if code == DHCP_OPTION_CLIENT_ID and value[0] == DHCP_CLIENT_ID_TYPE_ETH: + mac_anon = mac_str_to_bytes(anonymize_mac(value[1:7])) + dhcp.options[i] = (code, value[0].to_bytes(1, BYTE_ORDER) + mac_anon) + break + + return dhcp \ No newline at end of file diff --git a/pcap_anonymize/mac/ether.py b/pcap_anonymize/mac/ether.py new file mode 100644 index 0000000..a9192b0 --- /dev/null +++ b/pcap_anonymize/mac/ether.py @@ -0,0 +1,16 @@ +from scapy.layers.l2 import Ether +from .utils import anonymize_mac + + +def anonymize_ether(ether: Ether) -> Ether: + """ + Anonymize a packet's Ether layer. + + Args: + ether (scapy.Ether): Ether layer to anonymize + Returns: + scapy.Ether: anonymized Ether layer + """ + ether.setfieldval("src", anonymize_mac(ether.getfieldval("src"))) + ether.setfieldval("dst", anonymize_mac(ether.getfieldval("dst"))) + return ether diff --git a/pcap_anonymize/layers/mac.py b/pcap_anonymize/mac/utils.py similarity index 60% rename from pcap_anonymize/layers/mac.py rename to pcap_anonymize/mac/utils.py index 74dff52..25be41d 100644 --- a/pcap_anonymize/layers/mac.py +++ b/pcap_anonymize/mac/utils.py @@ -1,19 +1,13 @@ """ -Anonymize MAC addresses. +Util functions for MAC address manipulation. """ from hashlib import sha256 -from scapy.layers.l2 import Ether, ARP -from scapy.layers.dhcp import BOOTP, DHCP + BASE_HEX = 16 BYTE_ORDER = "big" -# DHCP-related constants -DHCP_MAGIC_COOKIE = 0x63825363 -DHCP_OPTION_CLIENT_ID = "client_id" -DHCP_CLIENT_ID_TYPE_ETH = 1 - # Special, well-known MAC addresses special_macs = [ "00:00:00:00:00:00", # Default @@ -49,7 +43,7 @@ def mac_bytes_to_str(mac: bytes) -> str: return ":".join(f"{byte:02x}" for byte in mac) -def _get_first_byte(mac: str | bytes) -> int: +def get_first_byte(mac: str | bytes) -> int: """ Get the first byte of a MAC address. @@ -82,7 +76,7 @@ def get_ig_bit(mac: str | bytes) -> int: TypeError: if the MAC address is of an unsupported type """ ig_mask = 0b00000001 - return _get_first_byte(mac) & ig_mask + return get_first_byte(mac) & ig_mask def get_ul_bit(mac: str | bytes) -> int: @@ -98,7 +92,7 @@ def get_ul_bit(mac: str | bytes) -> int: TypeError: if the MAC address is of an unsupported type """ ul_mask = 0b00000010 - return _get_first_byte(mac) & ul_mask + return get_first_byte(mac) & ul_mask def anonymize_mac(mac: str) -> str: @@ -166,97 +160,3 @@ def anonymize_mac(mac: str) -> str: ':' + ':'.join(f"{digest[i]:02x}" for i in range(0, 3)) # Hashed last 3 bytes ) - - -def anonymize_ether(ether: Ether) -> Ether: - """ - Anonymize a packet's Ether layer. - - Args: - ether (scapy.Ether): Ether layer to anonymize - Returns: - scapy.Ether: anonymized Ether layer - """ - ether.setfieldval("src", anonymize_mac(ether.getfieldval("src"))) - ether.setfieldval("dst", anonymize_mac(ether.getfieldval("dst"))) - return ether - - -def anonymize_arp(arp: ARP) -> ARP: - """ - Anonymize a packet's ARP layer. - - Args: - packet (scapy.ARP): ARP layer to anonymize - Returns: - scapy.ARP: anonymized ARP layer - """ - arp.setfieldval("hwsrc", anonymize_mac(arp.getfieldval("hwsrc"))) - arp.setfieldval("hwdst", anonymize_mac(arp.getfieldval("hwdst"))) - return arp - - -def anonymize_dhcp(dhcp: BOOTP) -> BOOTP: - """ - Anonymize a packet's DHCP layer MAC addresses. - - Args: - dhcp (scapy.BOOTP): DHCP layer to anonymize - Returns: - scapy.BOOTP: anonymized DHCP layer - """ - # Anonymize client hardware address - chaddr = mac_bytes_to_str(dhcp.getfieldval("chaddr")[0:6]) - dhcp.setfieldval("chaddr", mac_str_to_bytes(anonymize_mac(chaddr))) - - # Check if BOOTP layer contains DHCP options - options = dhcp.getfieldval("options") - cookie = int.from_bytes(options[:4], BYTE_ORDER) - if cookie != DHCP_MAGIC_COOKIE: - return dhcp - - # BOOTP layer contains DHCP options - # Anonymize Client Identifier option - dhcp = dhcp.getlayer(DHCP) - - if dhcp is None or dhcp.options is None: - return dhcp - - for i, option in enumerate(dhcp.options): - # Option is not of format (code, value), skip - if len(option) != 2: - continue - - code, value = option - if code == DHCP_OPTION_CLIENT_ID and value[0] == DHCP_CLIENT_ID_TYPE_ETH: - mac_anon = mac_str_to_bytes(anonymize_mac(value[1:7])) - dhcp.options[i] = (code, value[0].to_bytes(1, BYTE_ORDER) + mac_anon) - break - - return dhcp - - -def anonymize_pkt_macs(packet) -> None: - """ - Anonymize a packet's MAC addresses. - - Args: - packet: scapy packet to anonymize - """ - # Ethernet - try: - anonymize_ether(packet.getlayer(Ether)) - except AttributeError: - pass - - # ARP - try: - anonymize_arp(packet.getlayer(ARP)) - except: - pass - - # DHCP - try: - anonymize_dhcp(packet.getlayer(BOOTP)) - except AttributeError: - pass diff --git a/pcap_anonymize/pcap_anonymize.py b/pcap_anonymize/pcap_anonymize.py index 0845bd7..873987c 100644 --- a/pcap_anonymize/pcap_anonymize.py +++ b/pcap_anonymize/pcap_anonymize.py @@ -6,7 +6,8 @@ import os from pathlib import Path from scapy.all import Packet, sniff, wrpcap # Packet layers -from .layers.mac import anonymize_pkt_macs +from .mac import anonymize_pkt_macs +from .app_layer import anonymize_app_layer ### GLOBAL VARIABLES ### @@ -14,7 +15,6 @@ from .layers.mac import anonymize_pkt_macs packets = [] - ### FUNCTIONS ### def recompute_checksums(packet: Packet) -> Packet: @@ -50,7 +50,7 @@ def anonymize_packet(packet: Packet) -> None: anonymize_pkt_macs(packet) # Anonymize application layer - # TODO + anonymize_app_layer(packet) # Recompute packet checksums packet = recompute_checksums(packet) diff --git a/test/app_layer/__init__.py b/test/app_layer/__init__.py new file mode 100644 index 0000000..f747fc1 --- /dev/null +++ b/test/app_layer/__init__.py @@ -0,0 +1,3 @@ +""" +Unit tests for the submodule `app_layer`. +""" diff --git a/test/test_coap.py b/test/app_layer/test_coap.py similarity index 91% rename from test/test_coap.py rename to test/app_layer/test_coap.py index b0d5800..2d93470 100644 --- a/test/test_coap.py +++ b/test/app_layer/test_coap.py @@ -1,5 +1,5 @@ from scapy.contrib.coap import CoAP -from pcap_anonymize.layers.coap import CoapFields, anonymize_coap +from pcap_anonymize.app_layer.coap import CoapFields, anonymize_coap ### TEST FUNCTIONS ### diff --git a/test/test_http.py b/test/app_layer/test_http.py similarity index 98% rename from test/test_http.py rename to test/app_layer/test_http.py index 2ac2ede..b21ba83 100644 --- a/test/test_http.py +++ b/test/app_layer/test_http.py @@ -1,6 +1,6 @@ from scapy.layers.inet import TCP from scapy.layers.http import HTTPRequest, HTTPResponse -from pcap_anonymize.layers.http import ( +from pcap_anonymize.app_layer.http import ( HttpFields, get_http_layer, anonymize_http diff --git a/test/test_tplink.py b/test/app_layer/test_tplink.py similarity index 88% rename from test/test_tplink.py rename to test/app_layer/test_tplink.py index 53812c3..35afc45 100644 --- a/test/test_tplink.py +++ b/test/app_layer/test_tplink.py @@ -1,6 +1,6 @@ from scapy.packet import Raw from scapy.layers.inet import TCP -from pcap_anonymize.layers.tplink import anonymize_tplink +from pcap_anonymize.app_layer.tplink import anonymize_tplink ### TEST FUNCTIONS ### diff --git a/test/mac/__init__.py b/test/mac/__init__.py new file mode 100644 index 0000000..dd5f8c6 --- /dev/null +++ b/test/mac/__init__.py @@ -0,0 +1,3 @@ +""" +Unit tests for the submodule `mac`. +""" diff --git a/test/mac/test_arp.py b/test/mac/test_arp.py new file mode 100644 index 0000000..401468b --- /dev/null +++ b/test/mac/test_arp.py @@ -0,0 +1,72 @@ +from scapy.layers.l2 import ARP +from pcap_anonymize.mac import anonymize_arp, anonymize_pkt_macs +from pcap_anonymize.mac.utils import get_ig_bit, get_ul_bit + + +### TEST CONSTANTS ### + +mac_multicast = "01:00:00:00:00:00" +mac_multicast_bytes = b"\x01\x00\x00\x00\x00\x00" +mac_laa = "02:00:00:00:00:00" +mac_laa_bytes = b"\x02\x00\x00\x00\x00\x00" +mac_uaa = "00:11:22:33:44:55" +mac_uaa_bytes = b"\x00\x11\x22\x33\x44\x55" + + +### TEST FUNCTIONS ### + +def test_anonymize_arp_multicast() -> None: + """ + Test the function `anonymize_arp`, + with multicast addresses. + """ + arp_multicast = ARP(hwsrc=mac_multicast, hwdst=mac_multicast) + anonymize_arp(arp_multicast) + assert arp_multicast.hwsrc == mac_multicast + assert arp_multicast.hwdst == mac_multicast + + anonymize_pkt_macs(arp_multicast) + assert arp_multicast.hwsrc == mac_multicast + assert arp_multicast.hwdst == mac_multicast + + +def test_anonymize_arp_laa() -> None: + """ + Test the function `anonymize_arp`, + with locally administered addresses. + """ + arp_laa = ARP(hwsrc=mac_laa, hwdst=mac_laa) + anonymize_arp(arp_laa) + assert arp_laa.hwsrc != mac_laa + assert get_ig_bit(arp_laa.hwsrc) == get_ig_bit(mac_laa) + assert get_ul_bit(arp_laa.hwsrc) == get_ul_bit(mac_laa) + assert arp_laa.hwdst != mac_laa + assert get_ig_bit(arp_laa.hwdst) == get_ig_bit(mac_laa) + assert get_ul_bit(arp_laa.hwdst) == get_ul_bit(mac_laa) + + anonymize_pkt_macs(arp_laa) + assert arp_laa.hwsrc != mac_laa + assert get_ig_bit(arp_laa.hwsrc) == get_ig_bit(mac_laa) + assert get_ul_bit(arp_laa.hwsrc) == get_ul_bit(mac_laa) + assert arp_laa.hwdst != mac_laa + assert get_ig_bit(arp_laa.hwdst) == get_ig_bit(mac_laa) + assert get_ul_bit(arp_laa.hwdst) == get_ul_bit(mac_laa) + + +def test_anonymize_arp_uaa() -> None: + """ + Test the function `anonymize_arp`, + with universally administered addresses. + """ + arp_uaa = ARP(hwsrc=mac_uaa, hwdst=mac_uaa) + anonymize_arp(arp_uaa) + assert arp_uaa.hwsrc.startswith(mac_uaa[:8]) + assert arp_uaa.hwsrc[10:] != mac_uaa[10:] + assert arp_uaa.hwdst.startswith(mac_uaa[:8]) + assert arp_uaa.hwdst[10:] != mac_uaa[10:] + + anonymize_pkt_macs(arp_uaa) + assert arp_uaa.hwsrc.startswith(mac_uaa[:8]) + assert arp_uaa.hwsrc[10:] != mac_uaa[10:] + assert arp_uaa.hwdst.startswith(mac_uaa[:8]) + assert arp_uaa.hwdst[10:] != mac_uaa[10:] diff --git a/test/mac/test_dhcp.py b/test/mac/test_dhcp.py new file mode 100644 index 0000000..d67c112 --- /dev/null +++ b/test/mac/test_dhcp.py @@ -0,0 +1,97 @@ +from scapy.layers.dhcp import BOOTP, DHCP +from pcap_anonymize.mac import anonymize_dhcp, anonymize_pkt_macs +from pcap_anonymize.mac.utils import mac_str_to_bytes, get_ig_bit, get_ul_bit + + +### TEST CONSTANTS ### + +mac_multicast = "01:00:00:00:00:00" +mac_multicast_bytes = b"\x01\x00\x00\x00\x00\x00" +mac_laa = "02:00:00:00:00:00" +mac_laa_bytes = b"\x02\x00\x00\x00\x00\x00" +mac_uaa = "00:11:22:33:44:55" +mac_uaa_bytes = b"\x00\x11\x22\x33\x44\x55" + + +### TEST FUNCTIONS ### + +def test_anonymize_dhcp_multicast() -> None: + """ + Test the function `anonymize_dhcp`, + with multicast addresses. + """ + # Client hardware address + dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_multicast)) + anonymize_dhcp(dhcp) + assert dhcp.chaddr == mac_multicast_bytes + anonymize_pkt_macs(dhcp) + assert dhcp.chaddr == mac_multicast_bytes + + # Option: Client Identifier + dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_multicast))]) + anonymize_dhcp(dhcp) + assert dhcp.getlayer(DHCP).options[0][1][1:7] == mac_multicast_bytes + anonymize_pkt_macs(dhcp) + assert dhcp.getlayer(DHCP).options[0][1][1:7] == mac_multicast_bytes + + +def test_anonymize_dhcp_laa() -> None: + """ + Test the function `anonymize_dhcp`, + with locally administered addresses. + """ + # Client hardware address + dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_laa)) + anonymize_dhcp(dhcp) + assert dhcp.chaddr != mac_laa_bytes + assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_laa_bytes) + assert get_ul_bit(dhcp.chaddr) == get_ul_bit(mac_laa_bytes) + + anonymize_pkt_macs(dhcp) + assert dhcp.chaddr != mac_laa_bytes + assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_laa_bytes) + assert get_ul_bit(dhcp.chaddr) == get_ul_bit(mac_laa_bytes) + + + # Option: Client Identifier + dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_laa))]) + anonymize_dhcp(dhcp) + mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] + assert mac_anon != mac_laa_bytes + assert get_ig_bit(mac_anon) == get_ig_bit(mac_laa_bytes) + assert get_ul_bit(mac_anon) == get_ul_bit(mac_laa_bytes) + + anonymize_pkt_macs(dhcp) + mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] + assert mac_anon != mac_laa_bytes + assert get_ig_bit(mac_anon) == get_ig_bit(mac_laa_bytes) + assert get_ul_bit(mac_anon) == get_ul_bit(mac_laa_bytes) + + +def test_anonymize_dhcp_uaa() -> None: + """ + Test the function `anonymize_dhcp`, + with universally administered addresses. + """ + # Client hardware address + dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_uaa)) + anonymize_dhcp(dhcp) + assert dhcp.chaddr[:3] == mac_uaa_bytes[:3] + assert dhcp.chaddr[3:] != mac_uaa_bytes[3:] + + anonymize_pkt_macs(dhcp) + assert dhcp.chaddr[:3] == mac_uaa_bytes[:3] + assert dhcp.chaddr[3:] != mac_uaa_bytes[3:] + + + # Option: Client Identifier + dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_uaa))]) + anonymize_dhcp(dhcp) + mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] + assert mac_anon[:3] == mac_uaa_bytes[:3] + assert mac_anon[3:] != mac_uaa_bytes[3:] + + anonymize_pkt_macs(dhcp) + mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] + assert mac_anon[:3] == mac_uaa_bytes[:3] + assert mac_anon[3:] != mac_uaa_bytes[3:] diff --git a/test/mac/test_ether.py b/test/mac/test_ether.py new file mode 100644 index 0000000..82e99ca --- /dev/null +++ b/test/mac/test_ether.py @@ -0,0 +1,72 @@ +from scapy.layers.l2 import Ether +from pcap_anonymize.mac import anonymize_ether, anonymize_pkt_macs +from pcap_anonymize.mac.utils import get_ig_bit, get_ul_bit + + +### TEST CONSTANTS ### + +mac_multicast = "01:00:00:00:00:00" +mac_multicast_bytes = b"\x01\x00\x00\x00\x00\x00" +mac_laa = "02:00:00:00:00:00" +mac_laa_bytes = b"\x02\x00\x00\x00\x00\x00" +mac_uaa = "00:11:22:33:44:55" +mac_uaa_bytes = b"\x00\x11\x22\x33\x44\x55" + + +### TEST FUNCTIONS ### + +def test_anonymize_ether_multicast() -> None: + """ + Test the function `anonymize_ether`, + with multicast addresses. + """ + ether_multicast = Ether(src=mac_multicast, dst=mac_multicast) + anonymize_ether(ether_multicast) + assert ether_multicast.src == mac_multicast + assert ether_multicast.dst == mac_multicast + + anonymize_pkt_macs(ether_multicast) + assert ether_multicast.src == mac_multicast + assert ether_multicast.dst == mac_multicast + + +def test_anonymize_ether_laa() -> None: + """ + Test the function `anonymize_ether`, + with locally administered addresses. + """ + ether_laa = Ether(src=mac_laa, dst=mac_laa) + anonymize_ether(ether_laa) + assert ether_laa.src != mac_laa + assert get_ig_bit(ether_laa.src) == get_ig_bit(mac_laa) + assert get_ul_bit(ether_laa.src) == get_ul_bit(mac_laa) + assert ether_laa.dst != mac_laa + assert get_ig_bit(ether_laa.dst) == get_ig_bit(mac_laa) + assert get_ul_bit(ether_laa.dst) == get_ul_bit(mac_laa) + + anonymize_pkt_macs(ether_laa) + assert ether_laa.src != mac_laa + assert get_ig_bit(ether_laa.src) == get_ig_bit(mac_laa) + assert get_ul_bit(ether_laa.src) == get_ul_bit(mac_laa) + assert ether_laa.dst != mac_laa + assert get_ig_bit(ether_laa.dst) == get_ig_bit(mac_laa) + assert get_ul_bit(ether_laa.dst) == get_ul_bit(mac_laa) + + +def test_anonymize_ether_uaa() -> None: + """ + Test the function `anonymize_ether`, + with universally administered addresses. + """ + ether_laa = Ether(src=mac_uaa, dst=mac_uaa) + anonymize_ether(ether_laa) + assert ether_laa.src.startswith(mac_uaa[:8]) + assert ether_laa.src[10:] != mac_uaa[10:] + assert ether_laa.dst.startswith(mac_uaa[:8]) + assert ether_laa.dst[10:] != mac_uaa[10:] + + anonymize_pkt_macs(ether_laa) + assert ether_laa.src.startswith(mac_uaa[:8]) + assert ether_laa.src[10:] != mac_uaa[10:] + assert ether_laa.dst.startswith(mac_uaa[:8]) + assert ether_laa.dst[10:] != mac_uaa[10:] diff --git a/test/mac/test_utils.py b/test/mac/test_utils.py new file mode 100644 index 0000000..04645a6 --- /dev/null +++ b/test/mac/test_utils.py @@ -0,0 +1,110 @@ +from pcap_anonymize.mac.utils import ( + mac_str_to_bytes, mac_bytes_to_str, + get_ig_bit, get_ul_bit, + anonymize_mac +) + + +### TEST CONSTANTS ### + +mac_multicast = "01:00:00:00:00:00" +mac_multicast_bytes = b"\x01\x00\x00\x00\x00\x00" +mac_laa = "02:00:00:00:00:00" +mac_laa_bytes = b"\x02\x00\x00\x00\x00\x00" +mac_uaa = "00:11:22:33:44:55" +mac_uaa_bytes = b"\x00\x11\x22\x33\x44\x55" + + +### TEST FUNCTIONS ### + +def test_mac_str_to_bytes() -> None: + """ + Test the function `mac_str_to_bytes`, + which converts a MAC address' string representation to bytes. + """ + assert mac_str_to_bytes(mac_multicast) == mac_multicast_bytes + assert mac_str_to_bytes(mac_laa) == mac_laa_bytes + assert mac_str_to_bytes(mac_uaa) == mac_uaa_bytes + + +def test_mac_bytes_to_str() -> None: + """ + Test the function `mac_bytes_to_str`, + which converts a MAC address' bytes representation to a string. + """ + assert mac_bytes_to_str(mac_multicast_bytes) == mac_multicast + assert mac_bytes_to_str(mac_laa_bytes) == mac_laa + assert mac_bytes_to_str(mac_uaa_bytes) == mac_uaa + + +def test_get_ig_bit() -> None: + """ + Test the function `get_ig_bit`, + both with string and bytes representations of MAC addresses. + """ + # String representation + assert get_ig_bit(mac_multicast) == 0b00000001 + assert get_ig_bit(mac_laa) == 0b00000000 + assert get_ig_bit(mac_uaa) == 0b00000000 + # Bytes representation + assert get_ig_bit(mac_multicast_bytes) == 0b00000001 + assert get_ig_bit(mac_laa_bytes) == 0b00000000 + assert get_ig_bit(mac_uaa_bytes) == 0b00000000 + + +def test_get_ul_bit() -> None: + """ + Test the function `get_ul_bit`, + both with string and bytes representations of MAC addresses. + """ + # String representation + assert get_ul_bit(mac_multicast) == 0b00000000 + assert get_ul_bit(mac_laa) == 0b00000010 + assert get_ul_bit(mac_uaa) == 0b00000000 + # Bytes representation + assert get_ul_bit(mac_multicast_bytes) == 0b00000000 + assert get_ul_bit(mac_laa_bytes) == 0b00000010 + assert get_ul_bit(mac_uaa_bytes) == 0b00000000 + + +def test_anonymize_mac_multicast() -> None: + """ + Test the function `anonymize_mac` + with a multicast MAC address. + The MAC address should not be anonymized. + """ + assert anonymize_mac(mac_multicast) == mac_multicast + assert mac_str_to_bytes(anonymize_mac(mac_multicast_bytes)) == mac_multicast_bytes + + +def test_anonymize_mac_laa() -> None: + """ + Test the function `anonymize_mac` + with a locally administered MAC address. + All bits should be anonymized except the I/G and U/L bits. + """ + mac_laa_anon = anonymize_mac(mac_laa) + assert mac_laa_anon != mac_laa + assert get_ig_bit(mac_laa) == get_ig_bit(mac_laa_anon) + assert get_ul_bit(mac_laa) == get_ul_bit(mac_laa_anon) + + mac_laa_bytes_anon = mac_str_to_bytes(anonymize_mac(mac_laa_bytes)) + assert mac_laa_bytes_anon != mac_laa_bytes + assert get_ig_bit(mac_laa_bytes) == get_ig_bit(mac_laa_bytes_anon) + assert get_ul_bit(mac_laa_bytes) == get_ul_bit(mac_laa_bytes_anon) + + +def test_anonymize_mac_uaa() -> None: + """ + Test the function `anonymize_mac` + with an universally administered MAC address. + The 3 first bytes (vendor's OUI) should be kept, + and the 3 last bytes should be anonymized. + """ + mac_uaa_anon = anonymize_mac(mac_uaa) + assert mac_uaa_anon.startswith(mac_uaa[:8]) # Vendor's OUI is kept + assert mac_uaa_anon[10:] != mac_uaa[10:] # Last 3 bytes are anonymized + + mac_uaa_bytes_anon = mac_str_to_bytes(anonymize_mac(mac_uaa_bytes)) + assert mac_uaa_bytes_anon[:3] == mac_uaa_bytes[:3] # Vendor's OUI is kept + assert mac_uaa_bytes_anon[3:] != mac_uaa_bytes[3:] # Last 3 bytes are anonymized diff --git a/test/test_mac.py b/test/test_mac.py deleted file mode 100644 index 7c8fb1f..0000000 --- a/test/test_mac.py +++ /dev/null @@ -1,312 +0,0 @@ -from scapy.layers.l2 import Ether, ARP -from scapy.layers.dhcp import BOOTP, DHCP -from pcap_anonymize.layers.mac import ( - mac_str_to_bytes, mac_bytes_to_str, - get_ig_bit, get_ul_bit, - anonymize_mac, - anonymize_ether, - anonymize_arp, - anonymize_dhcp, - anonymize_pkt_macs -) - - -### TEST CONSTANTS ### - -mac_multicast = "01:00:00:00:00:00" -mac_multicast_bytes = b"\x01\x00\x00\x00\x00\x00" -mac_laa = "02:00:00:00:00:00" -mac_laa_bytes = b"\x02\x00\x00\x00\x00\x00" -mac_uaa = "00:11:22:33:44:55" -mac_uaa_bytes = b"\x00\x11\x22\x33\x44\x55" - - -### TEST FUNCTIONS ### - -def test_mac_str_to_bytes() -> None: - """ - Test the function `mac_str_to_bytes`, - which converts a MAC address' string representation to bytes. - """ - assert mac_str_to_bytes(mac_multicast) == mac_multicast_bytes - assert mac_str_to_bytes(mac_laa) == mac_laa_bytes - assert mac_str_to_bytes(mac_uaa) == mac_uaa_bytes - - -def test_mac_bytes_to_str() -> None: - """ - Test the function `mac_bytes_to_str`, - which converts a MAC address' bytes representation to a string. - """ - assert mac_bytes_to_str(mac_multicast_bytes) == mac_multicast - assert mac_bytes_to_str(mac_laa_bytes) == mac_laa - assert mac_bytes_to_str(mac_uaa_bytes) == mac_uaa - - -def test_get_ig_bit() -> None: - """ - Test the function `get_ig_bit`, - both with string and bytes representations of MAC addresses. - """ - # String representation - assert get_ig_bit(mac_multicast) == 0b00000001 - assert get_ig_bit(mac_laa) == 0b00000000 - assert get_ig_bit(mac_uaa) == 0b00000000 - # Bytes representation - assert get_ig_bit(mac_multicast_bytes) == 0b00000001 - assert get_ig_bit(mac_laa_bytes) == 0b00000000 - assert get_ig_bit(mac_uaa_bytes) == 0b00000000 - - -def test_get_ul_bit() -> None: - """ - Test the function `get_ul_bit`, - both with string and bytes representations of MAC addresses. - """ - # String representation - assert get_ul_bit(mac_multicast) == 0b00000000 - assert get_ul_bit(mac_laa) == 0b00000010 - assert get_ul_bit(mac_uaa) == 0b00000000 - # Bytes representation - assert get_ul_bit(mac_multicast_bytes) == 0b00000000 - assert get_ul_bit(mac_laa_bytes) == 0b00000010 - assert get_ul_bit(mac_uaa_bytes) == 0b00000000 - - -def test_anonymize_mac_multicast() -> None: - """ - Test the function `anonymize_mac` - with a multicast MAC address. - The MAC address should not be anonymized. - """ - assert anonymize_mac(mac_multicast) == mac_multicast - assert mac_str_to_bytes(anonymize_mac(mac_multicast_bytes)) == mac_multicast_bytes - - -def test_anonymize_mac_laa() -> None: - """ - Test the function `anonymize_mac` - with a locally administered MAC address. - All bits should be anonymized except the I/G and U/L bits. - """ - mac_laa_anon = anonymize_mac(mac_laa) - assert mac_laa_anon != mac_laa - assert get_ig_bit(mac_laa) == get_ig_bit(mac_laa_anon) - assert get_ul_bit(mac_laa) == get_ul_bit(mac_laa_anon) - - mac_laa_bytes_anon = mac_str_to_bytes(anonymize_mac(mac_laa_bytes)) - assert mac_laa_bytes_anon != mac_laa_bytes - assert get_ig_bit(mac_laa_bytes) == get_ig_bit(mac_laa_bytes_anon) - assert get_ul_bit(mac_laa_bytes) == get_ul_bit(mac_laa_bytes_anon) - - -def test_anonymize_mac_uaa() -> None: - """ - Test the function `anonymize_mac` - with an universally administered MAC address. - The 3 first bytes (vendor's OUI) should be kept, - and the 3 last bytes should be anonymized. - """ - mac_uaa_anon = anonymize_mac(mac_uaa) - assert mac_uaa_anon.startswith(mac_uaa[:8]) # Vendor's OUI is kept - assert mac_uaa_anon[10:] != mac_uaa[10:] # Last 3 bytes are anonymized - - mac_uaa_bytes_anon = mac_str_to_bytes(anonymize_mac(mac_uaa_bytes)) - assert mac_uaa_bytes_anon[:3] == mac_uaa_bytes[:3] # Vendor's OUI is kept - assert mac_uaa_bytes_anon[3:] != mac_uaa_bytes[3:] # Last 3 bytes are anonymized - - -def test_anonymize_ether_multicast() -> None: - """ - Test the function `anonymize_ether`, - with multicast addresses. - """ - ether_multicast = Ether(src=mac_multicast, dst=mac_multicast) - anonymize_ether(ether_multicast) - assert ether_multicast.src == mac_multicast - assert ether_multicast.dst == mac_multicast - - anonymize_pkt_macs(ether_multicast) - assert ether_multicast.src == mac_multicast - assert ether_multicast.dst == mac_multicast - - -def test_anonymize_ether_laa() -> None: - """ - Test the function `anonymize_ether`, - with locally administered addresses. - """ - ether_laa = Ether(src=mac_laa, dst=mac_laa) - anonymize_ether(ether_laa) - assert ether_laa.src != mac_laa - assert get_ig_bit(ether_laa.src) == get_ig_bit(mac_laa) - assert get_ul_bit(ether_laa.src) == get_ul_bit(mac_laa) - assert ether_laa.dst != mac_laa - assert get_ig_bit(ether_laa.dst) == get_ig_bit(mac_laa) - assert get_ul_bit(ether_laa.dst) == get_ul_bit(mac_laa) - - anonymize_pkt_macs(ether_laa) - assert ether_laa.src != mac_laa - assert get_ig_bit(ether_laa.src) == get_ig_bit(mac_laa) - assert get_ul_bit(ether_laa.src) == get_ul_bit(mac_laa) - assert ether_laa.dst != mac_laa - assert get_ig_bit(ether_laa.dst) == get_ig_bit(mac_laa) - assert get_ul_bit(ether_laa.dst) == get_ul_bit(mac_laa) - - -def test_anonymize_ether_uaa() -> None: - """ - Test the function `anonymize_ether`, - with universally administered addresses. - """ - ether_laa = Ether(src=mac_uaa, dst=mac_uaa) - anonymize_ether(ether_laa) - assert ether_laa.src.startswith(mac_uaa[:8]) - assert ether_laa.src[10:] != mac_uaa[10:] - assert ether_laa.dst.startswith(mac_uaa[:8]) - assert ether_laa.dst[10:] != mac_uaa[10:] - - anonymize_pkt_macs(ether_laa) - assert ether_laa.src.startswith(mac_uaa[:8]) - assert ether_laa.src[10:] != mac_uaa[10:] - assert ether_laa.dst.startswith(mac_uaa[:8]) - assert ether_laa.dst[10:] != mac_uaa[10:] - - -def test_anonymize_arp_multicast() -> None: - """ - Test the function `anonymize_arp`, - with multicast addresses. - """ - arp_multicast = ARP(hwsrc=mac_multicast, hwdst=mac_multicast) - anonymize_arp(arp_multicast) - assert arp_multicast.hwsrc == mac_multicast - assert arp_multicast.hwdst == mac_multicast - - anonymize_pkt_macs(arp_multicast) - assert arp_multicast.hwsrc == mac_multicast - assert arp_multicast.hwdst == mac_multicast - - -def test_anonymize_arp_laa() -> None: - """ - Test the function `anonymize_arp`, - with locally administered addresses. - """ - arp_laa = ARP(hwsrc=mac_laa, hwdst=mac_laa) - anonymize_arp(arp_laa) - assert arp_laa.hwsrc != mac_laa - assert get_ig_bit(arp_laa.hwsrc) == get_ig_bit(mac_laa) - assert get_ul_bit(arp_laa.hwsrc) == get_ul_bit(mac_laa) - assert arp_laa.hwdst != mac_laa - assert get_ig_bit(arp_laa.hwdst) == get_ig_bit(mac_laa) - assert get_ul_bit(arp_laa.hwdst) == get_ul_bit(mac_laa) - - anonymize_pkt_macs(arp_laa) - assert arp_laa.hwsrc != mac_laa - assert get_ig_bit(arp_laa.hwsrc) == get_ig_bit(mac_laa) - assert get_ul_bit(arp_laa.hwsrc) == get_ul_bit(mac_laa) - assert arp_laa.hwdst != mac_laa - assert get_ig_bit(arp_laa.hwdst) == get_ig_bit(mac_laa) - assert get_ul_bit(arp_laa.hwdst) == get_ul_bit(mac_laa) - - -def test_anonymize_arp_uaa() -> None: - """ - Test the function `anonymize_arp`, - with universally administered addresses. - """ - arp_uaa = ARP(hwsrc=mac_uaa, hwdst=mac_uaa) - anonymize_arp(arp_uaa) - assert arp_uaa.hwsrc.startswith(mac_uaa[:8]) - assert arp_uaa.hwsrc[10:] != mac_uaa[10:] - assert arp_uaa.hwdst.startswith(mac_uaa[:8]) - assert arp_uaa.hwdst[10:] != mac_uaa[10:] - - anonymize_pkt_macs(arp_uaa) - assert arp_uaa.hwsrc.startswith(mac_uaa[:8]) - assert arp_uaa.hwsrc[10:] != mac_uaa[10:] - assert arp_uaa.hwdst.startswith(mac_uaa[:8]) - assert arp_uaa.hwdst[10:] != mac_uaa[10:] - - -def test_anonymize_dhcp_multicast() -> None: - """ - Test the function `anonymize_dhcp`, - with multicast addresses. - """ - # Client hardware address - dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_multicast)) - anonymize_dhcp(dhcp) - assert dhcp.chaddr == mac_multicast_bytes - anonymize_pkt_macs(dhcp) - assert dhcp.chaddr == mac_multicast_bytes - - # Option: Client Identifier - dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_multicast))]) - anonymize_dhcp(dhcp) - assert dhcp.getlayer(DHCP).options[0][1][1:7] == mac_multicast_bytes - anonymize_pkt_macs(dhcp) - assert dhcp.getlayer(DHCP).options[0][1][1:7] == mac_multicast_bytes - - -def test_anonymize_dhcp_laa() -> None: - """ - Test the function `anonymize_dhcp`, - with locally administered addresses. - """ - # Client hardware address - dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_laa)) - anonymize_dhcp(dhcp) - assert dhcp.chaddr != mac_laa_bytes - assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_laa_bytes) - assert get_ul_bit(dhcp.chaddr) == get_ul_bit(mac_laa_bytes) - - anonymize_pkt_macs(dhcp) - assert dhcp.chaddr != mac_laa_bytes - assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_laa_bytes) - assert get_ul_bit(dhcp.chaddr) == get_ul_bit(mac_laa_bytes) - - - # Option: Client Identifier - dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_laa))]) - anonymize_dhcp(dhcp) - mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] - assert mac_anon != mac_laa_bytes - assert get_ig_bit(mac_anon) == get_ig_bit(mac_laa_bytes) - assert get_ul_bit(mac_anon) == get_ul_bit(mac_laa_bytes) - - anonymize_pkt_macs(dhcp) - mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] - assert mac_anon != mac_laa_bytes - assert get_ig_bit(mac_anon) == get_ig_bit(mac_laa_bytes) - assert get_ul_bit(mac_anon) == get_ul_bit(mac_laa_bytes) - - -def test_anonymize_dhcp_uaa() -> None: - """ - Test the function `anonymize_dhcp`, - with universally administered addresses. - """ - # Client hardware address - dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_uaa)) - anonymize_dhcp(dhcp) - assert dhcp.chaddr[:3] == mac_uaa_bytes[:3] - assert dhcp.chaddr[3:] != mac_uaa_bytes[3:] - - anonymize_pkt_macs(dhcp) - assert dhcp.chaddr[:3] == mac_uaa_bytes[:3] - assert dhcp.chaddr[3:] != mac_uaa_bytes[3:] - - - # Option: Client Identifier - dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_uaa))]) - anonymize_dhcp(dhcp) - mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] - assert mac_anon[:3] == mac_uaa_bytes[:3] - assert mac_anon[3:] != mac_uaa_bytes[3:] - - anonymize_pkt_macs(dhcp) - mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] - assert mac_anon[:3] == mac_uaa_bytes[:3] - assert mac_anon[3:] != mac_uaa_bytes[3:] -- GitLab