From 3a22b8a485b31bab31aba1284b2f6a5fd4f16a4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20De=20Keersmaeker?= <francois.dekeersmaeker@uclouvain.be> Date: Tue, 9 Jul 2024 15:40:24 +0200 Subject: [PATCH] Added protocol translators --- .gitignore | 2 + protocols/Custom.py | 76 +++++++++++++ protocols/Protocol.py | 132 ++++++++++++++++++++++ protocols/Transport.py | 57 ++++++++++ protocols/arp.py | 95 ++++++++++++++++ protocols/coap.py | 42 +++++++ protocols/dhcp.py | 35 ++++++ protocols/dns.py | 85 ++++++++++++++ protocols/http.py | 56 +++++++++ protocols/icmp.py | 40 +++++++ protocols/icmpv6.py | 41 +++++++ protocols/igmp.py | 66 +++++++++++ protocols/ip.py | 249 +++++++++++++++++++++++++++++++++++++++++ protocols/ipv4.py | 11 ++ protocols/ipv6.py | 14 +++ protocols/mdns.py | 6 + protocols/ssdp.py | 45 ++++++++ protocols/tcp.py | 9 ++ protocols/udp.py | 6 + 19 files changed, 1067 insertions(+) create mode 100644 .gitignore create mode 100644 protocols/Custom.py create mode 100644 protocols/Protocol.py create mode 100644 protocols/Transport.py create mode 100644 protocols/arp.py create mode 100644 protocols/coap.py create mode 100644 protocols/dhcp.py create mode 100644 protocols/dns.py create mode 100644 protocols/http.py create mode 100644 protocols/icmp.py create mode 100644 protocols/icmpv6.py create mode 100644 protocols/igmp.py create mode 100644 protocols/ip.py create mode 100644 protocols/ipv4.py create mode 100644 protocols/ipv6.py create mode 100644 protocols/mdns.py create mode 100644 protocols/ssdp.py create mode 100644 protocols/tcp.py create mode 100644 protocols/udp.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1ec7f20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +# Python caches +__pycache__ diff --git a/protocols/Custom.py b/protocols/Custom.py new file mode 100644 index 0000000..93f3649 --- /dev/null +++ b/protocols/Custom.py @@ -0,0 +1,76 @@ +from protocols.Protocol import Protocol + +class Custom(Protocol): + + # Class variables + custom_parser = True # Whether the protocol has a custom parser + + + @staticmethod + def build_nfq_list_match(l: list, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> dict: + """ + Produce a nfqueue match for a list of values. + + :param l: List of values. + :param template_rules: Dictionary containing the protocol-specific rules to add. + :param is_backward: Whether the field to add is for a backward rule. + :param func: Function to apply to the field value before writing it. + Optional, default is the identity function. + :param backward_func: Function to apply to the field value in the case of a backwards rule. + Will be applied after `func`. + Optional, default is the identity function. + """ + template = [] + match = [] + # Value is a list + for v in l: + if not is_backward: + template.append(template_rules["forward"]) + match.append(func(v)) + elif is_backward and "backward" in template_rules: + template.append(template_rules["backward"]) + match.append(backward_func(func(v))) + return {"template": template, "match": match} + + + def add_field(self, field: str, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> None: + """ + Add a new nfqueue match to the accumulator. + Overrides the nftables version. + + :param field: Field to add the rule for. + :param template_rules: Dictionary containing the protocol-specific rules to add. + :param is_backward: Whether the field to add is for a backward rule. + :param func: Function to apply to the field value before writing it. + Optional, default is the identity function. + :param backward_func: Function to apply to the field value in the case of a backwards rule. + Will be applied after `func`. + Optional, default is the identity function. + Args: + field (str): Field to add the rule for. + template_rules (dict): Dictionary containing the protocol-specific rules to add. + is_backward (bool): Whether the field to add is for a backward rule. + func (lambda): Function to apply to the field value before writing it. + Optional, default is the identity function. + backward_func (lambda): Function to apply to the field value in the case of a backwards rule. + Will be applied after `func`. + Optional, default is the identity function. + """ + if field in self.protocol_data: + value = self.protocol_data[field] + rules = {} + + # If value from YAML profile is a list, produce disjunction of all elements + if type(value) == list: + rules = Custom.build_nfq_list_match(value, template_rules, is_backward, func, backward_func) + else: + # Value is a single element + value = Protocol.convert_value(value) + if not is_backward: + rules = {"template": template_rules["forward"], "match": func(value)} + elif is_backward and "backward" in template_rules: + rules = {"template": template_rules["backward"], "match": backward_func(func(value))} + + # Append rules + if rules: + self.rules["nfq"].append(rules) diff --git a/protocols/Protocol.py b/protocols/Protocol.py new file mode 100644 index 0000000..e68ebc1 --- /dev/null +++ b/protocols/Protocol.py @@ -0,0 +1,132 @@ +from __future__ import annotations +from typing import Union +import importlib + +class Protocol: + """ + Generic protocol, inherited by all concrete protocols. + """ + + + def __init__(self, protocol_data: dict, device: dict) -> None: + """ + Generic protocol constructor. + + :param protocol_data: Dictionary containing the protocol data. + :param device: Dictionary containing the device metadata. + """ + self.protocol_data = protocol_data + self.device = device + self.rules = { + "nft": [], + "nfq": [] + } + + + @staticmethod + def convert_value(value: str) -> Union[str, int]: + """ + Convert a string value to an int if possible. + + :param value: Value to convert. + :return: Converted value as int if possible, or the original string value otherwise. + """ + try: + result = int(value) + except ValueError: + result = value + return result + + + @classmethod + def init_protocol(c, protocol_name: str, protocol_data: dict, device: dict) -> Protocol: + """ + Factory method for a specific protocol. + + :param protocol_name: Name of the protocol. + :param protocol_data: Dictionary containing the protocol data. + :param device: Dictionary containing the device metadata. + """ + module = importlib.import_module(f"protocols.{protocol_name}") + cls = getattr(module, protocol_name) + return cls(protocol_data, device) + + + def format_list(self, l: list, func = lambda x: x) -> str: + """ + Format a list of values. + + :param l: List of values. + :param func: Function to apply to each value. + Optional, default is the identity function. + :return: Formatted list. + """ + value = "{ " + for i in range(len(l)): + if i != 0: + value += ", " + value += str(func(l[i])) + value += " }" + return value + + + def add_field(self, field: str, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> None: + """ + Add a new nftables rule to the nftables rules accumulator. + + :param field: Field to add the rule for. + :param template_rules: Dictionary containing the protocol-specific rules to add. + :param is_backward (optional): Whether the field to add is for a backward rule. + Optional, default is `False`. + :param func (optional): Function to apply to the field value before writing it. + Optional, default is the identity function. + :param backward_func (optional): a to apply to the field value in the case of a backwards rule. + Will be applied after the forward function. + Optional, default is the identity function. + Args: + field (str): Field to add the rule for. + rules (dict): Dictionary containing the protocol-specific rules to add. + is_backward (bool): Whether the field to add is for a backward rule. + Optional, default is `False`. + func (lambda): Function to apply to the field value before writing it. + Optional, default is the identity function. + backward_func (lambda): a to apply to the field value in the case of a backwards rule. + Will be applied after the forward function. + Optional, default is the identity function. + """ + if self.protocol_data is not None and field in self.protocol_data: + value = self.protocol_data[field] + + # If value from YAML profile is a list, add each element + if type(value) == list: + # Value is a list + value = self.format_list(value, func) + else: + # Value is a single element + value = func(value) + + # Build rule + rule = {} + value = Protocol.convert_value(value) + if not is_backward: + rule = {"template": template_rules["forward"], "match": value} + elif is_backward and "backward" in template_rules: + rule = {"template": template_rules["backward"], "match": backward_func(value)} + + # Add rule to the list of rules + if rule: + self.rules["nft"].append(rule) + + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Default parsing method. + Must be updated in the children class. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + return self.rules diff --git a/protocols/Transport.py b/protocols/Transport.py new file mode 100644 index 0000000..321a1d3 --- /dev/null +++ b/protocols/Transport.py @@ -0,0 +1,57 @@ +from protocols.Protocol import Protocol + +class Transport(Protocol): + + # Class variables + layer = 4 # Protocol OSI layer + custom_parser = False # Whether the protocol has a custom parser + + # Supported keys in YAML profile + supported_keys = [ + "src-port", + "dst-port" + ] + + def parse(self, is_backward: bool = False, initiator: str = "") -> dict: + """ + Parse a layer 4 protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Add protocol match + protocol_match = { + "template": "meta l4proto {}", + "match": self.protocol_name + } + self.rules["nft"].append(protocol_match) + + # Connection initiator is specified + if initiator: + # Template rules + template_rules = { + "src-port": {"forward": "ct original proto-src {}", "backward": "ct original proto-dst {}"}, + "dst-port": {"forward": "ct original proto-dst {}", "backward": "ct original proto-src {}"} + } + if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward): + # Connection initiator is the source device + self.add_field("src-port", template_rules["src-port"], is_backward) + self.add_field("dst-port", template_rules["dst-port"], is_backward) + elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward): + # Connection initiator is the destination device + self.add_field("src-port", template_rules["dst-port"], is_backward) + self.add_field("dst-port", template_rules["src-port"], is_backward) + + # Connection initiator is not specified + else: + # Handle source port + rules = {"forward": self.protocol_name + " sport {}", "backward": self.protocol_name + " dport {}"} + self.add_field("src-port", rules, is_backward) + # Handle destination port + rules = {"forward": self.protocol_name + " dport {}", "backward": self.protocol_name + " sport {}"} + self.add_field("dst-port", rules, is_backward) + + return self.rules diff --git a/protocols/arp.py b/protocols/arp.py new file mode 100644 index 0000000..d5a6a21 --- /dev/null +++ b/protocols/arp.py @@ -0,0 +1,95 @@ +from protocols.Protocol import Protocol + +class arp(Protocol): + + # Class variables + protocol_name = "arp" # Protocol name + layer = 3 # Protocol OSI layer + custom_parser = False # Whether the protocol has a custom parser + + # Supported keys in YAML profile + supported_keys = [ + "type", # ARP message type + "sha", # ARP source hardware address + "tha", # ARP target hardware address + "spa", # ARP source protocol address + "tpa" # ARP target protocol address + ] + + # Well-known addresses + mac_addrs = { + "gateway": "c0:56:27:73:46:0b", + "default": "00:00:00:00:00:00", + "broadcast": "ff:ff:ff:ff:ff:ff", + "phone": "3c:cd:5d:a2:a9:d7" + } + ip_addrs = { + "local": "192.168.1.0/24", + "gateway": "192.168.1.1", + "phone": "192.168.1.222" + } + + + def explicit_address(self, addr: str, type: str = "ipv4") -> str: + """ + Return the explicit version of an IPv4 or MAC address alias. + Example: "local" -> "192.168.0.0/16" + + :param addr: IPv4 or MAC address alias to explicit. + :param type: Type of address (ipv4 or mac). + :return: Explicit IPv4 or MAC address. + :raises ValueError: If the address is not a well-known alias or an explicit address. + """ + if addr == "self": + # Address is "self" + return self.device[type] + + # Address is not "self" + + # Get dictionary of well-known addresses, based on type + addrs = None + if type == "ipv4": + addrs = self.ip_addrs + elif type == "mac": + addrs = self.mac_addrs + + if addr in addrs: + # Address is a well-known address alias + return addrs[addr] + else: + # Address is an explicit address + return addr + + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the ARP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Lambda function to explicit a self or a well-known MAC address + func_mac = lambda mac: self.device['mac'] if mac == "self" else ( self.mac_addrs[mac] if mac in self.mac_addrs else mac ) + # Lambda function to explicit a self or a well-known IPv4 address + func_ip = lambda ip: self.device['ipv4'] if ip == "self" else ( self.ip_addrs[ip] if ip in self.ip_addrs else ip ) + # Handle ARP message type + rules = {"forward": "arp operation {}", "backward": "arp operation {}"} + # Lambda function to flip the ARP type (for the backward rule) + backward_func = lambda arp_type: "reply" if arp_type == "request" else ( "request" if arp_type == "reply" else arp_type ) + self.add_field("type", rules, is_backward, backward_func=backward_func) + # Handle ARP source hardware address + rules = {"forward": "arp saddr ether {}", "backward": "arp daddr ether {}"} + self.add_field("sha", rules, is_backward, func_mac) + # Handle ARP target hardware address + rules = {"forward": "arp daddr ether {}", "backward": "arp saddr ether {}"} + self.add_field("tha", rules, is_backward, func_mac) + # Handle ARP source protocol address + rules = {"forward": "arp saddr ip {}", "backward": "arp daddr ip {}"} + self.add_field("spa", rules, is_backward, func_ip) + # Handle ARP target protocol address + rules = {"forward": "arp daddr ip {}", "backward": "arp saddr ip {}"} + self.add_field("tpa", rules, is_backward, func_ip) + return self.rules diff --git a/protocols/coap.py b/protocols/coap.py new file mode 100644 index 0000000..20e3292 --- /dev/null +++ b/protocols/coap.py @@ -0,0 +1,42 @@ +from protocols.Custom import Custom + +class coap(Custom): + + # Class variables + layer = 7 # Protocol OSI layer + protocol_name = "coap" # Protocol name + + # Supported keys in YAML profile + supported_keys = [ + "type", + "method", + "uri" + ] + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the CoAP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Lambda functions to convert a CoAP type or method to its C representation (upper case and separated by underscores) + func_coap_type = lambda type: f"COAP_{type.upper().replace('-', '_')}" + func_coap_method = lambda method: f"HTTP_{method.upper().replace('-', '_')}" + + # Handle CoAP message type + rule = {"forward": "coap_message.type == {}"} + self.add_field("type", rule, is_backward, func_coap_type) + + # Handle CoAP method + rule = {"forward": "coap_message.method == {}"} + self.add_field("method", rule, is_backward, func_coap_method) + + # Handle CoAP URI + rule = {"forward": "strcmp(coap_message.uri, \"{}\") == 0"} + self.add_field("uri", rule, is_backward) + + return self.rules diff --git a/protocols/dhcp.py b/protocols/dhcp.py new file mode 100644 index 0000000..719238c --- /dev/null +++ b/protocols/dhcp.py @@ -0,0 +1,35 @@ +from protocols.Custom import Custom + +class dhcp(Custom): + + # Class variables + layer = 7 # Protocol OSI layer + protocol_name = "dhcp" # Protocol name + + # Supported keys in YAML profile + supported_keys = [ + "type", + "client-mac" + ] + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the DHCP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Handle DHCP message type + rules = {"forward": "dhcp_message.options.message_type == {}"} + # Lambda function to convert a DHCP type to its C representation (upper case) + func = lambda dhcp_type: f"DHCP_{dhcp_type.upper()}" + self.add_field("type", rules, is_backward, func) + # Handle DHCP client MAC address + rules = {"forward": "strcmp(mac_hex_to_str(dhcp_message.chaddr), \"{}\") == 0"} + # Lambda function to explicit a self MAC address + func = lambda mac: self.device['mac'] if mac == "self" else mac + self.add_field("client-mac", rules, is_backward, func) + return self.rules diff --git a/protocols/dns.py b/protocols/dns.py new file mode 100644 index 0000000..1b73f5d --- /dev/null +++ b/protocols/dns.py @@ -0,0 +1,85 @@ +from protocols.Custom import Custom + +class dns(Custom): + + # Class variables + layer = 7 # Protocol OSI layer + protocol_name = "dns" # Protocol name + WILDCARD = "$" # Wildcard character for domain names + + # Supported keys in YAML profile + supported_keys = [ + "type", # DNS query type + "domain-name" # DNS domain name + ] + + + @staticmethod + def get_domain_name_rule(domain_name: str) -> dict: + """ + Retrieves the NFQueue rule to match a given domain name. + + :param domain_name: Domain name to match. + :return: Dictionary containing the NFQueue rule to match the given domain name. + """ + if domain_name.startswith(dns.WILDCARD): + suffix = domain_name[len(dns.WILDCARD):] + return { + "template": f"dns_contains_suffix_domain_name(dns_message.questions, dns_message.header.qdcount, \"{{}}\", {len(suffix)})", + "match": suffix + } + else: + return { + "template": "dns_contains_full_domain_name(dns_message.questions, dns_message.header.qdcount, \"{}\")", + "match": domain_name + } + + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the DNS protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Handle QR flag + qr_rules = {} + if "response" in self.protocol_data and self.protocol_data["response"]: + if is_backward: + qr_rules = {"template": "dns_message.header.qr == {}", "match": 0} + else: + qr_rules = {"template": "dns_message.header.qr == {}", "match": 1} + else: + if is_backward: + qr_rules = {"template": "dns_message.header.qr == {}", "match": 1} + else: + qr_rules = {"template": "dns_message.header.qr == {}", "match": 0} + self.rules["nfq"].append(qr_rules) + + # Handle DNS query type + rule = "( dns_message.header.qdcount > 0 && dns_message.questions->qtype == {} )" + # Lambda function to convert an DNS query type to its C representation (upper case) + func = lambda dns_qtype: dns_qtype.upper() + rules = {"forward": rule, "backward": rule} + self.add_field("qtype", rules, is_backward, func) + + # Handle DNS domain name + domain_name = self.protocol_data.get("domain-name", None) + if domain_name is not None: + domain_name_rule = {} + if isinstance(domain_name, list): + template = [] + match = [] + for dname in domain_name: + single_rule = dns.get_domain_name_rule(dname) + template.append(single_rule["template"]) + match.append(single_rule["match"]) + domain_name_rule = {"template": template, "match": match} + else: + domain_name_rule = dns.get_domain_name_rule(domain_name) + self.rules["nfq"].append(domain_name_rule) + + return self.rules diff --git a/protocols/http.py b/protocols/http.py new file mode 100644 index 0000000..e75c375 --- /dev/null +++ b/protocols/http.py @@ -0,0 +1,56 @@ +from protocols.Custom import Custom + +class http(Custom): + + # Class variables + layer = 7 # Protocol OSI layer + protocol_name = "http" # Protocol name + + # Supported keys in YAML profile + supported_keys = [ + "method", + "uri", + "response" + ] + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the HTTP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Request or response + http_type_rule = {} + if "response" in self.protocol_data and self.protocol_data["response"]: + if is_backward: + http_type_rule = {"template": "{}http_message.is_request", "match": ""} + else: + http_type_rule = {"template": "{}http_message.is_request", "match": "!"} + else: + if is_backward: + http_type_rule = {"template": "{}http_message.is_request", "match": "!"} + else: + http_type_rule = {"template": "{}http_message.is_request", "match": ""} + self.rules["nfq"].append(http_type_rule) + + # Handle HTTP method + rule = {"forward": "http_message.method == {}"} + # Lambda function to convert an HTTP method to its C representation (upper case) + func = lambda http_method: f"HTTP_{http_method.upper()}" + self.add_field("method", rule, is_backward, func) + + # Handle HTTP URI + # URI can take two forms: + # - Complete URI: exact string match + # - URI prefix: string match with the beginning of the URI + uri = self.protocol_data.get("uri", None) + if uri is not None: + length = len(uri) - 1 if uri.endswith("*") or uri.endswith("$") else len(uri) + 1 + rule = {"forward": f"strncmp(http_message.uri, \"{{}}\", {length}) == 0"} + self.add_field("uri", rule, is_backward) + + return self.rules diff --git a/protocols/icmp.py b/protocols/icmp.py new file mode 100644 index 0000000..7cab213 --- /dev/null +++ b/protocols/icmp.py @@ -0,0 +1,40 @@ +from protocols.Protocol import Protocol + +class icmp(Protocol): + + # Class variables + layer = 4 # Protocol OSI layer + protocol_name = "icmp" # Protocol name + l4proto = 1 # Layer 4 protocol number + custom_parser = False # Whether the protocol has a custom parser + + # Supported keys in YAML profile + supported_keys = [ + "type" # ICMP message type + ] + + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the ICMP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Add protocol match + protocol_match = { + "template": "meta l4proto {}", + "match": self.l4proto + } + self.rules["nft"].append(protocol_match) + + # Handle ICMP message type + icmp_rule = f"{self.protocol_name} type {{}}" + rules = {"forward": icmp_rule, "backward": icmp_rule} + # Lambda function to flip the ICMP type (for the backward rule) + backward_func = lambda icmp_type: icmp_type.replace("request", "reply") if "request" in icmp_type else ( icmp_type.replace("reply", "request") if "reply" in icmp_type else icmp_type ) + self.add_field("type", rules, is_backward, backward_func=backward_func) + return self.rules diff --git a/protocols/icmpv6.py b/protocols/icmpv6.py new file mode 100644 index 0000000..851fe02 --- /dev/null +++ b/protocols/icmpv6.py @@ -0,0 +1,41 @@ +from protocols.Protocol import Protocol + +class icmpv6(Protocol): + + # Class variables + layer = 4 # Protocol OSI layer + protocol_name = "icmpv6" # Protocol name + l4proto = 58 # Layer 4 protocol number + custom_parser = False # Whether the protocol has a custom parser + + # IPv6 multicast groups + groups = { + "multicast": "ff02::/16", + "all-nodes": "ff02::1", + "all-routers": "ff02::2", + "all-mldv2-routers": "ff02::16", + "mdns": "ff02::fb", + "coap": "ff02::158" + } + + # Supported keys in YAML profile + # For now, no support for ICMPv6 options, as the router does not support them + supported_keys = [] + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the ICMP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Add protocol match + protocol_match = { + "template": "meta l4proto {}", + "match": self.l4proto + } + self.rules["nft"].append(protocol_match) + return self.rules diff --git a/protocols/igmp.py b/protocols/igmp.py new file mode 100644 index 0000000..e2e4226 --- /dev/null +++ b/protocols/igmp.py @@ -0,0 +1,66 @@ +from protocols.Custom import Custom + +class igmp(Custom): + + # Class variables + layer = 4 # Protocol OSI layer + protocol_name = "igmp" # Protocol name + l4proto = 2 # Layer 4 protocol number + custom_parser = True # Whether the protocol has a custom parser + + # Supported keys in YAML profile + supported_keys = [ + "version", + 'type', + 'group' + ] + + # Well-known groups + groups = { + "all": "224.0.0.2", + "mdns": "224.0.0.251", + "ssdp": "239.255.255.250", + "coap": "224.0.1.187" + } + + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the IGMP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Add protocol match + protocol_match = { + "template": "meta l4proto {}", + "match": igmp.l4proto + } + self.rules["nft"].append(protocol_match) + + # Retrieve IGMP version + version = self.protocol_data.get("version", 2) + + # Handle IGMP message type + message_type = self.protocol_data["type"] + message_type = f"V{version}_{{}}" if "report" in message_type else "{}" + rules = {"forward": f"igmp_message.type == {message_type}"} + # Lambda function to convert an IGMP type to its C representation (upper case and separated by underscores) + func = lambda igmp_type: igmp_type.upper().replace(" ", "_") + self.add_field("type", rules, is_backward, func) + + # Handle IGMP group + if version == 3: + # IGMPv3: consider only the first group record's multicast address + rules = {"forward": "strcmp(ipv4_net_to_str((igmp_message.body.v3_membership_report.groups)->group_address), \"{}\") == 0"} + else: + # IGMPv1 and IGMPv2 + rules = {"forward": "strcmp(ipv4_net_to_str(igmp_message.body.v2_message.group_address), \"{}\") == 0"} + # Lambda function to explicit the address of a well-known group + func = lambda igmp_group: self.groups[igmp_group] if igmp_group in self.groups else igmp_group + self.add_field("group", rules, is_backward, func) + + return self.rules diff --git a/protocols/ip.py b/protocols/ip.py new file mode 100644 index 0000000..f2f5fa2 --- /dev/null +++ b/protocols/ip.py @@ -0,0 +1,249 @@ +from typing import Union +import ipaddress +from protocols.Protocol import Protocol +from protocols.igmp import igmp + +class ip(Protocol): + + # Class variables + layer = 3 # Protocol OSI layer + custom_parser = False # Whether the protocol has a custom parser + + # Supported keys in YAML profile + supported_keys = [ + "src", + "dst" + ] + + # Well-known addresses + addrs = { + "ipv4": { + "local": "192.168.0.0/16", + "external": "!= 192.168.0.0/16", + "gateway": "192.168.1.1", + "phone": "192.168.1.222", + "broadcast": "255.255.255.255", + "udp-broadcast": "192.168.1.255", + "igmpv3": "224.0.0.22", + **igmp.groups + }, + "ipv6": { + "default": "::", + "local": ["fe80::/10", "fc00::/7"], + "gateway": "fddd:ed18:f05b::1", + "gateway-local": "fe80::c256:27ff:fe73:460b", + "phone": "fe80::db22:fbec:a6b4:44fe", + } + } + + @staticmethod + def is_ip_static(addr: Union[str, list], version: str = "ipv4") -> bool: + """ + Check whether a (list of) string is a well-known IP alias or an explicit IP address. + + :param addr: (list of) string to check. + :param version: IP version (ipv4 or ipv6). Default is "ipv4". + :return: True if the (list of) string is an IP address, False otherwise. + """ + if type(addr) == list: + # List of addresses + return all([ip.is_ip_static(a) for a in addr]) + + # Single address + if addr == "self" or addr in ip.addrs[version]: + # Address is a well-known alias + return True + # Address is not a well-known alias + try: + ipaddress.ip_address(addr) + return True + except ValueError: + # Address is not an explicit address + return False + + + def is_ip(self, addr: Union[str, list]) -> bool: + """ + Check whether a (list of) string is a well-known IP alias or an explicit IP address. + + :param addr: (list of) string to check. + :return: True if the (list of) string is an IP address, False otherwise. + """ + if type(addr) == list: + # List of addresses + return all([self.is_ip(a) for a in addr]) + + # Single address + if addr == "self" or addr in self.addrs: + # Address is a well-known alias + return True + + # Address is not a well-known alias + + try: + ipaddress.ip_network(addr) + except ValueError: + # Address is not an explicit address or CIDR subnet + return False + else: + # Address is an explicit address or CIDR subnet + return True + + + def explicit_address(self, addr: Union[str,list]) -> str: + """ + Return the explicit version of an IP address alias, + or a list of IP address aliases. + Example: "local" -> "192.168.0.0/16" + + :param addr: IP address alias(es) to explicit. + :return: Explicit IP address(es). + :raises ValueError: If the address is not a well-known alias or an explicit address. + """ + # First check if address(es) correspond(s) to well-known alias(es) + if not self.is_ip(addr): + # Address(es) is/are invalid + raise ValueError(f"Unknown address: {str(addr)}") + + # Check if given address(es) is/are a list + if isinstance(addr, list): + # List of IP address aliases, process each of them + return self.format_list([self.explicit_address(a) for a in addr]) + + # Single IP address alias + + # Address is valid + if addr == "self": + # Address is "self" + return self.device[self.protocol_name] + elif addr in self.addrs: + # Address is a well-known address alias + explicit = self.addrs[addr] + if type(explicit) == list: + # List of corresponding explicit addresses + return self.format_list(explicit) + else: + # Single corresponding explicit address + return explicit + else: + # Address is an explicit address + return addr + + + def add_addr_nfqueue(self, addr_dir: str, is_backward: bool = False) -> None: + """ + Add a new IP address match to the nfqueue accumulator. + + :param addr_dir: Address direction to add the rule to (src or dst) + :param is_backward: Whether the field to add is for a backward rule. + """ + other_dir = "src" if addr_dir == "dst" else "dst" + version = int(self.protocol_name[3]) + # Parts of the rules + domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = get_" + self.protocol_name + "_" + domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = " + domain_name_rule_suffix = "_addr}})" + ip_addr_rule_prefix = "compare_ip((ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = " + ip_addr_rule_suffix = "_addr(payload)}}, ip_str_to_net(\"{}\", " + str(version) + "))" + cached_ip_rule_suffix = "_addr}}, interactions_data[{}].cached_ip)" + # Template rules for a domain name + rules_domain_name = { + "forward": "( " + ip_addr_rule_prefix + addr_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + addr_dir + domain_name_rule_suffix + " )", + "backward": "( " + ip_addr_rule_prefix + other_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + other_dir + domain_name_rule_suffix + " )" + } + # Template rules for an IP address + rules_address = { + "forward": ip_addr_rule_prefix + addr_dir + ip_addr_rule_suffix, + "backward": ip_addr_rule_prefix + other_dir + ip_addr_rule_suffix + } + + value = self.protocol_data[addr_dir] + rules = {} + # If value from YAML profile is a list, produce disjunction of all elements + if isinstance(value, list): + template = [] + match = [] + # Value is a list + for v in value: + is_ip = self.is_ip(v) + template_rules = rules_address if is_ip else rules_domain_name + func = self.explicit_address if is_ip else lambda x: x + match.append(func(v)) + if not is_backward: + template.append(template_rules["forward"]) + elif is_backward and "backward" in template_rules: + template.append(template_rules["backward"]) + rules = {"template": template, "match": match} + else: + # Value is a single element + is_ip = self.is_ip(value) + template_rules = rules_address if is_ip else rules_domain_name + func = self.explicit_address if is_ip else lambda x: x + if not is_backward: + rules = {"template": template_rules["forward"], "match": func(value)} + elif is_backward and "backward" in template_rules: + rules = {"template": template_rules["backward"], "match": func(value)} + + # Append rules + if rules: + self.rules["nfq"].append(rules) + + + def add_addr(self, addr_dir: str, is_backward: bool = False, initiator: str = "") -> None: + """ + Add a new IP address match to the accumulator, in two possible ways: + - If the address is a well-known alias or an explicit IP address, add an nftables match. + - If the address is a domain name, add an nfqueue match. + + :param addr_dir: Address direction to add the rule to (src or dst) + :param is_backward: Whether the field to add is for a backward rule. + :param initiator: Optional, initiator of the connection (src or dst). + """ + other_dir = "src" if addr_dir == "dst" else "dst" + addr = self.protocol_data[addr_dir] + + if self.is_ip(addr): # Source address is a well-known alias or an explicit IP address + tpl_addr_matches = { + "src": "saddr {}", + "dst": "daddr {}" + } + if initiator: # Connection initiator is specified + if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward): + # Connection initiator is the source device + rules = { + "forward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}", + "backward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}" + } + elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward): + # Connection initiator is the destination device + rules = { + "forward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}", + "backward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}" + } + + else: # Connection initiator is not specified + rules = {"forward": f"{self.nft_prefix} {tpl_addr_matches[addr_dir]}", "backward": f"{self.nft_prefix} {tpl_addr_matches[other_dir]}"} + + self.add_field(addr_dir, rules, is_backward, self.explicit_address) + + else: # Source address is potentially a domain name + self.add_addr_nfqueue(addr_dir, is_backward) + + + def parse(self, is_backward: bool = False, initiator: str = "") -> dict: + """ + Parse the IP (v4 or v6) protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + if "src" in self.protocol_data: + # Source address is specified + self.add_addr("src", is_backward, initiator) + if "dst" in self.protocol_data: + # Destination address is specified + self.add_addr("dst", is_backward, initiator) + return self.rules diff --git a/protocols/ipv4.py b/protocols/ipv4.py new file mode 100644 index 0000000..5dcf06d --- /dev/null +++ b/protocols/ipv4.py @@ -0,0 +1,11 @@ +from protocols.ip import ip +from protocols.igmp import igmp + +class ipv4(ip): + + # Class variables + protocol_name = "ipv4" # Protocol name + nft_prefix = "ip" # Prefix for nftables rules + + # Well-known addresses + addrs = ip.addrs["ipv4"] diff --git a/protocols/ipv6.py b/protocols/ipv6.py new file mode 100644 index 0000000..78f9625 --- /dev/null +++ b/protocols/ipv6.py @@ -0,0 +1,14 @@ +from protocols.ip import ip +from protocols.icmpv6 import icmpv6 + +class ipv6(ip): + + # Class variables + protocol_name = "ipv6" # Protocol name + nft_prefix = "ip6" # Prefix for nftables rules + + # Well-known addresses + addrs = { + **ip.addrs["ipv6"], + **icmpv6.groups + } diff --git a/protocols/mdns.py b/protocols/mdns.py new file mode 100644 index 0000000..7572f69 --- /dev/null +++ b/protocols/mdns.py @@ -0,0 +1,6 @@ +from protocols.dns import dns + +class mdns(dns): + + # Class variables + protocol_name = "mdns" # Protocol name diff --git a/protocols/ssdp.py b/protocols/ssdp.py new file mode 100644 index 0000000..7cad896 --- /dev/null +++ b/protocols/ssdp.py @@ -0,0 +1,45 @@ +from protocols.Custom import Custom + +class ssdp(Custom): + + # Class variables + layer = 7 # Protocol OSI layer + protocol_name = "ssdp" # Protocol name + + # Supported keys in YAML profile + supported_keys = [ + "method", + "response" + ] + + def parse(self, is_backward: bool = False, initiator: str = "src") -> dict: + """ + Parse the SSDP protocol. + + :param is_backward (optional): Whether the protocol must be parsed for a backward rule. + Optional, default is `False`. + :param initiator (optional): Connection initiator (src or dst). + Optional, default is "src". + :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy. + """ + # Request or response + ssdp_type_rule = {} + if "response" in self.protocol_data and self.protocol_data["response"]: + if is_backward: + ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": ""} + else: + ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": "!"} + else: + if is_backward: + ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": "!"} + else: + ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": ""} + self.rules["nfq"].append(ssdp_type_rule) + + # Handle SSDP method + rule = {"forward": "ssdp_message.method == {}"} + # Lambda function to convert an SSDP method to its C representation (upper case and separated by underscores) + func = lambda ssdp_method: f"SSDP_{ssdp_method.upper().replace('-', '_')}" + self.add_field("method", rule, is_backward, func) + + return self.rules diff --git a/protocols/tcp.py b/protocols/tcp.py new file mode 100644 index 0000000..934c313 --- /dev/null +++ b/protocols/tcp.py @@ -0,0 +1,9 @@ +from protocols.Transport import Transport + +class tcp(Transport): + + # Class variables + protocol_name = "tcp" # Protocol name + + # Supported keys in YAML profile + supported_keys = Transport.supported_keys + ["initiated-by"] diff --git a/protocols/udp.py b/protocols/udp.py new file mode 100644 index 0000000..2776718 --- /dev/null +++ b/protocols/udp.py @@ -0,0 +1,6 @@ +from protocols.Transport import Transport + +class udp(Transport): + + # Class variables + protocol_name = "udp" # Protocol name -- GitLab