diff --git a/.gitignore b/.gitignore
index 2f0e4863f45088542b1af67020c4d3c5d5cf6074..f84c7803f7b4912c184225bf394093872abaf8f4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,6 @@ __pycache__
 build
 dist
 *.egg-info
+
+# Local test scripts
+test.py
diff --git a/pcap_anonymize/__init__.py b/pcap_anonymize/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..31f5153b0a64a9060ff01bb2dfc310494c964601 100644
--- a/pcap_anonymize/__init__.py
+++ b/pcap_anonymize/__init__.py
@@ -0,0 +1 @@
+from .pcap_anonymize import anonymize_pcap
diff --git a/pcap_anonymize/layers/__init__.py b/pcap_anonymize/layers/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e25c399a2011f096d2411dd1ac0c5bfd6b38e169
--- /dev/null
+++ b/pcap_anonymize/layers/__init__.py
@@ -0,0 +1,3 @@
+"""
+Submodule `layers`.
+"""
diff --git a/pcap_anonymize/layers/mac.py b/pcap_anonymize/layers/mac.py
new file mode 100644
index 0000000000000000000000000000000000000000..1ac3924f9bbef716f7f68e4a33c58bcf556d63fa
--- /dev/null
+++ b/pcap_anonymize/layers/mac.py
@@ -0,0 +1,101 @@
+"""
+Anonymize MAC addresses.
+"""
+
+import secrets
+from scapy.layers.l2 import Ether, ARP
+
+BASE_HEX = 16
+
+# Special, well-known MAC addresses
+special_macs = [
+    "00:00:00:00:00:00",  # Default
+    "ff:ff:ff:ff:ff:ff"   # Broadcast
+]
+
+
+def anonymize_mac(mac: str) -> str:
+    """
+    Anonymize a given MAC address.
+
+    Special and multicast addresses are returned unchanged.
+    Universally administered addresses keep their OUI; their last 3 bytes are randomized.
+    Locally administered addresses are fully randomized, except for the I/G and U/L bits.
+
+    Args:
+        mac (str): MAC address to anonymize
+    Returns:
+        str: anonymized MAC address
+    """
+    # Special MAC address
+    if mac in special_macs:
+        return mac
+
+    ## Classic MAC address
+    mac_split = mac.split(":")
+
+    ## I/G bit: first byte, least-significant bit
+    # I/G bit = 0 ==> Unicast address
+    # I/G bit = 1 ==> Multicast address
+    first_byte = int(mac_split[0], BASE_HEX)
+    ig_mask = 0b00000001
+    ig_bit = first_byte & ig_mask
+    is_multicast = bool(ig_bit)  # True ==> Multicast, False ==> Unicast
+
+    # Multicast address:
+    # do not anonymize
+    if is_multicast:
+        return mac
+
+    ## U/L bit: first byte, second least-significant bit
+    # U/L bit = 0 ==> Universally administered address (UAA)
+    # U/L bit = 1 ==> Locally administered address (LAA)
+    ul_mask = 0b00000010
+    ul_bit = first_byte & ul_mask
+    is_local = bool(ul_bit)  # True ==> LAA, False ==> UAA
+
+    # Locally administered address
+    if is_local:
+        # Randomize the first byte, then force its I/G and U/L bits
+        # back to the values they have in the original address
+        random_byte = secrets.randbits(8)
+        first_byte = (random_byte & ~(ig_mask | ul_mask)) | ig_bit | ul_bit
+        return ':'.join(
+            [f"{first_byte:02x}"] +  # Zero-padded, like the random bytes
+            [secrets.token_hex(1) for _ in range(5)]  # Random last 5 bytes
+        )
+
+    # Universally administered address
+    return (
+        ':'.join(mac_split[:3]) +  # Keep OUI
+        ':' +
+        ':'.join(secrets.token_hex(1) for _ in range(3))  # Random last 3 bytes
+    )
+
+
+def anonymize_ether(ether: Ether) -> Ether:
+    """
+    Anonymize a packet's Ether layer.
+
+    Args:
+        ether (scapy.Ether): Ether layer to anonymize
+    Returns:
+        scapy.Ether: anonymized Ether layer
+    """
+    ether.setfieldval("src", anonymize_mac(ether.getfieldval("src")))
+    ether.setfieldval("dst", anonymize_mac(ether.getfieldval("dst")))
+    return ether
+
+
+def anonymize_arp(arp: ARP) -> ARP:
+    """
+    Anonymize a packet's ARP layer.
+
+    Args:
+        arp (scapy.ARP): ARP layer to anonymize
+    Returns:
+        scapy.ARP: anonymized ARP layer
+    """
+    arp.setfieldval("hwsrc", anonymize_mac(arp.getfieldval("hwsrc")))
+    arp.setfieldval("hwdst", anonymize_mac(arp.getfieldval("hwdst")))
+    return arp
diff --git a/pcap_anonymize/pcap_anonymize.py b/pcap_anonymize/pcap_anonymize.py
new file mode 100644
index 0000000000000000000000000000000000000000..f2bbd6327dedada974031d417c260fefc947a759
--- /dev/null
+++ b/pcap_anonymize/pcap_anonymize.py
@@ -0,0 +1,86 @@
+"""
+Anonymize all packets in a PCAP file.
+"""
+
+import os
+from pathlib import Path
+from scapy.all import Packet, sniff, wrpcap
+from scapy.layers.l2 import Ether, ARP
+# Packet layers
+from .layers.mac import anonymize_ether, anonymize_arp
+
+
+### GLOBAL VARIABLES ###
+
+# Anonymized packets of the PCAP file being processed
+packets = []
+
+
+
+### FUNCTIONS ###
+
+def recompute_checksums(packet: Packet) -> Packet:
+    """
+    Recompute a given packet's checksums.
+
+    Args:
+        packet (scapy.Packet): scapy packet to recompute checksums for
+    Returns:
+        (scapy.Packet): packet with recomputed checksums
+    """
+    for layer_class in packet.layers():
+        layer = packet.getlayer(layer_class)
+        try:
+            delattr(layer, "chksum")
+        except AttributeError:
+            pass
+
+    return packet.__class__(bytes(packet))
+
+
+def anonymize_packet(packet: Packet) -> None:
+    """
+    Anonymize a packet,
+    and append the anonymized packet to the global list 'packets'.
+
+    Args:
+        packet: scapy packet to anonymize
+    """
+    # Anonymize MAC addresses
+    # (`getlayer` returns None when the packet has no such layer)
+    ether = packet.getlayer(Ether)
+    if ether is not None:
+        anonymize_ether(ether)
+
+    # Anonymize MAC addresses in ARP packets
+    arp = packet.getlayer(ARP)
+    if arp is not None:
+        anonymize_arp(arp)
+
+    # Recompute packet checksums
+    packet = recompute_checksums(packet)
+
+    packets.append(packet)
+
+
+def anonymize_pcap(input: os.PathLike, output: os.PathLike = None) -> None:
+    """
+    Anonymize all packets in a PCAP file.
+
+    Args:
+        input: path to the input PCAP file
+        output: path to the output PCAP file.
+                If None, create a new file having the same name as the input file with the suffix '.anonymized.pcap'.
+    """
+    if output is None:
+        output = str(Path(input).with_suffix('.anonymized.pcap'))
+
+    # Start from an empty list, so that packets collected by a previous call
+    # do not end up in this output file
+    packets.clear()
+
+    # Read and anonymize packets from the input file
+    sniff(offline=input, prn=anonymize_packet, store=False)
+
+    # Write anonymized packets to the output file
+    wrpcap(output, packets)