From c6a9ada40bf02c8089d312696722288248578dbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20De=20Keersmaeker?= <francois.dekeersmaeker@uclouvain.be> Date: Wed, 18 Dec 2024 10:38:27 +0100 Subject: [PATCH] Added support for CoAP --- pcap_anonymize/layers/coap.py | 37 +++++++++++++++++++++++++++++++++++ test/test_coap.py | 23 ++++++++++++++++++++++ test/test_http.py | 2 +- 3 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 pcap_anonymize/layers/coap.py create mode 100644 test/test_coap.py diff --git a/pcap_anonymize/layers/coap.py b/pcap_anonymize/layers/coap.py new file mode 100644 index 0000000..fb53501 --- /dev/null +++ b/pcap_anonymize/layers/coap.py @@ -0,0 +1,37 @@ +""" +Anonymize CoAP packets. +""" + +from enum import Enum +from scapy.contrib.coap import CoAP + + +class CoapFields(Enum): + """ + CoAP fields. + """ + TYPE = "type" + CODE = "code" + OPTIONS = "options" + URI_PATH = "Uri-Path" + + +def anonymize_coap(coap: CoAP) -> None: + """ + Anonymize a packet's CoAP layer. + + Args: + coap (scapy.contrib.coap.CoAP): CoAP layer to anonymize + """ + # Remove all fields other than type and code + for field in coap.fields.copy(): + if field not in [f.value for f in CoapFields]: + delattr(coap, field) + + # Remove all options other than Uri-Path + options = coap.getfieldval(CoapFields.OPTIONS.value) + new_options = [] + for k, v in options: + if k == CoapFields.URI_PATH.value: + new_options.append((k, v)) + coap.setfieldval(CoapFields.OPTIONS.value, new_options) diff --git a/test/test_coap.py b/test/test_coap.py new file mode 100644 index 0000000..c9b584b --- /dev/null +++ b/test/test_coap.py @@ -0,0 +1,23 @@ +from scapy.contrib.coap import CoAP +from pcap_anonymize.layers.coap import CoapFields, anonymize_coap + + +### TEST FUNCTIONS ### + +def test_anonymize_coap() -> None: + # Build CoAP layer + options = [('Uri-Host', 'host'), ('Uri-Path', 'sensors'), ('Uri-Path', 'temperature')] + coap = CoAP(type=0, code=1, msg_id=0x1234, token=b"token", options=options) + + anonymize_coap(coap) + + # Check remaining fields + assert coap.type == 0 + assert coap.code == 1 + + # Ensure other fields have been deleted + for field in coap.fields: + assert field in [f.value for f in CoapFields] + for k, v in coap.getfieldval(CoapFields.OPTIONS.value): + assert k == CoapFields.URI_PATH.value + assert v in ["sensors", "temperature"] diff --git a/test/test_http.py b/test/test_http.py index d1464b4..2ac2ede 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -1,5 +1,5 @@ from scapy.layers.inet import TCP -from scapy.layers.http import HTTP, HTTPRequest, HTTPResponse +from scapy.layers.http import HTTPRequest, HTTPResponse from pcap_anonymize.layers.http import ( HttpFields, get_http_layer, -- GitLab