diff --git a/src/translator/protocols/Custom.py b/src/translator/protocols/Custom.py deleted file mode 100644 index 93f36495ebb469040b9db295f65a936c51820491..0000000000000000000000000000000000000000 --- a/src/translator/protocols/Custom.py +++ /dev/null @@ -1,76 +0,0 @@ -from protocols.Protocol import Protocol - -class Custom(Protocol): - - # Class variables - custom_parser = True # Whether the protocol has a custom parser - - - @staticmethod - def build_nfq_list_match(l: list, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> dict: - """ - Produce a nfqueue match for a list of values. - - :param l: List of values. - :param template_rules: Dictionary containing the protocol-specific rules to add. - :param is_backward: Whether the field to add is for a backward rule. - :param func: Function to apply to the field value before writing it. - Optional, default is the identity function. - :param backward_func: Function to apply to the field value in the case of a backwards rule. - Will be applied after `func`. - Optional, default is the identity function. - """ - template = [] - match = [] - # Value is a list - for v in l: - if not is_backward: - template.append(template_rules["forward"]) - match.append(func(v)) - elif is_backward and "backward" in template_rules: - template.append(template_rules["backward"]) - match.append(backward_func(func(v))) - return {"template": template, "match": match} - - - def add_field(self, field: str, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> None: - """ - Add a new nfqueue match to the accumulator. - Overrides the nftables version. - - :param field: Field to add the rule for. - :param template_rules: Dictionary containing the protocol-specific rules to add. - :param is_backward: Whether the field to add is for a backward rule. - :param func: Function to apply to the field value before writing it. - Optional, default is the identity function. - :param backward_func: Function to apply to the field value in the case of a backwards rule. - Will be applied after `func`. - Optional, default is the identity function. - Args: - field (str): Field to add the rule for. - template_rules (dict): Dictionary containing the protocol-specific rules to add. - is_backward (bool): Whether the field to add is for a backward rule. - func (lambda): Function to apply to the field value before writing it. - Optional, default is the identity function. - backward_func (lambda): Function to apply to the field value in the case of a backwards rule. - Will be applied after `func`. - Optional, default is the identity function. - """ - if field in self.protocol_data: - value = self.protocol_data[field] - rules = {} - - # If value from YAML profile is a list, produce disjunction of all elements - if type(value) == list: - rules = Custom.build_nfq_list_match(value, template_rules, is_backward, func, backward_func) - else: - # Value is a single element - value = Protocol.convert_value(value) - if not is_backward: - rules = {"template": template_rules["forward"], "match": func(value)} - elif is_backward and "backward" in template_rules: - rules = {"template": template_rules["backward"], "match": backward_func(func(value))} - - # Append rules - if rules: - self.rules["nfq"].append(rules) diff --git a/src/translator/protocols/Protocol.py b/src/translator/protocols/Protocol.py deleted file mode 100644 index 36587a3a6d84e4f0ac5b5b6a87f5ea9de2eed911..0000000000000000000000000000000000000000 --- a/src/translator/protocols/Protocol.py +++ /dev/null @@ -1,132 +0,0 @@ -from __future__ import annotations -from typing import Union -import importlib - -class Protocol: - """ - Generic protocol, inherited by all concrete - """ - - - def __init__(self, protocol_data: dict, device: dict) -> None: - """ - Generic protocol constructor. - - :param protocol_data: Dictionary containing the protocol data. - :param device: Dictionary containing the device metadata. - """ - self.protocol_data = protocol_data - self.device = device - self.rules = { - "nft": [], - "nfq": [] - } - - - @staticmethod - def convert_value(value: str) -> Union[str, int]: - """ - Convert a string value to an int if possible. - - :param value: Value to convert. - :return: Converted value as int if possible, or the original string value otherwise. - """ - try: - result = int(value) - except ValueError: - result = value - return result - - - @classmethod - def init_protocol(c, protocol_name: str, protocol_data: dict, device: dict) -> Protocol: - """ - Factory method for a specific protocol. - - :param protocol_name: Name of the protocol. - :param protocol_data: Dictionary containing the protocol data. - :param device: Dictionary containing the device metadata. - """ - module = importlib.import_module(f"protocols.{protocol_name}") - cls = getattr(module, protocol_name) - return cls(protocol_data, device) - - - def format_list(self, l: list, func = lambda x: x) -> str: - """ - Format a list of values. - - :param l: List of values. - :param func: Function to apply to each value. - Optional, default is the identity function. - :return: Formatted list. - """ - value = "{ " - for i in range(len(l)): - if i != 0: - value += ", " - value += str(func(l[i])) - value += " }" - return value - - - def add_field(self, field: str, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> None: - """ - Add a new nftables rule to the nftables rules accumulator. - - :param field: Field to add the rule for. - :param template_rules: Dictionary containing the protocol-specific rules to add. - :param is_backward (optional): Whether the field to add is for a backward rule. - Optional, default is `False`. - :param func (optional): Function to apply to the field value before writing it. - Optional, default is the identity function. - :param backward_func (optional): a to apply to the field value in the case of a backwards rule. - Will be applied after the forward function. - Optional, default is the identity function. - Args: - field (str): Field to add the rule for. - rules (dict): Dictionary containing the protocol-specific rules to add. - is_backward (bool): Whether the field to add is for a backward rule. - Optional, default is `False`. - func (lambda): Function to apply to the field value before writing it. - Optional, default is the identity function. - backward_func (lambda): a to apply to the field value in the case of a backwards rule. - Will be applied after the forward function. - Optional, default is the identity function. - """ - if self.protocol_data is not None and field in self.protocol_data: - value = self.protocol_data[field] - - # If value from YAML profile is a list, add each element - if type(value) == list: - # Value is a list - value = self.format_list(value, func) - else: - # Value is a single element - value = func(value) - - # Build rule - rule = {} - value = Protocol.convert_value(value) - if not is_backward: - rule = {"template": template_rules["forward"], "match": value} - elif is_backward and "backward" in template_rules: - rule = {"template": template_rules["backward"], "match": backward_func(value)} - - # Add rule to the list of rules - if rule: - self.rules["nft"].append(rule) - - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Default parsing method. - Must be updated in the children class. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - return self.rules diff --git a/src/translator/protocols/Transport.py b/src/translator/protocols/Transport.py deleted file mode 100644 index 321a1d32c9bab12e8066481452a9e519ddb0559b..0000000000000000000000000000000000000000 --- a/src/translator/protocols/Transport.py +++ /dev/null @@ -1,57 +0,0 @@ -from protocols.Protocol import Protocol - -class Transport(Protocol): - - # Class variables - layer = 4 # Protocol OSI layer - custom_parser = False # Whether the protocol has a custom parser - - # Supported keys in YAML profile - supported_keys = [ - "src-port", - "dst-port" - ] - - def parse(self, is_backward: bool = False, initiator: str = "") -> dict: - """ - Parse a layer 4 protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Add protocol match - protocol_match = { - "template": "meta l4proto {}", - "match": self.protocol_name - } - self.rules["nft"].append(protocol_match) - - # Connection initiator is specified - if initiator: - # Template rules - template_rules = { - "src-port": {"forward": "ct original proto-src {}", "backward": "ct original proto-dst {}"}, - "dst-port": {"forward": "ct original proto-dst {}", "backward": "ct original proto-src {}"} - } - if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward): - # Connection initiator is the source device - self.add_field("src-port", template_rules["src-port"], is_backward) - self.add_field("dst-port", template_rules["dst-port"], is_backward) - elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward): - # Connection initiator is the destination device - self.add_field("src-port", template_rules["dst-port"], is_backward) - self.add_field("dst-port", template_rules["src-port"], is_backward) - - # Connection initiator is not specified - else: - # Handle source port - rules = {"forward": self.protocol_name + " sport {}", "backward": self.protocol_name + " dport {}"} - self.add_field("src-port", rules, is_backward) - # Handle destination port - rules = {"forward": self.protocol_name + " dport {}", "backward": self.protocol_name + " sport {}"} - self.add_field("dst-port", rules, is_backward) - - return self.rules diff --git a/src/translator/protocols/arp.py b/src/translator/protocols/arp.py deleted file mode 100644 index d5a6a21b7a56f602d9fcad146cfa538749f2671e..0000000000000000000000000000000000000000 --- a/src/translator/protocols/arp.py +++ /dev/null @@ -1,95 +0,0 @@ -from protocols.Protocol import Protocol - -class arp(Protocol): - - # Class variables - protocol_name = "arp" # Protocol name - layer = 3 # Protocol OSI layer - custom_parser = False # Whether the protocol has a custom parser - - # Supported keys in YAML profile - supported_keys = [ - "type", # ARP message type - "sha", # ARP source hardware address - "tha", # ARP target hardware address - "spa", # ARP source protocol address - "tpa" # ARP target protocol address - ] - - # Well-known addresses - mac_addrs = { - "gateway": "c0:56:27:73:46:0b", - "default": "00:00:00:00:00:00", - "broadcast": "ff:ff:ff:ff:ff:ff", - "phone": "3c:cd:5d:a2:a9:d7" - } - ip_addrs = { - "local": "192.168.1.0/24", - "gateway": "192.168.1.1", - "phone": "192.168.1.222" - } - - - def explicit_address(self, addr: str, type: str = "ipv4") -> str: - """ - Return the explicit version of an IPv4 or MAC address alias. - Example: "local" -> "192.168.0.0/16" - - :param addr: IPv4 or MAC address alias to explicit. - :param type: Type of address (ipv4 or mac). - :return: Explicit IPv4 or MAC address. - :raises ValueError: If the address is not a well-known alias or an explicit address. - """ - if addr == "self": - # Address is "self" - return self.device[type] - - # Address is not "self" - - # Get dictionary of well-known addresses, based on type - addrs = None - if type == "ipv4": - addrs = self.ip_addrs - elif type == "mac": - addrs = self.mac_addrs - - if addr in addrs: - # Address is a well-known address alias - return addrs[addr] - else: - # Address is an explicit address - return addr - - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the ARP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Lambda function to explicit a self or a well-known MAC address - func_mac = lambda mac: self.device['mac'] if mac == "self" else ( self.mac_addrs[mac] if mac in self.mac_addrs else mac ) - # Lambda function to explicit a self or a well-known IPv4 address - func_ip = lambda ip: self.device['ipv4'] if ip == "self" else ( self.ip_addrs[ip] if ip in self.ip_addrs else ip ) - # Handle ARP message type - rules = {"forward": "arp operation {}", "backward": "arp operation {}"} - # Lambda function to flip the ARP type (for the backward rule) - backward_func = lambda arp_type: "reply" if arp_type == "request" else ( "request" if arp_type == "reply" else arp_type ) - self.add_field("type", rules, is_backward, backward_func=backward_func) - # Handle ARP source hardware address - rules = {"forward": "arp saddr ether {}", "backward": "arp daddr ether {}"} - self.add_field("sha", rules, is_backward, func_mac) - # Handle ARP target hardware address - rules = {"forward": "arp daddr ether {}", "backward": "arp saddr ether {}"} - self.add_field("tha", rules, is_backward, func_mac) - # Handle ARP source protocol address - rules = {"forward": "arp saddr ip {}", "backward": "arp daddr ip {}"} - self.add_field("spa", rules, is_backward, func_ip) - # Handle ARP target protocol address - rules = {"forward": "arp daddr ip {}", "backward": "arp saddr ip {}"} - self.add_field("tpa", rules, is_backward, func_ip) - return self.rules diff --git a/src/translator/protocols/coap.py b/src/translator/protocols/coap.py deleted file mode 100644 index 20e32926d9948286a82cdb998e6f0de1f5bd38f1..0000000000000000000000000000000000000000 --- a/src/translator/protocols/coap.py +++ /dev/null @@ -1,42 +0,0 @@ -from protocols.Custom import Custom - -class coap(Custom): - - # Class variables - layer = 7 # Protocol OSI layer - protocol_name = "coap" # Protocol name - - # Supported keys in YAML profile - supported_keys = [ - "type", - "method", - "uri" - ] - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the CoAP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Lambda functions to convert a CoAP type or method to its C representation (upper case and separated by underscores) - func_coap_type = lambda type: f"COAP_{type.upper().replace('-', '_')}" - func_coap_method = lambda method: f"HTTP_{method.upper().replace('-', '_')}" - - # Handle CoAP message type - rule = {"forward": "coap_message.type == {}"} - self.add_field("type", rule, is_backward, func_coap_type) - - # Handle CoAP method - rule = {"forward": "coap_message.method == {}"} - self.add_field("method", rule, is_backward, func_coap_method) - - # Handle CoAP URI - rule = {"forward": "strcmp(coap_message.uri, \"{}\") == 0"} - self.add_field("uri", rule, is_backward) - - return self.rules diff --git a/src/translator/protocols/dhcp.py b/src/translator/protocols/dhcp.py deleted file mode 100644 index 719238cd8f77b1c8735673c6df763f4443f8fad9..0000000000000000000000000000000000000000 --- a/src/translator/protocols/dhcp.py +++ /dev/null @@ -1,35 +0,0 @@ -from protocols.Custom import Custom - -class dhcp(Custom): - - # Class variables - layer = 7 # Protocol OSI layer - protocol_name = "dhcp" # Protocol name - - # Supported keys in YAML profile - supported_keys = [ - "type", - "client-mac" - ] - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the DHCP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Handle DHCP message type - rules = {"forward": "dhcp_message.options.message_type == {}"} - # Lambda function to convert a DHCP type to its C representation (upper case) - func = lambda dhcp_type: f"DHCP_{dhcp_type.upper()}" - self.add_field("type", rules, is_backward, func) - # Handle DHCP client MAC address - rules = {"forward": "strcmp(mac_hex_to_str(dhcp_message.chaddr), \"{}\") == 0"} - # Lambda function to explicit a self MAC address - func = lambda mac: self.device['mac'] if mac == "self" else mac - self.add_field("client-mac", rules, is_backward, func) - return self.rules diff --git a/src/translator/protocols/dns.py b/src/translator/protocols/dns.py deleted file mode 100644 index 1b73f5d2901d9a7505b9714f778a5ffd1d53850e..0000000000000000000000000000000000000000 --- a/src/translator/protocols/dns.py +++ /dev/null @@ -1,85 +0,0 @@ -from protocols.Custom import Custom - -class dns(Custom): - - # Class variables - layer = 7 # Protocol OSI layer - protocol_name = "dns" # Protocol name - WILDCARD = "$" # Wildcard character for domain names - - # Supported keys in YAML profile - supported_keys = [ - "type", # DNS query type - "domain-name" # DNS domain name - ] - - - @staticmethod - def get_domain_name_rule(domain_name: str) -> dict: - """ - Retrieves the NFQueue rule to match a given domain name. - - :param domain_name: Domain name to match. - :return: Dictionary containing the NFQueue rule to match the given domain name. - """ - if domain_name.startswith(dns.WILDCARD): - suffix = domain_name[len(dns.WILDCARD):] - return { - "template": f"dns_contains_suffix_domain_name(dns_message.questions, dns_message.header.qdcount, \"{{}}\", {len(suffix)})", - "match": suffix - } - else: - return { - "template": "dns_contains_full_domain_name(dns_message.questions, dns_message.header.qdcount, \"{}\")", - "match": domain_name - } - - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the DNS protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Handle QR flag - qr_rules = {} - if "response" in self.protocol_data and self.protocol_data["response"]: - if is_backward: - qr_rules = {"template": "dns_message.header.qr == {}", "match": 0} - else: - qr_rules = {"template": "dns_message.header.qr == {}", "match": 1} - else: - if is_backward: - qr_rules = {"template": "dns_message.header.qr == {}", "match": 1} - else: - qr_rules = {"template": "dns_message.header.qr == {}", "match": 0} - self.rules["nfq"].append(qr_rules) - - # Handle DNS query type - rule = "( dns_message.header.qdcount > 0 && dns_message.questions->qtype == {} )" - # Lambda function to convert an DNS query type to its C representation (upper case) - func = lambda dns_qtype: dns_qtype.upper() - rules = {"forward": rule, "backward": rule} - self.add_field("qtype", rules, is_backward, func) - - # Handle DNS domain name - domain_name = self.protocol_data.get("domain-name", None) - if domain_name is not None: - domain_name_rule = {} - if isinstance(domain_name, list): - template = [] - match = [] - for dname in domain_name: - single_rule = dns.get_domain_name_rule(dname) - template.append(single_rule["template"]) - match.append(single_rule["match"]) - domain_name_rule = {"template": template, "match": match} - else: - domain_name_rule = dns.get_domain_name_rule(domain_name) - self.rules["nfq"].append(domain_name_rule) - - return self.rules diff --git a/src/translator/protocols/http.py b/src/translator/protocols/http.py deleted file mode 100644 index e75c375f668d5b20f45ec3dac794e34f38c1e79f..0000000000000000000000000000000000000000 --- a/src/translator/protocols/http.py +++ /dev/null @@ -1,56 +0,0 @@ -from protocols.Custom import Custom - -class http(Custom): - - # Class variables - layer = 7 # Protocol OSI layer - protocol_name = "http" # Protocol name - - # Supported keys in YAML profile - supported_keys = [ - "method", - "uri", - "response" - ] - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the HTTP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Request or response - http_type_rule = {} - if "response" in self.protocol_data and self.protocol_data["response"]: - if is_backward: - http_type_rule = {"template": "{}http_message.is_request", "match": ""} - else: - http_type_rule = {"template": "{}http_message.is_request", "match": "!"} - else: - if is_backward: - http_type_rule = {"template": "{}http_message.is_request", "match": "!"} - else: - http_type_rule = {"template": "{}http_message.is_request", "match": ""} - self.rules["nfq"].append(http_type_rule) - - # Handle HTTP method - rule = {"forward": "http_message.method == {}"} - # Lambda function to convert an HTTP method to its C representation (upper case) - func = lambda http_method: f"HTTP_{http_method.upper()}" - self.add_field("method", rule, is_backward, func) - - # Handle HTTP URI - # URI can take two forms: - # - Complete URI: exact string match - # - URI prefix: string match with the beginning of the URI - uri = self.protocol_data.get("uri", None) - if uri is not None: - length = len(uri) - 1 if uri.endswith("*") or uri.endswith("$") else len(uri) + 1 - rule = {"forward": f"strncmp(http_message.uri, \"{{}}\", {length}) == 0"} - self.add_field("uri", rule, is_backward) - - return self.rules diff --git a/src/translator/protocols/icmp.py b/src/translator/protocols/icmp.py deleted file mode 100644 index 7cab213ddaa0a831bd7cac068198d4903b7f7bbc..0000000000000000000000000000000000000000 --- a/src/translator/protocols/icmp.py +++ /dev/null @@ -1,40 +0,0 @@ -from protocols.Protocol import Protocol - -class icmp(Protocol): - - # Class variables - layer = 4 # Protocol OSI layer - protocol_name = "icmp" # Protocol name - l4proto = 1 # Layer 4 protocol number - custom_parser = False # Whether the protocol has a custom parser - - # Supported keys in YAML profile - supported_keys = [ - "type" # ICMP message type - ] - - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the ICMP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Add protocol match - protocol_match = { - "template": "meta l4proto {}", - "match": self.l4proto - } - self.rules["nft"].append(protocol_match) - - # Handle ICMP message type - icmp_rule = f"{self.protocol_name} type {{}}" - rules = {"forward": icmp_rule, "backward": icmp_rule} - # Lambda function to flip the ICMP type (for the backward rule) - backward_func = lambda icmp_type: icmp_type.replace("request", "reply") if "request" in icmp_type else ( icmp_type.replace("reply", "request") if "reply" in icmp_type else icmp_type ) - self.add_field("type", rules, is_backward, backward_func=backward_func) - return self.rules diff --git a/src/translator/protocols/icmpv6.py b/src/translator/protocols/icmpv6.py deleted file mode 100644 index 851fe0201c7dfddb418954473c3158a9bda1bf6a..0000000000000000000000000000000000000000 --- a/src/translator/protocols/icmpv6.py +++ /dev/null @@ -1,41 +0,0 @@ -from protocols.Protocol import Protocol - -class icmpv6(Protocol): - - # Class variables - layer = 4 # Protocol OSI layer - protocol_name = "icmpv6" # Protocol name - l4proto = 58 # Layer 4 protocol number - custom_parser = False # Whether the protocol has a custom parser - - # IPv6 multicast groups - groups = { - "multicast": "ff02::/16", - "all-nodes": "ff02::1", - "all-routers": "ff02::2", - "all-mldv2-routers": "ff02::16", - "mdns": "ff02::fb", - "coap": "ff02::158" - } - - # Supported keys in YAML profile - # For now, no support for ICMPv6 options, as the router does not support them - supported_keys = [] - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the ICMP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Add protocol match - protocol_match = { - "template": "meta l4proto {}", - "match": self.l4proto - } - self.rules["nft"].append(protocol_match) - return self.rules diff --git a/src/translator/protocols/igmp.py b/src/translator/protocols/igmp.py deleted file mode 100644 index e2e422659fa869d4ea347ff0a1309d63cbfc98a5..0000000000000000000000000000000000000000 --- a/src/translator/protocols/igmp.py +++ /dev/null @@ -1,66 +0,0 @@ -from protocols.Custom import Custom - -class igmp(Custom): - - # Class variables - layer = 4 # Protocol OSI layer - protocol_name = "igmp" # Protocol name - l4proto = 2 # Layer 4 protocol number - custom_parser = True # Whether the protocol has a custom parser - - # Supported keys in YAML profile - supported_keys = [ - "version", - 'type', - 'group' - ] - - # Well-known groups - groups = { - "all": "224.0.0.2", - "mdns": "224.0.0.251", - "ssdp": "239.255.255.250", - "coap": "224.0.1.187" - } - - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the IGMP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Add protocol match - protocol_match = { - "template": "meta l4proto {}", - "match": igmp.l4proto - } - self.rules["nft"].append(protocol_match) - - # Retrieve IGMP version - version = self.protocol_data.get("version", 2) - - # Handle IGMP message type - message_type = self.protocol_data["type"] - message_type = f"V{version}_{{}}" if "report" in message_type else "{}" - rules = {"forward": f"igmp_message.type == {message_type}"} - # Lambda function to convert an IGMP type to its C representation (upper case and separated by underscores) - func = lambda igmp_type: igmp_type.upper().replace(" ", "_") - self.add_field("type", rules, is_backward, func) - - # Handle IGMP group - if version == 3: - # IGMPv3: consider only the first group record's multicast address - rules = {"forward": "strcmp(ipv4_net_to_str((igmp_message.body.v3_membership_report.groups)->group_address), \"{}\") == 0"} - else: - # IGMPv1 and IGMPv2 - rules = {"forward": "strcmp(ipv4_net_to_str(igmp_message.body.v2_message.group_address), \"{}\") == 0"} - # Lambda function to explicit the address of a well-known group - func = lambda igmp_group: self.groups[igmp_group] if igmp_group in self.groups else igmp_group - self.add_field("group", rules, is_backward, func) - - return self.rules diff --git a/src/translator/protocols/ip.py b/src/translator/protocols/ip.py deleted file mode 100644 index f2f5fa24382bf56fedfd23d6ef8878f27a33d570..0000000000000000000000000000000000000000 --- a/src/translator/protocols/ip.py +++ /dev/null @@ -1,249 +0,0 @@ -from typing import Union -import ipaddress -from protocols.Protocol import Protocol -from protocols.igmp import igmp - -class ip(Protocol): - - # Class variables - layer = 3 # Protocol OSI layer - custom_parser = False # Whether the protocol has a custom parser - - # Supported keys in YAML profile - supported_keys = [ - "src", - "dst" - ] - - # Well-known addresses - addrs = { - "ipv4": { - "local": "192.168.0.0/16", - "external": "!= 192.168.0.0/16", - "gateway": "192.168.1.1", - "phone": "192.168.1.222", - "broadcast": "255.255.255.255", - "udp-broadcast": "192.168.1.255", - "igmpv3": "224.0.0.22", - **igmp.groups - }, - "ipv6": { - "default": "::", - "local": ["fe80::/10", "fc00::/7"], - "gateway": "fddd:ed18:f05b::1", - "gateway-local": "fe80::c256:27ff:fe73:460b", - "phone": "fe80::db22:fbec:a6b4:44fe", - } - } - - @staticmethod - def is_ip_static(addr: Union[str, list], version: str = "ipv4") -> bool: - """ - Check whether a (list of) string is a well-known IP alias or an explicit IP address. - - :param addr: (list of) string to check. - :param version: IP version (ipv4 or ipv6). Default is "ipv4". - :return: True if the (list of) string is an IP address, False otherwise. - """ - if type(addr) == list: - # List of addresses - return all([ip.is_ip_static(a) for a in addr]) - - # Single address - if addr == "self" or addr in ip.addrs[version]: - # Address is a well-known alias - return True - # Address is not a well-known alias - try: - ipaddress.ip_address(addr) - return True - except ValueError: - # Address is not an explicit address - return False - - - def is_ip(self, addr: Union[str, list]) -> bool: - """ - Check whether a (list of) string is a well-known IP alias or an explicit IP address. - - :param addr: (list of) string to check. - :return: True if the (list of) string is an IP address, False otherwise. - """ - if type(addr) == list: - # List of addresses - return all([self.is_ip(a) for a in addr]) - - # Single address - if addr == "self" or addr in self.addrs: - # Address is a well-known alias - return True - - # Address is not a well-known alias - - try: - ipaddress.ip_network(addr) - except ValueError: - # Address is not an explicit address or CIDR subnet - return False - else: - # Address is an explicit address or CIDR subnet - return True - - - def explicit_address(self, addr: Union[str,list]) -> str: - """ - Return the explicit version of an IP address alias, - or a list of IP address aliases. - Example: "local" -> "192.168.0.0/16" - - :param addr: IP address alias(es) to explicit. - :return: Explicit IP address(es). - :raises ValueError: If the address is not a well-known alias or an explicit address. - """ - # First check if address(es) correspond(s) to well-known alias(es) - if not self.is_ip(addr): - # Address(es) is/are invalid - raise ValueError(f"Unknown address: {str(addr)}") - - # Check if given address(es) is/are a list - if isinstance(addr, list): - # List of IP address aliases, process each of them - return self.format_list([self.explicit_address(a) for a in addr]) - - # Single IP address alias - - # Address is valid - if addr == "self": - # Address is "self" - return self.device[self.protocol_name] - elif addr in self.addrs: - # Address is a well-known address alias - explicit = self.addrs[addr] - if type(explicit) == list: - # List of corresponding explicit addresses - return self.format_list(explicit) - else: - # Single corresponding explicit address - return explicit - else: - # Address is an explicit address - return addr - - - def add_addr_nfqueue(self, addr_dir: str, is_backward: bool = False) -> None: - """ - Add a new IP address match to the nfqueue accumulator. - - :param addr_dir: Address direction to add the rule to (src or dst) - :param is_backward: Whether the field to add is for a backward rule. - """ - other_dir = "src" if addr_dir == "dst" else "dst" - version = int(self.protocol_name[3]) - # Parts of the rules - domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = get_" + self.protocol_name + "_" - domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = " - domain_name_rule_suffix = "_addr}})" - ip_addr_rule_prefix = "compare_ip((ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = " - ip_addr_rule_suffix = "_addr(payload)}}, ip_str_to_net(\"{}\", " + str(version) + "))" - cached_ip_rule_suffix = "_addr}}, interactions_data[{}].cached_ip)" - # Template rules for a domain name - rules_domain_name = { - "forward": "( " + ip_addr_rule_prefix + addr_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + addr_dir + domain_name_rule_suffix + " )", - "backward": "( " + ip_addr_rule_prefix + other_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + other_dir + domain_name_rule_suffix + " )" - } - # Template rules for an IP address - rules_address = { - "forward": ip_addr_rule_prefix + addr_dir + ip_addr_rule_suffix, - "backward": ip_addr_rule_prefix + other_dir + ip_addr_rule_suffix - } - - value = self.protocol_data[addr_dir] - rules = {} - # If value from YAML profile is a list, produce disjunction of all elements - if isinstance(value, list): - template = [] - match = [] - # Value is a list - for v in value: - is_ip = self.is_ip(v) - template_rules = rules_address if is_ip else rules_domain_name - func = self.explicit_address if is_ip else lambda x: x - match.append(func(v)) - if not is_backward: - template.append(template_rules["forward"]) - elif is_backward and "backward" in template_rules: - template.append(template_rules["backward"]) - rules = {"template": template, "match": match} - else: - # Value is a single element - is_ip = self.is_ip(value) - template_rules = rules_address if is_ip else rules_domain_name - func = self.explicit_address if is_ip else lambda x: x - if not is_backward: - rules = {"template": template_rules["forward"], "match": func(value)} - elif is_backward and "backward" in template_rules: - rules = {"template": template_rules["backward"], "match": func(value)} - - # Append rules - if rules: - self.rules["nfq"].append(rules) - - - def add_addr(self, addr_dir: str, is_backward: bool = False, initiator: str = "") -> None: - """ - Add a new IP address match to the accumulator, in two possible ways: - - If the address is a well-known alias or an explicit IP address, add an nftables match. - - If the address is a domain name, add an nfqueue match. - - :param addr_dir: Address direction to add the rule to (src or dst) - :param is_backward: Whether the field to add is for a backward rule. - :param initiator: Optional, initiator of the connection (src or dst). - """ - other_dir = "src" if addr_dir == "dst" else "dst" - addr = self.protocol_data[addr_dir] - - if self.is_ip(addr): # Source address is a well-known alias or an explicit IP address - tpl_addr_matches = { - "src": "saddr {}", - "dst": "daddr {}" - } - if initiator: # Connection initiator is specified - if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward): - # Connection initiator is the source device - rules = { - "forward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}", - "backward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}" - } - elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward): - # Connection initiator is the destination device - rules = { - "forward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}", - "backward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}" - } - - else: # Connection initiator is not specified - rules = {"forward": f"{self.nft_prefix} {tpl_addr_matches[addr_dir]}", "backward": f"{self.nft_prefix} {tpl_addr_matches[other_dir]}"} - - self.add_field(addr_dir, rules, is_backward, self.explicit_address) - - else: # Source address is potentially a domain name - self.add_addr_nfqueue(addr_dir, is_backward) - - - def parse(self, is_backward: bool = False, initiator: str = "") -> dict: - """ - Parse the IP (v4 or v6) protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - if "src" in self.protocol_data: - # Source address is specified - self.add_addr("src", is_backward, initiator) - if "dst" in self.protocol_data: - # Destination address is specified - self.add_addr("dst", is_backward, initiator) - return self.rules diff --git a/src/translator/protocols/ipv4.py b/src/translator/protocols/ipv4.py deleted file mode 100644 index 5dcf06db11b5dcf52110074fd15c7a75ed2ccf9c..0000000000000000000000000000000000000000 --- a/src/translator/protocols/ipv4.py +++ /dev/null @@ -1,11 +0,0 @@ -from protocols.ip import ip -from protocols.igmp import igmp - -class ipv4(ip): - - # Class variables - protocol_name = "ipv4" # Protocol name - nft_prefix = "ip" # Prefix for nftables rules - - # Well-known addresses - addrs = ip.addrs["ipv4"] diff --git a/src/translator/protocols/ipv6.py b/src/translator/protocols/ipv6.py deleted file mode 100644 index 78f9625ea92d03f45e9d83617e5c60badb2803df..0000000000000000000000000000000000000000 --- a/src/translator/protocols/ipv6.py +++ /dev/null @@ -1,14 +0,0 @@ -from protocols.ip import ip -from protocols.icmpv6 import icmpv6 - -class ipv6(ip): - - # Class variables - protocol_name = "ipv6" # Protocol name - nft_prefix = "ip6" # Prefix for nftables rules - - # Well-known addresses - addrs = { - **ip.addrs["ipv6"], - **icmpv6.groups - } diff --git a/src/translator/protocols/mdns.py b/src/translator/protocols/mdns.py deleted file mode 100644 index 7572f696210d6e9786f5b82797b0f573729f57de..0000000000000000000000000000000000000000 --- a/src/translator/protocols/mdns.py +++ /dev/null @@ -1,6 +0,0 @@ -from protocols.dns import dns - -class mdns(dns): - - # Class variables - protocol_name = "mdns" # Protocol name diff --git a/src/translator/protocols/ssdp.py b/src/translator/protocols/ssdp.py deleted file mode 100644 index 7cad8961fffc96c3c72ec0b488c2f0c9964fc50e..0000000000000000000000000000000000000000 --- a/src/translator/protocols/ssdp.py +++ /dev/null @@ -1,45 +0,0 @@ -from protocols.Custom import Custom - -class ssdp(Custom): - - # Class variables - layer = 7 # Protocol OSI layer - protocol_name = "ssdp" # Protocol name - - # Supported keys in YAML profile - supported_keys = [ - "method", - "response" - ] - - def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: - """ - Parse the SSDP protocol. - - :param is_backward (optional): Whether the protocol must be parsed for a backward rule. - Optional, default is `False`. - :param initiator (optional): Connection initiator (src or dst). - Optional, default is "src". - :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. - """ - # Request or response - ssdp_type_rule = {} - if "response" in self.protocol_data and self.protocol_data["response"]: - if is_backward: - ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": ""} - else: - ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": "!"} - else: - if is_backward: - ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": "!"} - else: - ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": ""} - self.rules["nfq"].append(ssdp_type_rule) - - # Handle SSDP method - rule = {"forward": "ssdp_message.method == {}"} - # Lambda function to convert an SSDP method to its C representation (upper case and separated by underscores) - func = lambda ssdp_method: f"SSDP_{ssdp_method.upper().replace('-', '_')}" - self.add_field("method", rule, is_backward, func) - - return self.rules diff --git a/src/translator/protocols/tcp.py b/src/translator/protocols/tcp.py deleted file mode 100644 index 934c313493bee89f8f3dbb2423fe19fe71582f53..0000000000000000000000000000000000000000 --- a/src/translator/protocols/tcp.py +++ /dev/null @@ -1,9 +0,0 @@ -from protocols.Transport import Transport - -class tcp(Transport): - - # Class variables - protocol_name = "tcp" # Protocol name - - # Supported keys in YAML profile - supported_keys = Transport.supported_keys + ["initiated-by"] diff --git a/src/translator/protocols/udp.py b/src/translator/protocols/udp.py deleted file mode 100644 index 2776718039e86a4d40a40ff0470aff32fa32354e..0000000000000000000000000000000000000000 --- a/src/translator/protocols/udp.py +++ /dev/null @@ -1,6 +0,0 @@ -from protocols.Transport import Transport - -class udp(Transport): - - # Class variables - protocol_name = "udp" # Protocol name