Skip to content
Extraits de code Groupes Projets
Valider 35bb2fcd rédigé par François De Keersmaeker's avatar François De Keersmaeker
Parcourir les fichiers

App layer: HTTP

parent 96a7b890
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
Pipeline #60590 annulé
"""
Anonymize HTTP packets.
"""
from enum import Enum
from scapy.all import Packet, Raw
from scapy.layers.http import HTTP, HTTPRequest, HTTPResponse
ENCODING = "utf-8"
class HttpFields(Enum):
"""
HTTP fields.
"""
METHOD = "Method"
PATH = "Path"
def get_http_layer(packet: Packet) -> HTTP:
"""
Get the HTTP layer from a packet.
Args:
packet (scapy.Packet): packet to get the HTTP layer from
Returns:
(scapy.HTTP): HTTP layer
Raises:
AttributeError: if the HTTP layer could not be found in the packet
"""
## Get HTTP layer directly
# HTTP Request
http = packet.getlayer(HTTPRequest)
if http is not None:
return http
# HTTP Response
http = packet.getlayer(HTTPResponse)
if http is not None:
return http
# HTTP layer could not be retrieved directly.
# Try to get it from the Raw layer.
raw_load = packet.getlayer(Raw).getfieldval("load")
http = HTTPRequest(raw_load)
if http.haslayer(HTTPRequest):
return http
http = HTTPResponse(raw_load)
if http.haslayer(HTTPResponse):
return http
raise AttributeError(f"HTTP layer not found in packet {packet.summary()}")
def anonymize_http(http: HTTP) -> None:
"""
Anonymize a packet's HTTP layer.
Args:
http (scapy.HTTP): HTTP layer to anonymize
"""
# Remove request parameters
try:
path = http.getfieldval(HttpFields.PATH.value).decode(ENCODING)
http.setfieldval(HttpFields.PATH.value, path.split("?")[0].encode(ENCODING))
except AttributeError:
# HTTP packet does not contain the `Path` field
pass
# Remove all fields other than Method and Path
for field in http.fields.copy():
if field != HttpFields.METHOD.value and field != HttpFields.PATH.value:
delattr(http, field)
from scapy.layers.inet import TCP
from scapy.layers.http import HTTP, HTTPRequest, HTTPResponse
from pcap_anonymize.layers.http import (
HttpFields,
get_http_layer,
anonymize_http
)
### TEST CONSTANTS ###
ENCODING = "utf-8"
http_request = HTTPRequest(
Method="GET",
Path="/index.html"
)
http_response = HTTPResponse(
Status_Code="200",
Reason_Phrase="OK"
)
### TEST FUNCTIONS ###
def test_get_http_layer_request() -> None:
"""
Test the function `get_http_layer`,
with an HTTP Request packet.
"""
packet = TCP(dport=80) / http_request
http = get_http_layer(packet)
assert http == http_request
assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
def test_get_http_layer_request_indirect() -> None:
"""
Test the function `get_http_layer`,
with an HTTP Request packet
which is not directly accessible by scapy.
"""
packet = TCP(dport=8800) / http_request
http = get_http_layer(packet)
assert isinstance(http, HTTPRequest)
assert http == http_request
assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
def test_get_http_layer_response() -> None:
"""
Test the function `get_http_layer`,
with an HTTP Response packet.
"""
packet = TCP(dport=80) / http_response
http = get_http_layer(packet)
assert http == http_response
assert http.getfieldval("Status_Code").decode(ENCODING) == "200"
assert http.getfieldval("Reason_Phrase").decode(ENCODING) == "OK"
def test_get_http_layer_response_indirect() -> None:
"""
Test the function `get_http_layer`,
with an HTTP Response packet
which is not directly accessible by scapy.
"""
packet = TCP(dport=8800) / http_response
http = get_http_layer(packet)
assert isinstance(http, HTTPResponse)
assert http == http_response
assert http.getfieldval("Status_Code").decode(ENCODING) == "200"
assert http.getfieldval("Reason_Phrase").decode(ENCODING) == "OK"
def test_anonymize_http_request() -> None:
"""
Test the function `anonymize_http`,
with an HTTP Request packet.
"""
packet = TCP(dport=80) / http_request
http = get_http_layer(packet)
anonymize_http(http)
assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
# Ensure other fields have been deleted
for field in http.fields:
assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
def test_anonymize_http_request_indirect() -> None:
"""
Test the function `anonymize_http`,
with an HTTP Request packet.
"""
packet = TCP(dport=8800) / http_request
http = get_http_layer(packet)
anonymize_http(http)
assert http.getfieldval(HttpFields.METHOD.value).decode(ENCODING) == "GET"
assert http.getfieldval(HttpFields.PATH.value).decode(ENCODING) == "/index.html"
# Ensure other fields have been deleted
for field in http.fields:
assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
def test_anonymize_http_response() -> None:
"""
Test the function `anonymize_http`,
with an HTTP Response packet.
"""
packet = TCP(dport=80) / http_response
http = get_http_layer(packet)
anonymize_http(http)
for field in http.fields:
assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
def test_anonymize_http_response_indirect() -> None:
"""
Test the function `anonymize_http`,
with an HTTP Response packet.
"""
packet = TCP(dport=8800) / http_response
http = get_http_layer(packet)
anonymize_http(http)
for field in http.fields:
assert field == HttpFields.METHOD.value or field == HttpFields.PATH.value
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter