Skip to content
Extraits de code Groupes Projets
Valider e3a6249e rédigé par François De Keersmaeker's avatar François De Keersmaeker
Parcourir les fichiers

Fixed packet length bug

parent 4150103d
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
......@@ -63,8 +63,7 @@ class DNS(Packet):
:return: Dictionary containing tweak information.
"""
# Get field which will be modified
#field = random.choice(self.fields)
field = "qname"
field = random.choice(self.fields)
# Get auxiliary fields
qdcount = self.layer.getfieldval("qdcount")
question_records = self.layer.getfieldval("qd") if qdcount > 0 else None
......
from __future__ import annotations
from typing import Tuple
import importlib
import logging
import string
......@@ -153,6 +152,25 @@ class Packet:
return self.packet
def get_length(self) -> int:
"""
Get packet length.
:return: Packet length.
"""
return len(self.packet)
def get_length_from_layer(self, layer: int | str) -> int:
"""
Get packet length, starting from a given layer.
:param layer: Layer index or name.
:return: Packet length starting from the given layer.
"""
return len(self.packet.getlayer(layer))
def update_checksums(self) -> None:
"""
Update packet checksums, if needed.
......@@ -166,7 +184,7 @@ class Packet:
transport_layer.delfieldval("len")
if hasattr(transport_layer, "chksum"):
transport_layer.delfieldval("chksum")
self.packet.show2(dump=True)
self.packet = self.packet.__class__(bytes(self.packet))
def get_dict_log(self, field: str, old_value: str, new_value: str) -> dict:
......
......@@ -29,6 +29,20 @@ def strictly_positive_int(value: any) -> int:
if ivalue < 1:
raise argparse.ArgumentTypeError(f"{value} does not represent a strictly positive integer.")
return ivalue
def must_edit_packet(i: int, packet_numbers: list, random_range: int) -> bool:
"""
Check if a packet must be edited.
:param i: packet number (starting from 1)
:param packet_numbers: list of packet numbers to edit
:param random_range: upper bound for random range (not included)
:return: True if packet must be edited, False otherwise
"""
is_specified = packet_numbers is not None and i in packet_numbers
is_random = packet_numbers is None and random.randrange(0, random_range) == 0
return is_specified or is_random
def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers: list = None, dry_run: bool = False) -> None:
......@@ -50,6 +64,7 @@ def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers:
# Read input PCAP file
packets = scapy.rdpcap(input_pcap)
new_packets = []
logging.info(f"Read input PCAP file: {input_pcap}")
# Open log CSV file
......@@ -66,44 +81,29 @@ def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers:
writer = csv.DictWriter(csv_file, fieldnames=field_names)
writer.writeheader()
if packet_numbers is not None:
# Edit specific packets
for i in packet_numbers:
packet = packets[i - 1] # -1 because packet numbers start at 1
try:
my_packet = Packet.init_packet(packet, i)
except ValueError:
# No supported protocol found in packet, skip it
pass
else:
d = my_packet.tweak()
if d is not None:
writer.writerow(d)
else:
# Randomly edit packets
i = 1
for packet in packets:
# Choose randomly if we edit this packet
if random.randrange(0, random_range) != 0:
# Packet won't be edited
# Go to next packet
i += 1
continue
i = 1
for packet in packets:
if must_edit_packet(i, packet_numbers, random_range):
# Edit packet, if possible
try:
my_packet = Packet.init_packet(packet, i)
except ValueError:
# No supported protocol found in packet, skip it
new_packets.append(packet)
pass
else:
d = my_packet.tweak()
new_packets.append(my_packet.get_packet())
if d is not None:
writer.writerow(d)
finally:
i += 1
else:
# Packet won't be edited
# Go to next packet
i += 1
new_packets.append(packet)
# Write output PCAP file
output_pcap = ""
......@@ -117,7 +117,7 @@ def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers:
if dry_run:
logging.info(f"Dry run: did not write output PCAP file: {output_pcap}")
else:
scapy.wrpcap(output_pcap, packets)
scapy.wrpcap(output_pcap, new_packets)
logging.info(f"Wrote output PCAP file: {output_pcap}")
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter