diff --git a/pcap_anonymize/__init__.py b/pcap_anonymize/__init__.py index 31f5153b0a64a9060ff01bb2dfc310494c964601..e070d14b13fb6a90f89507eebe0295ee7e92beb7 100644 --- a/pcap_anonymize/__init__.py +++ b/pcap_anonymize/__init__.py @@ -1 +1 @@ -from .pcap_anonymize import anonymize_pcap +from .pcap_anonymize import anonymize_pcap, anonymize_pcaps_in_dir diff --git a/pcap_anonymize/__main__.py b/pcap_anonymize/__main__.py index 9b740f61c139f3764d32a5d2fe931cb8280995b0..3c49610a7dbdee089f805778ea815ba619bae192 100644 --- a/pcap_anonymize/__main__.py +++ b/pcap_anonymize/__main__.py @@ -1,16 +1,19 @@ -import os import argparse -from .pcap_anonymize import anonymize_pcap +from .pcap_anonymize import anonymize_pcap, anonymize_pcaps_in_dir ### MAIN FUNCTION ### def main() -> None: parser = argparse.ArgumentParser(description="Anonymize a PCAP traffic capture.") - parser.add_argument("input", type=str, help="Path to the input PCAP file.") + parser.add_argument("-i", "--input", type=str, help="Path to the input PCAP file.") parser.add_argument("-o", "--output", type=str, help="Path to the output PCAP file.") + parser.add_argument("-d", "--dir", type=str, help="Path to the directory containing the input PCAP files.") args = parser.parse_args() - anonymize_pcap(args.input, args.output) + if args.dir: + anonymize_pcaps_in_dir(args.dir) + else: + anonymize_pcap(args.input, args.output) ### ENTRY POINT ### diff --git a/pcap_anonymize/pcap_anonymize.py b/pcap_anonymize/pcap_anonymize.py index 81c78cc2c661483ab44899b6bb6934785197643d..571fdf489cda5461e32906a4aa0412c39749a30a 100644 --- a/pcap_anonymize/pcap_anonymize.py +++ b/pcap_anonymize/pcap_anonymize.py @@ -3,6 +3,7 @@ Anonymize all packets in a PCAP file. """ import os +import glob from pathlib import Path from scapy.all import Packet, sniff, wrpcap # Packet layers @@ -69,13 +70,34 @@ def anonymize_pcap(input: os.PathLike, output: os.PathLike = None) -> None: Args: input: path to the input PCAP file output: path to the output PCAP file. - If None, create a new file having the same name as the input file with the suffix '.anonymized.pcap'. + If None, create a new file having the same name as the input file with the suffix '.anon.pcap'. """ + global packets + if output is None: - output = str(Path(input).with_suffix('.anonymized.pcap')) + output = str(Path(input).with_suffix(".anon.pcap")) # Read and anonymize packets from the input file sniff(offline=input, prn=anonymize_packet, store=False) # Write anonymized packets to the output file wrpcap(output, packets) + + # Reset global packets list + packets = [] + + +def anonymize_pcaps_in_dir(dir: os.PathLike) -> None: + """ + Anonymize all PCAP files in a directory. + + Args: + dir: path to the directory containing the PCAP files + """ + for pcap_file in glob.glob(os.path.join(dir, "*.pcap")): + + # Skip traces already anonymized + if pcap_file.endswith(".anon.pcap"): + continue + + anonymize_pcap(pcap_file, None) diff --git a/test/test_traces.py b/test/test_traces.py new file mode 100644 index 0000000000000000000000000000000000000000..0af8929ac9b9427187afd7ead1debcfab2584dcd --- /dev/null +++ b/test/test_traces.py @@ -0,0 +1,73 @@ +""" +Test the package with PCAP traces. +""" + +import os +import glob +from pathlib import Path +from pcap_anonymize import anonymize_pcap, anonymize_pcaps_in_dir + + +### TEST CONSTANTS ### + +dir_self = os.path.dirname(os.path.abspath(__file__)) +dir_traces = os.path.join(dir_self, "traces") + + +### TEST FUNCTIONS ### + +def test_anonymize_pcap_http(tmp_path: Path) -> None: + """ + Test the package with a PCAP trace containing HTTP packets. + + Args: + tmp_path (pathlib.Path): temporary directory to store the output traces + """ + input = os.path.join(dir_traces, "http.pcap") + output = os.path.join(tmp_path, "http.anon.pcap") + anonymize_pcap(input, output) + assert os.path.exists(input) + assert os.path.exists(output) + + +def test_anonymize_pcap_dhcp(tmp_path: Path) -> None: + """ + Test the package with a PCAP trace containing DHCP packets. + + Args: + tmp_path (pathlib.Path): temporary directory to store the output traces + """ + input = os.path.join(dir_traces, "dhcp.pcap") + output = os.path.join(tmp_path, "dhcp.anon.pcap") + anonymize_pcap(input, output) + assert os.path.exists(input) + assert os.path.exists(output) + + +def test_anonymize_pcap_tplink(tmp_path: Path) -> None: + """ + Test the package with a PCAP trace containing + TP-Link Smart Home protocol packets. + + Args: + tmp_path (pathlib.Path): temporary directory to store the output traces + """ + input = os.path.join(dir_traces, "tplink.pcap") + output = os.path.join(tmp_path, "tplink.anon.pcap") + anonymize_pcap(input, output) + assert os.path.exists(input) + assert os.path.exists(output) + + +def test_anonymize_pcaps_in_dir() -> None: + """ + Test the package with a directory containing PCAP traces. + """ + input_traces = glob.glob(os.path.join(dir_traces, "*.pcap")) + anonymize_pcaps_in_dir(dir_traces) + output_traces = glob.glob(os.path.join(dir_traces, "*.anon.pcap")) + assert len(output_traces) == len(input_traces) + + # Clean up + for output_trace in output_traces: + os.remove(output_trace) diff --git a/test/traces/.gitignore b/test/traces/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..cac23be0aa7d3ec968237dedc3dbc221b8d49b44 --- /dev/null +++ b/test/traces/.gitignore @@ -0,0 +1,2 @@ +# Anonymized PCAP files +*.anon.pcap diff --git a/test/traces/dhcp.pcap b/test/traces/dhcp.pcap new file mode 100644 index 0000000000000000000000000000000000000000..4ed3d9b4d63dd4dd23a473d19b3f1719f9b195b4 Binary files /dev/null and b/test/traces/dhcp.pcap differ diff --git a/test/traces/http.pcap b/test/traces/http.pcap new file mode 100644 index 0000000000000000000000000000000000000000..98a6d762add2eb51490554e5f63185ef9b471df0 Binary files /dev/null and b/test/traces/http.pcap differ diff --git a/test/traces/tplink.pcap b/test/traces/tplink.pcap new file mode 100644 index 0000000000000000000000000000000000000000..aeed1c0e7971a4c2a639a1e4d98d06694c712b2e Binary files /dev/null and b/test/traces/tplink.pcap differ