diff --git a/src/packet/DNS.py b/src/packet/DNS.py index 882c82f4022d0f946bad7bd0ac8a99e7bdef6219..7b2de66ab943bdca163ed71d4c025a6bf92d0e98 100644 --- a/src/packet/DNS.py +++ b/src/packet/DNS.py @@ -63,8 +63,7 @@ class DNS(Packet): :return: Dictionary containing tweak information. """ # Get field which will be modified - #field = random.choice(self.fields) - field = "qname" + field = random.choice(self.fields) # Get auxiliary fields qdcount = self.layer.getfieldval("qdcount") question_records = self.layer.getfieldval("qd") if qdcount > 0 else None diff --git a/src/packet/Packet.py b/src/packet/Packet.py index 36d1d14a782f4f4d581e7b5058517519d0f30463..1cab805db0ffb3a832c631314b5a5652cb7cdf04 100644 --- a/src/packet/Packet.py +++ b/src/packet/Packet.py @@ -1,5 +1,4 @@ from __future__ import annotations -from typing import Tuple import importlib import logging import string @@ -153,6 +152,25 @@ class Packet: return self.packet + def get_length(self) -> int: + """ + Get packet length. + + :return: Packet length. + """ + return len(self.packet) + + + def get_length_from_layer(self, layer: int | str) -> int: + """ + Get packet length, starting from a given layer. + + :param layer: Layer index or name. + :return: Packet length starting from the given layer. + """ + return len(self.packet.getlayer(layer)) + + def update_checksums(self) -> None: """ Update packet checksums, if needed. @@ -166,7 +184,7 @@ class Packet: transport_layer.delfieldval("len") if hasattr(transport_layer, "chksum"): transport_layer.delfieldval("chksum") - self.packet.show2(dump=True) + self.packet = self.packet.__class__(bytes(self.packet)) def get_dict_log(self, field: str, old_value: str, new_value: str) -> dict: diff --git a/src/pcap_tweaker.py b/src/pcap_tweaker.py index 1bcf8755b992825bf5ba1b36a993d0176a4f10c8..b4eea556c458ff7881faa9c4b02fa94d2e3b3979 100644 --- a/src/pcap_tweaker.py +++ b/src/pcap_tweaker.py @@ -29,6 +29,20 @@ def strictly_positive_int(value: any) -> int: if ivalue < 1: raise argparse.ArgumentTypeError(f"{value} does not represent a strictly positive integer.") return ivalue + + +def must_edit_packet(i: int, packet_numbers: list, random_range: int) -> bool: + """ + Check if a packet must be edited. + + :param i: packet number (starting from 1) + :param packet_numbers: list of packet numbers to edit + :param random_range: upper bound for random range (not included) + :return: True if packet must be edited, False otherwise + """ + is_specified = packet_numbers is not None and i in packet_numbers + is_random = packet_numbers is None and random.randrange(0, random_range) == 0 + return is_specified or is_random def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers: list = None, dry_run: bool = False) -> None: @@ -50,6 +64,7 @@ def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers: # Read input PCAP file packets = scapy.rdpcap(input_pcap) + new_packets = [] logging.info(f"Read input PCAP file: {input_pcap}") # Open log CSV file @@ -66,44 +81,29 @@ def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers: writer = csv.DictWriter(csv_file, fieldnames=field_names) writer.writeheader() - if packet_numbers is not None: - # Edit specific packets - for i in packet_numbers: - packet = packets[i - 1] # -1 because packet numbers start at 1 - try: - my_packet = Packet.init_packet(packet, i) - except ValueError: - # No supported protocol found in packet, skip it - pass - else: - d = my_packet.tweak() - if d is not None: - writer.writerow(d) - - else: - # Randomly edit packets - i = 1 - for packet in packets: - - # Choose randomly if we edit this packet - if random.randrange(0, random_range) != 0: - # Packet won't be edited - # Go to next packet - i += 1 - continue + i = 1 + for packet in packets: + if must_edit_packet(i, packet_numbers, random_range): # Edit packet, if possible try: my_packet = Packet.init_packet(packet, i) except ValueError: # No supported protocol found in packet, skip it + new_packets.append(packet) pass else: d = my_packet.tweak() + new_packets.append(my_packet.get_packet()) if d is not None: writer.writerow(d) finally: i += 1 + else: + # Packet won't be edited + # Go to next packet + i += 1 + new_packets.append(packet) # Write output PCAP file output_pcap = "" @@ -117,7 +117,7 @@ def tweak_pcaps(pcaps: list, output: str, random_range: int = 1, packet_numbers: if dry_run: logging.info(f"Dry run: did not write output PCAP file: {output_pcap}") else: - scapy.wrpcap(output_pcap, packets) + scapy.wrpcap(output_pcap, new_packets) logging.info(f"Wrote output PCAP file: {output_pcap}")