-
François De Keersmaeker a rédigéFrançois De Keersmaeker a rédigé
ip.py 10,51 Kio
from typing import Union
import ipaddress
from protocols.Protocol import Protocol
from protocols.igmp import igmp
class ip(Protocol):
# Class variables
layer = 3 # Protocol OSI layer
custom_parser = False # Whether the protocol has a custom parser
# Supported keys in YAML profile
supported_keys = [
"src",
"dst"
]
# Well-known addresses
addrs = {
"ipv4": {
"local": "192.168.0.0/16",
"external": "!= 192.168.0.0/16",
"gateway": "192.168.1.1",
"phone": "192.168.1.222",
"broadcast": "255.255.255.255",
"udp-broadcast": "192.168.1.255",
"igmpv3": "224.0.0.22",
**igmp.groups
},
"ipv6": {
"default": "::",
"local": ["fe80::/10", "fc00::/7"],
"gateway": "fddd:ed18:f05b::1",
"gateway-local": "fe80::c256:27ff:fe73:460b",
"phone": "fe80::db22:fbec:a6b4:44fe",
}
}
@staticmethod
def is_ip_static(addr: Union[str, list], version: str = "ipv4") -> bool:
"""
Check whether a (list of) string is a well-known IP alias or an explicit IP address.
:param addr: (list of) string to check.
:param version: IP version (ipv4 or ipv6). Default is "ipv4".
:return: True if the (list of) string is an IP address, False otherwise.
"""
if type(addr) == list:
# List of addresses
return all([ip.is_ip_static(a) for a in addr])
# Single address
if addr == "self" or addr in ip.addrs[version]:
# Address is a well-known alias
return True
# Address is not a well-known alias
try:
ipaddress.ip_address(addr)
return True
except ValueError:
# Address is not an explicit address
return False
def is_ip(self, addr: Union[str, list]) -> bool:
"""
Check whether a (list of) string is a well-known IP alias or an explicit IP address.
:param addr: (list of) string to check.
:return: True if the (list of) string is an IP address, False otherwise.
"""
if type(addr) == list:
# List of addresses
return all([self.is_ip(a) for a in addr])
# Single address
if addr == "self" or addr in self.addrs:
# Address is a well-known alias
return True
# Address is not a well-known alias
try:
ipaddress.ip_network(addr)
except ValueError:
# Address is not an explicit address or CIDR subnet
return False
else:
# Address is an explicit address or CIDR subnet
return True
def explicit_address(self, addr: Union[str,list]) -> str:
"""
Return the explicit version of an IP address alias,
or a list of IP address aliases.
Example: "local" -> "192.168.0.0/16"
:param addr: IP address alias(es) to explicit.
:return: Explicit IP address(es).
:raises ValueError: If the address is not a well-known alias or an explicit address.
"""
# First check if address(es) correspond(s) to well-known alias(es)
if not self.is_ip(addr):
# Address(es) is/are invalid
raise ValueError(f"Unknown address: {str(addr)}")
# Check if given address(es) is/are a list
if isinstance(addr, list):
# List of IP address aliases, process each of them
return self.format_list([self.explicit_address(a) for a in addr])
# Single IP address alias
# Address is valid
if addr == "self":
# Address is "self"
return self.device[self.protocol_name]
elif addr in self.addrs:
# Address is a well-known address alias
explicit = self.addrs[addr]
if type(explicit) == list:
# List of corresponding explicit addresses
return self.format_list(explicit)
else:
# Single corresponding explicit address
return explicit
else:
# Address is an explicit address
return addr
def add_addr_nfqueue(self, addr_dir: str, is_backward: bool = False) -> None:
"""
Add a new IP address match to the nfqueue accumulator.
:param addr_dir: Address direction to add the rule to (src or dst)
:param is_backward: Whether the field to add is for a backward rule.
"""
other_dir = "src" if addr_dir == "dst" else "dst"
version = int(self.protocol_name[3])
# Parts of the rules
domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = get_" + self.protocol_name + "_"
domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
domain_name_rule_suffix = "_addr}})"
ip_addr_rule_prefix = "compare_ip((ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
ip_addr_rule_suffix = "_addr(payload)}}, ip_str_to_net(\"{}\", " + str(version) + "))"
cached_ip_rule_suffix = "_addr}}, interactions_data[{}].cached_ip)"
# Template rules for a domain name
rules_domain_name = {
"forward": "( " + ip_addr_rule_prefix + addr_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + addr_dir + domain_name_rule_suffix + " )",
"backward": "( " + ip_addr_rule_prefix + other_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + other_dir + domain_name_rule_suffix + " )"
}
# Template rules for an IP address
rules_address = {
"forward": ip_addr_rule_prefix + addr_dir + ip_addr_rule_suffix,
"backward": ip_addr_rule_prefix + other_dir + ip_addr_rule_suffix
}
value = self.protocol_data[addr_dir]
rules = {}
# If value from YAML profile is a list, produce disjunction of all elements
if isinstance(value, list):
template = []
match = []
# Value is a list
for v in value:
is_ip = self.is_ip(v)
template_rules = rules_address if is_ip else rules_domain_name
func = self.explicit_address if is_ip else lambda x: x
match.append(func(v))
if not is_backward:
template.append(template_rules["forward"])
elif is_backward and "backward" in template_rules:
template.append(template_rules["backward"])
rules = {"template": template, "match": match}
else:
# Value is a single element
is_ip = self.is_ip(value)
template_rules = rules_address if is_ip else rules_domain_name
func = self.explicit_address if is_ip else lambda x: x
if not is_backward:
rules = {"template": template_rules["forward"], "match": func(value)}
elif is_backward and "backward" in template_rules:
rules = {"template": template_rules["backward"], "match": func(value)}
# Append rules
if rules:
self.rules["nfq"].append(rules)
def add_addr(self, addr_dir: str, is_backward: bool = False, initiator: str = "") -> None:
"""
Add a new IP address match to the accumulator, in two possible ways:
- If the address is a well-known alias or an explicit IP address, add an nftables match.
- If the address is a domain name, add an nfqueue match.
:param addr_dir: Address direction to add the rule to (src or dst)
:param is_backward: Whether the field to add is for a backward rule.
:param initiator: Optional, initiator of the connection (src or dst).
"""
other_dir = "src" if addr_dir == "dst" else "dst"
addr = self.protocol_data[addr_dir]
if self.is_ip(addr): # Source address is a well-known alias or an explicit IP address
tpl_addr_matches = {
"src": "saddr {}",
"dst": "daddr {}"
}
if initiator: # Connection initiator is specified
if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward):
# Connection initiator is the source device
rules = {
"forward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}",
"backward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}"
}
elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward):
# Connection initiator is the destination device
rules = {
"forward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}",
"backward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}"
}
else: # Connection initiator is not specified
rules = {"forward": f"{self.nft_prefix} {tpl_addr_matches[addr_dir]}", "backward": f"{self.nft_prefix} {tpl_addr_matches[other_dir]}"}
self.add_field(addr_dir, rules, is_backward, self.explicit_address)
else: # Source address is potentially a domain name
self.add_addr_nfqueue(addr_dir, is_backward)
def parse(self, is_backward: bool = False, initiator: str = "") -> dict:
"""
Parse the IP (v4 or v6) protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
if "src" in self.protocol_data:
# Source address is specified
self.add_addr("src", is_backward, initiator)
if "dst" in self.protocol_data:
# Destination address is specified
self.add_addr("dst", is_backward, initiator)
return self.rules