Skip to content
Extraits de code Groupes Projets
ip.py 10,51 Kio
from typing import Union
import ipaddress
from protocols.Protocol import Protocol
from protocols.igmp import igmp

class ip(Protocol):

    # Class variables
    layer = 3              # Protocol OSI layer
    custom_parser = False  # Whether the protocol has a custom parser
    
    # Supported keys in YAML profile
    supported_keys = [
        "src",
        "dst"
    ]

    # Well-known addresses
    addrs = {
        "ipv4": {
            "local":         "192.168.0.0/16",
            "external":      "!= 192.168.0.0/16",
            "gateway":       "192.168.1.1",
            "phone":         "192.168.1.222",
            "broadcast":     "255.255.255.255",
            "udp-broadcast": "192.168.1.255",
            "igmpv3":        "224.0.0.22",
            **igmp.groups
        },
        "ipv6": {
            "default":       "::",
            "local":         ["fe80::/10", "fc00::/7"],
            "gateway":       "fddd:ed18:f05b::1",
            "gateway-local": "fe80::c256:27ff:fe73:460b",
            "phone":         "fe80::db22:fbec:a6b4:44fe",
        }
    }

    @staticmethod
    def is_ip_static(addr: Union[str, list], version: str = "ipv4") -> bool:
        """
        Check whether a (list of) string is a well-known IP alias or an explicit IP address.

        :param addr: (list of) string to check.
        :param version: IP version (ipv4 or ipv6). Default is "ipv4".
        :return: True if the (list of) string is an IP address, False otherwise.
        """
        if type(addr) == list:
            # List of addresses
            return all([ip.is_ip_static(a) for a in addr])
        
        # Single address
        if addr == "self" or addr in ip.addrs[version]:
            # Address is a well-known alias
            return True
        # Address is not a well-known alias
        try:
            ipaddress.ip_address(addr)
            return True
        except ValueError:
            # Address is not an explicit address
            return False


    def is_ip(self, addr: Union[str, list]) -> bool:
        """
        Check whether a (list of) string is a well-known IP alias or an explicit IP address.

        :param addr: (list of) string to check.
        :return: True if the (list of) string is an IP address, False otherwise.
        """
        if type(addr) == list:
            # List of addresses
            return all([self.is_ip(a) for a in addr])
        
        # Single address
        if addr == "self" or addr in self.addrs:
            # Address is a well-known alias
            return True
        
        # Address is not a well-known alias
        
        try:
            ipaddress.ip_network(addr)
        except ValueError:
            # Address is not an explicit address or CIDR subnet
            return False
        else:
            # Address is an explicit address or CIDR subnet
            return True


    def explicit_address(self, addr: Union[str,list]) -> str:
        """
        Return the explicit version of an IP address alias,
        or a list of IP address aliases.
        Example: "local" -> "192.168.0.0/16"

        :param addr: IP address alias(es) to explicit.
        :return: Explicit IP address(es).
        :raises ValueError: If the address is not a well-known alias or an explicit address.
        """
        # First check if address(es) correspond(s) to well-known alias(es)
        if not self.is_ip(addr):
            # Address(es) is/are invalid
            raise ValueError(f"Unknown address: {str(addr)}")

        # Check if given address(es) is/are a list
        if isinstance(addr, list):
            # List of IP address aliases, process each of them
            return self.format_list([self.explicit_address(a) for a in addr])
        
        # Single IP address alias
        
        # Address is valid
        if addr == "self":
            # Address is "self"
            return self.device[self.protocol_name]
        elif addr in self.addrs:
            # Address is a well-known address alias
            explicit = self.addrs[addr]
            if type(explicit) == list:
                # List of corresponding explicit addresses
                return self.format_list(explicit)
            else:
                # Single corresponding explicit address
                return explicit
        else:
            # Address is an explicit address
            return addr

    
    def add_addr_nfqueue(self, addr_dir: str, is_backward: bool = False) -> None:
        """
        Add a new IP address match to the nfqueue accumulator.

        :param addr_dir: Address direction to add the rule to (src or dst)
        :param is_backward: Whether the field to add is for a backward rule.
        """
        other_dir = "src" if addr_dir == "dst" else "dst"
        version = int(self.protocol_name[3])
        # Parts of the rules
        domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = get_" + self.protocol_name + "_"
        domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
        domain_name_rule_suffix = "_addr}})"
        ip_addr_rule_prefix = "compare_ip((ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
        ip_addr_rule_suffix = "_addr(payload)}}, ip_str_to_net(\"{}\", " + str(version) + "))"
        cached_ip_rule_suffix = "_addr}}, interactions_data[{}].cached_ip)"
        # Template rules for a domain name
        rules_domain_name = {
            "forward": "( " + ip_addr_rule_prefix + addr_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + addr_dir + domain_name_rule_suffix + " )",
            "backward": "( " + ip_addr_rule_prefix + other_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + other_dir + domain_name_rule_suffix + " )"
        }
        # Template rules for an IP address
        rules_address = {
            "forward": ip_addr_rule_prefix + addr_dir + ip_addr_rule_suffix,
            "backward": ip_addr_rule_prefix + other_dir + ip_addr_rule_suffix
        }

        value = self.protocol_data[addr_dir]
        rules = {}
        # If value from YAML profile is a list, produce disjunction of all elements
        if isinstance(value, list):
            template = []
            match = []
            # Value is a list
            for v in value:
                is_ip = self.is_ip(v)
                template_rules = rules_address if is_ip else rules_domain_name
                func = self.explicit_address if is_ip else lambda x: x
                match.append(func(v))
                if not is_backward:
                    template.append(template_rules["forward"])
                elif is_backward and "backward" in template_rules:
                    template.append(template_rules["backward"])
            rules = {"template": template, "match": match}
        else:
            # Value is a single element
            is_ip = self.is_ip(value)
            template_rules = rules_address if is_ip else rules_domain_name
            func = self.explicit_address if is_ip else lambda x: x
            if not is_backward:
                rules = {"template": template_rules["forward"], "match": func(value)}
            elif is_backward and "backward" in template_rules:
                rules = {"template": template_rules["backward"], "match": func(value)}

        # Append rules
        if rules:
            self.rules["nfq"].append(rules)
            
    
    def add_addr(self, addr_dir: str, is_backward: bool = False, initiator: str = "") -> None:
        """
        Add a new IP address match to the accumulator, in two possible ways:
            - If the address is a well-known alias or an explicit IP address, add an nftables match.
            - If the address is a domain name, add an nfqueue match.

        :param addr_dir: Address direction to add the rule to (src or dst)
        :param is_backward: Whether the field to add is for a backward rule.
        :param initiator: Optional, initiator of the connection (src or dst).
        """
        other_dir = "src" if addr_dir == "dst" else "dst"
        addr = self.protocol_data[addr_dir]

        if self.is_ip(addr):  # Source address is a well-known alias or an explicit IP address
            tpl_addr_matches = {
                "src": "saddr {}",
                "dst": "daddr {}"
            }
            if initiator:  # Connection initiator is specified
                if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward):
                    # Connection initiator is the source device
                    rules = {
                        "forward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}",
                        "backward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}"
                    }
                elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward):
                    # Connection initiator is the destination device
                    rules = {
                        "forward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}",
                        "backward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}"
                    }
            
            else:  # Connection initiator is not specified
                rules = {"forward": f"{self.nft_prefix} {tpl_addr_matches[addr_dir]}", "backward": f"{self.nft_prefix} {tpl_addr_matches[other_dir]}"}
            
            self.add_field(addr_dir, rules, is_backward, self.explicit_address)

        else:  # Source address is potentially a domain name
            self.add_addr_nfqueue(addr_dir, is_backward)


    def parse(self, is_backward: bool = False, initiator: str = "") -> dict:
        """
        Parse the IP (v4 or v6) protocol.

        :param is_backward (optional): Whether the protocol must be parsed for a backward rule.
                                       Optional, default is `False`.
        :param initiator (optional): Connection initiator (src or dst).
                                     Optional, default is "src".
        :return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
        """
        if "src" in self.protocol_data:
            # Source address is specified
            self.add_addr("src", is_backward, initiator)
        if "dst" in self.protocol_data:
            # Destination address is specified
            self.add_addr("dst", is_backward, initiator)
        return self.rules