diff --git a/pcap_anonymize/layers/http.py b/pcap_anonymize/layers/http.py
new file mode 100644
index 0000000000000000000000000000000000000000..36cfbd482349d693befe15c892c3242abe41c206
--- /dev/null
+++ b/pcap_anonymize/layers/http.py
@@ -0,0 +1,83 @@
+"""
+Anonymize HTTP packets.
+"""
+
+from enum import Enum
+from scapy.all import Packet, Raw
+from scapy.layers.http import HTTP, HTTPRequest, HTTPResponse
+
+
+ENCODING = "utf-8"
+
+class HttpFields(Enum):
+    """
+    Names of the HTTP fields kept by `anonymize_http`.
+    """
+    METHOD = "Method"
+    PATH = "Path"
+
+
+def get_http_layer(packet: Packet) -> HTTP:
+    """
+    Get the HTTP layer from a packet.
+
+    The layer is first looked up directly in the packet.
+    If it is not there, the load of the packet's Raw layer is parsed as HTTP.
+
+    Args:
+        packet (scapy.Packet): packet to get the HTTP layer from
+    Returns:
+        (scapy.HTTP): HTTP Request or HTTP Response layer
+    Raises:
+        AttributeError: if the HTTP layer could not be found in the packet
+    """
+    ## Get HTTP layer directly
+    for layer_class in (HTTPRequest, HTTPResponse):
+        http = packet.getlayer(layer_class)
+        if http is not None:
+            return http
+
+    # HTTP layer could not be retrieved directly.
+    # Try to get it from the Raw layer.
+    raw = packet.getlayer(Raw)
+    if raw is not None:
+        # Parse with the generic HTTP class, which picks Request or Response
+        # from the start line. Building HTTPRequest(load) directly always
+        # yields an HTTPRequest, whatever the load contains.
+        parsed = HTTP(raw.getfieldval("load"))
+        for layer_class in (HTTPRequest, HTTPResponse):
+            http = parsed.getlayer(layer_class)
+            if http is not None:
+                return http
+
+    raise AttributeError(f"HTTP layer not found in packet {packet.summary()}")
+
+
+def anonymize_http(http: HTTP) -> None:
+    """
+    Anonymize a packet's HTTP layer.
+
+    The query string is removed from the path,
+    then all fields other than Method and Path are deleted.
+
+    Args:
+        http (scapy.HTTP): HTTP layer to anonymize
+    """
+    # Remove request parameters
+    try:
+        path = http.getfieldval(HttpFields.PATH.value)
+    except AttributeError:
+        # HTTP packet does not contain the `Path` field
+        path = None
+    if path is not None:
+        if isinstance(path, str):
+            path = path.encode(ENCODING)
+        # Split the bytes directly: decoding first would raise
+        # UnicodeDecodeError on a path which is not valid UTF-8.
+        http.setfieldval(HttpFields.PATH.value, path.split(b"?", 1)[0])
+
+    # Remove all fields other than Method and Path
+    kept_fields = {field.value for field in HttpFields}
+    for field in http.fields.copy():
+        if field not in kept_fields:
+            delattr(http, field)
diff --git a/test/test_http.py b/test/test_http.py
new file mode 100644
index 0000000000000000000000000000000000000000..d1464b4de3e69f27d04295a1a26dab4893dacd08
--- /dev/null
+++ b/test/test_http.py
@@ -0,0 +1,191 @@
+from scapy.layers.inet import TCP
+from scapy.layers.http import HTTP, HTTPRequest, HTTPResponse
+from scapy.packet import Raw
+from pcap_anonymize.layers.http import (
+    HttpFields,
+    get_http_layer,
+    anonymize_http
+)
+
+
+### TEST CONSTANTS ###
+
+ENCODING = "utf-8"
+
+http_request = HTTPRequest(
+    Method="GET",
+    Path="/index.html"
+)
+http_response = HTTPResponse(
+    Status_Code="200",
+    Reason_Phrase="OK"
+)
+
+
+### TEST FUNCTIONS ###
+
+def test_get_http_layer_request() -> None:
+    """
+    Test the function `get_http_layer`,
+    with an HTTP Request packet.
+    """
+    packet = TCP(dport=80) / http_request
+    http = get_http_layer(packet)
+    assert http == http_request
+    assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
+    assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
+
+
+def test_get_http_layer_request_indirect() -> None:
+    """
+    Test the function `get_http_layer`,
+    with an HTTP Request packet
+    which is not directly accessible by scapy.
+    """
+    packet = TCP(dport=8800) / http_request
+    http = get_http_layer(packet)
+    assert isinstance(http, HTTPRequest)
+    assert http == http_request
+    assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
+    assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
+
+
+def test_get_http_layer_request_from_raw() -> None:
+    """
+    Test the function `get_http_layer`,
+    with an HTTP Request carried as bytes in a Raw layer.
+    """
+    packet = TCP(dport=8800) / Raw(load=bytes(http_request))
+    http = get_http_layer(packet)
+    assert isinstance(http, HTTPRequest)
+    assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
+    assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
+
+
+def test_get_http_layer_response() -> None:
+    """
+    Test the function `get_http_layer`,
+    with an HTTP Response packet.
+    """
+    packet = TCP(dport=80) / http_response
+    http = get_http_layer(packet)
+    assert http == http_response
+    assert http.getfieldval("Status_Code").decode(ENCODING) == "200"
+    assert http.getfieldval("Reason_Phrase").decode(ENCODING) == "OK"
+
+
+def test_get_http_layer_response_indirect() -> None:
+    """
+    Test the function `get_http_layer`,
+    with an HTTP Response packet
+    which is not directly accessible by scapy.
+    """
+    packet = TCP(dport=8800) / http_response
+    http = get_http_layer(packet)
+    assert isinstance(http, HTTPResponse)
+    assert http == http_response
+    assert http.getfieldval("Status_Code").decode(ENCODING) == "200"
+    assert http.getfieldval("Reason_Phrase").decode(ENCODING) == "OK"
+
+
+def test_get_http_layer_response_from_raw() -> None:
+    """
+    Test the function `get_http_layer`,
+    with an HTTP Response carried as bytes in a Raw layer.
+    """
+    packet = TCP(dport=8800) / Raw(load=bytes(http_response))
+    http = get_http_layer(packet)
+    assert isinstance(http, HTTPResponse)
+    assert http.getfieldval("Status_Code").decode(ENCODING) == "200"
+    assert http.getfieldval("Reason_Phrase").decode(ENCODING) == "OK"
+
+
+def test_get_http_layer_not_found() -> None:
+    """
+    Test the function `get_http_layer`,
+    with a packet whose Raw layer does not contain HTTP.
+    """
+    packet = TCP(dport=8800) / Raw(load=b"not http")
+    try:
+        get_http_layer(packet)
+    except AttributeError:
+        pass
+    else:
+        raise AssertionError("AttributeError was not raised")
+
+
+def test_anonymize_http_request() -> None:
+    """
+    Test the function `anonymize_http`,
+    with an HTTP Request packet.
+    """
+    packet = TCP(dport=80) / http_request
+    http = get_http_layer(packet)
+    anonymize_http(http)
+    assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
+    assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
+
+    # Ensure other fields have been deleted
+    for field in http.fields:
+        assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
+
+
+def test_anonymize_http_request_indirect() -> None:
+    """
+    Test the function `anonymize_http`,
+    with an HTTP Request packet.
+    """
+    packet = TCP(dport=8800) / http_request
+    http = get_http_layer(packet)
+    anonymize_http(http)
+    assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
+    assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
+
+    # Ensure other fields have been deleted
+    for field in http.fields:
+        assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
+
+
+def test_anonymize_http_request_query() -> None:
+    """
+    Test the function `anonymize_http`,
+    with an HTTP Request whose path carries query parameters.
+    """
+    http = HTTPRequest(Method="GET", Path="/index.html?user=alice&id=42")
+    anonymize_http(http)
+    assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
+
+
+def test_anonymize_http_request_non_utf8_path() -> None:
+    """
+    Test the function `anonymize_http`,
+    with an HTTP Request whose path is not valid UTF-8.
+    """
+    http = HTTPRequest(Method="GET", Path=b"/caf\xe9?user=alice")
+    anonymize_http(http)
+    assert http.getfieldval(HttpFields.PATH.value) == b"/caf\xe9"
+
+
+def test_anonymize_http_response() -> None:
+    """
+    Test the function `anonymize_http`,
+    with an HTTP Response packet.
+    """
+    packet = TCP(dport=80) / http_response
+    http = get_http_layer(packet)
+    anonymize_http(http)
+    for field in http.fields:
+        assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
+
+
+def test_anonymize_http_response_indirect() -> None:
+    """
+    Test the function `anonymize_http`,
+    with an HTTP Response packet.
+    """
+    packet = TCP(dport=8800) / http_response
+    http = get_http_layer(packet)
+    anonymize_http(http)
+    for field in http.fields:
+        assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
+