diff --git a/pcap_anonymize/layers/mac.py b/pcap_anonymize/layers/mac.py index 656b014966522164942c0287ca7d17fb22100c9d..74dff5241050c043abbd9a2551fbb655ae5f9830 100644 --- a/pcap_anonymize/layers/mac.py +++ b/pcap_anonymize/layers/mac.py @@ -234,3 +234,29 @@ def anonymize_dhcp(dhcp: BOOTP) -> BOOTP: break return dhcp + + +def anonymize_pkt_macs(packet) -> None: + """ + Anonymize a packet's MAC addresses. + + Args: + packet: scapy packet to anonymize + """ + # Ethernet + try: + anonymize_ether(packet.getlayer(Ether)) + except AttributeError: + pass + + # ARP + try: + anonymize_arp(packet.getlayer(ARP)) + except: + pass + + # DHCP + try: + anonymize_dhcp(packet.getlayer(BOOTP)) + except AttributeError: + pass diff --git a/pcap_anonymize/pcap_anonymize.py b/pcap_anonymize/pcap_anonymize.py index ad427fe52ff6a33410594e41ed52f60a237f7514..0845bd77fc12a9abd56b562dfa48e5b3bbf460b1 100644 --- a/pcap_anonymize/pcap_anonymize.py +++ b/pcap_anonymize/pcap_anonymize.py @@ -5,10 +5,8 @@ Anonymize all packets in a PCAP file. import os from pathlib import Path from scapy.all import Packet, sniff, wrpcap -from scapy.layers.l2 import Ether, ARP -from scapy.layers.dhcp import BOOTP # Packet layers -from .layers.mac import anonymize_ether, anonymize_arp, anonymize_dhcp +from .layers.mac import anonymize_pkt_macs ### GLOBAL VARIABLES ### @@ -49,22 +47,10 @@ def anonymize_packet(packet: Packet) -> None: global packets # Anonymize MAC addresses - try: - anonymize_ether(packet.getlayer(Ether)) - except AttributeError: - pass - - # Anonymize MAC addresses in ARP packets - try: - anonymize_arp(packet.getlayer(ARP)) - except AttributeError: - pass - - # Anonymize MAC addresses in DHCP packets - try: - anonymize_dhcp(packet.getlayer(BOOTP)) - except AttributeError: - pass + anonymize_pkt_macs(packet) + + # Anonymize application layer + # TODO # Recompute packet checksums packet = recompute_checksums(packet) diff --git a/test/test_mac.py b/test/test_mac.py index bb0bd836fa6708d95fe73a1d183dac803eade313..7c8fb1f76c218f1d011717934da9a1bcf8c53a42 100644 --- a/test/test_mac.py +++ b/test/test_mac.py @@ -6,7 +6,8 @@ from pcap_anonymize.layers.mac import ( anonymize_mac, anonymize_ether, anonymize_arp, - anonymize_dhcp + anonymize_dhcp, + anonymize_pkt_macs ) @@ -125,6 +126,10 @@ def test_anonymize_ether_multicast() -> None: assert ether_multicast.src == mac_multicast assert ether_multicast.dst == mac_multicast + anonymize_pkt_macs(ether_multicast) + assert ether_multicast.src == mac_multicast + assert ether_multicast.dst == mac_multicast + def test_anonymize_ether_laa() -> None: """ @@ -140,6 +145,14 @@ def test_anonymize_ether_laa() -> None: assert get_ig_bit(ether_laa.dst) == get_ig_bit(mac_laa) assert get_ul_bit(ether_laa.dst) == get_ul_bit(mac_laa) + anonymize_pkt_macs(ether_laa) + assert ether_laa.src != mac_laa + assert get_ig_bit(ether_laa.src) == get_ig_bit(mac_laa) + assert get_ul_bit(ether_laa.src) == get_ul_bit(mac_laa) + assert ether_laa.dst != mac_laa + assert get_ig_bit(ether_laa.dst) == get_ig_bit(mac_laa) + assert get_ul_bit(ether_laa.dst) == get_ul_bit(mac_laa) + def test_anonymize_ether_uaa() -> None: """ @@ -153,6 +166,12 @@ def test_anonymize_ether_uaa() -> None: assert ether_laa.dst.startswith(mac_uaa[:8]) assert ether_laa.dst[10:] != mac_uaa[10:] + anonymize_pkt_macs(ether_laa) + assert ether_laa.src.startswith(mac_uaa[:8]) + assert ether_laa.src[10:] != mac_uaa[10:] + assert ether_laa.dst.startswith(mac_uaa[:8]) + assert ether_laa.dst[10:] != mac_uaa[10:] + def test_anonymize_arp_multicast() -> None: """ @@ -164,6 +183,10 @@ def test_anonymize_arp_multicast() -> None: assert arp_multicast.hwsrc == mac_multicast assert arp_multicast.hwdst == mac_multicast + anonymize_pkt_macs(arp_multicast) + assert arp_multicast.hwsrc == mac_multicast + assert arp_multicast.hwdst == mac_multicast + def test_anonymize_arp_laa() -> None: """ @@ -179,6 +202,14 @@ def test_anonymize_arp_laa() -> None: assert get_ig_bit(arp_laa.hwdst) == get_ig_bit(mac_laa) assert get_ul_bit(arp_laa.hwdst) == get_ul_bit(mac_laa) + anonymize_pkt_macs(arp_laa) + assert arp_laa.hwsrc != mac_laa + assert get_ig_bit(arp_laa.hwsrc) == get_ig_bit(mac_laa) + assert get_ul_bit(arp_laa.hwsrc) == get_ul_bit(mac_laa) + assert arp_laa.hwdst != mac_laa + assert get_ig_bit(arp_laa.hwdst) == get_ig_bit(mac_laa) + assert get_ul_bit(arp_laa.hwdst) == get_ul_bit(mac_laa) + def test_anonymize_arp_uaa() -> None: """ @@ -192,6 +223,12 @@ def test_anonymize_arp_uaa() -> None: assert arp_uaa.hwdst.startswith(mac_uaa[:8]) assert arp_uaa.hwdst[10:] != mac_uaa[10:] + anonymize_pkt_macs(arp_uaa) + assert arp_uaa.hwsrc.startswith(mac_uaa[:8]) + assert arp_uaa.hwsrc[10:] != mac_uaa[10:] + assert arp_uaa.hwdst.startswith(mac_uaa[:8]) + assert arp_uaa.hwdst[10:] != mac_uaa[10:] + def test_anonymize_dhcp_multicast() -> None: """ @@ -202,11 +239,15 @@ def test_anonymize_dhcp_multicast() -> None: dhcp = BOOTP(chaddr=mac_str_to_bytes(mac_multicast)) anonymize_dhcp(dhcp) assert dhcp.chaddr == mac_multicast_bytes + anonymize_pkt_macs(dhcp) + assert dhcp.chaddr == mac_multicast_bytes # Option: Client Identifier dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_multicast))]) anonymize_dhcp(dhcp) assert dhcp.getlayer(DHCP).options[0][1][1:7] == mac_multicast_bytes + anonymize_pkt_macs(dhcp) + assert dhcp.getlayer(DHCP).options[0][1][1:7] == mac_multicast_bytes def test_anonymize_dhcp_laa() -> None: @@ -221,6 +262,12 @@ def test_anonymize_dhcp_laa() -> None: assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_laa_bytes) assert get_ul_bit(dhcp.chaddr) == get_ul_bit(mac_laa_bytes) + anonymize_pkt_macs(dhcp) + assert dhcp.chaddr != mac_laa_bytes + assert get_ig_bit(dhcp.chaddr) == get_ig_bit(mac_laa_bytes) + assert get_ul_bit(dhcp.chaddr) == get_ul_bit(mac_laa_bytes) + + # Option: Client Identifier dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_laa))]) anonymize_dhcp(dhcp) @@ -229,6 +276,12 @@ def test_anonymize_dhcp_laa() -> None: assert get_ig_bit(mac_anon) == get_ig_bit(mac_laa_bytes) assert get_ul_bit(mac_anon) == get_ul_bit(mac_laa_bytes) + anonymize_pkt_macs(dhcp) + mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] + assert mac_anon != mac_laa_bytes + assert get_ig_bit(mac_anon) == get_ig_bit(mac_laa_bytes) + assert get_ul_bit(mac_anon) == get_ul_bit(mac_laa_bytes) + def test_anonymize_dhcp_uaa() -> None: """ @@ -241,9 +294,19 @@ def test_anonymize_dhcp_uaa() -> None: assert dhcp.chaddr[:3] == mac_uaa_bytes[:3] assert dhcp.chaddr[3:] != mac_uaa_bytes[3:] + anonymize_pkt_macs(dhcp) + assert dhcp.chaddr[:3] == mac_uaa_bytes[:3] + assert dhcp.chaddr[3:] != mac_uaa_bytes[3:] + + # Option: Client Identifier dhcp /= DHCP(options=[("client_id", b"\x01" + mac_str_to_bytes(mac_uaa))]) anonymize_dhcp(dhcp) mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] assert mac_anon[:3] == mac_uaa_bytes[:3] assert mac_anon[3:] != mac_uaa_bytes[3:] + + anonymize_pkt_macs(dhcp) + mac_anon = dhcp.getlayer(DHCP).options[0][1][1:7] + assert mac_anon[:3] == mac_uaa_bytes[:3] + assert mac_anon[3:] != mac_uaa_bytes[3:]