Skip to content
Extraits de code Groupes Projets
Valider 3a22b8a4 rédigé par François De Keersmaeker's avatar François De Keersmaeker
Parcourir les fichiers

Added protocol translators

parent f77e9aa8
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
# Python caches
__pycache__
from protocols.Protocol import Protocol
class Custom(Protocol):
# Class variables
custom_parser = True # Whether the protocol has a custom parser
@staticmethod
def build_nfq_list_match(l: list, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> dict:
"""
Produce a nfqueue match for a list of values.
:param l: List of values.
:param template_rules: Dictionary containing the protocol-specific rules to add.
:param is_backward: Whether the field to add is for a backward rule.
:param func: Function to apply to the field value before writing it.
Optional, default is the identity function.
:param backward_func: Function to apply to the field value in the case of a backwards rule.
Will be applied after `func`.
Optional, default is the identity function.
"""
template = []
match = []
# Value is a list
for v in l:
if not is_backward:
template.append(template_rules["forward"])
match.append(func(v))
elif is_backward and "backward" in template_rules:
template.append(template_rules["backward"])
match.append(backward_func(func(v)))
return {"template": template, "match": match}
def add_field(self, field: str, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> None:
"""
Add a new nfqueue match to the accumulator.
Overrides the nftables version.
:param field: Field to add the rule for.
:param template_rules: Dictionary containing the protocol-specific rules to add.
:param is_backward: Whether the field to add is for a backward rule.
:param func: Function to apply to the field value before writing it.
Optional, default is the identity function.
:param backward_func: Function to apply to the field value in the case of a backwards rule.
Will be applied after `func`.
Optional, default is the identity function.
Args:
field (str): Field to add the rule for.
template_rules (dict): Dictionary containing the protocol-specific rules to add.
is_backward (bool): Whether the field to add is for a backward rule.
func (lambda): Function to apply to the field value before writing it.
Optional, default is the identity function.
backward_func (lambda): Function to apply to the field value in the case of a backwards rule.
Will be applied after `func`.
Optional, default is the identity function.
"""
if field in self.protocol_data:
value = self.protocol_data[field]
rules = {}
# If value from YAML profile is a list, produce disjunction of all elements
if type(value) == list:
rules = Custom.build_nfq_list_match(value, template_rules, is_backward, func, backward_func)
else:
# Value is a single element
value = Protocol.convert_value(value)
if not is_backward:
rules = {"template": template_rules["forward"], "match": func(value)}
elif is_backward and "backward" in template_rules:
rules = {"template": template_rules["backward"], "match": backward_func(func(value))}
# Append rules
if rules:
self.rules["nfq"].append(rules)
from __future__ import annotations
from typing import Union
import importlib
class Protocol:
"""
Generic protocol, inherited by all concrete protocols.
"""
def __init__(self, protocol_data: dict, device: dict) -> None:
"""
Generic protocol constructor.
:param protocol_data: Dictionary containing the protocol data.
:param device: Dictionary containing the device metadata.
"""
self.protocol_data = protocol_data
self.device = device
self.rules = {
"nft": [],
"nfq": []
}
@staticmethod
def convert_value(value: str) -> Union[str, int]:
"""
Convert a string value to an int if possible.
:param value: Value to convert.
:return: Converted value as int if possible, or the original string value otherwise.
"""
try:
result = int(value)
except ValueError:
result = value
return result
@classmethod
def init_protocol(c, protocol_name: str, protocol_data: dict, device: dict) -> Protocol:
"""
Factory method for a specific protocol.
:param protocol_name: Name of the protocol.
:param protocol_data: Dictionary containing the protocol data.
:param device: Dictionary containing the device metadata.
"""
module = importlib.import_module(f"protocols.{protocol_name}")
cls = getattr(module, protocol_name)
return cls(protocol_data, device)
def format_list(self, l: list, func = lambda x: x) -> str:
"""
Format a list of values.
:param l: List of values.
:param func: Function to apply to each value.
Optional, default is the identity function.
:return: Formatted list.
"""
value = "{ "
for i in range(len(l)):
if i != 0:
value += ", "
value += str(func(l[i]))
value += " }"
return value
def add_field(self, field: str, template_rules: dict, is_backward: bool = False, func = lambda x: x, backward_func = lambda x: x) -> None:
"""
Add a new nftables rule to the nftables rules accumulator.
:param field: Field to add the rule for.
:param template_rules: Dictionary containing the protocol-specific rules to add.
:param is_backward (optional): Whether the field to add is for a backward rule.
Optional, default is `False`.
:param func (optional): Function to apply to the field value before writing it.
Optional, default is the identity function.
:param backward_func (optional): a to apply to the field value in the case of a backwards rule.
Will be applied after the forward function.
Optional, default is the identity function.
Args:
field (str): Field to add the rule for.
rules (dict): Dictionary containing the protocol-specific rules to add.
is_backward (bool): Whether the field to add is for a backward rule.
Optional, default is `False`.
func (lambda): Function to apply to the field value before writing it.
Optional, default is the identity function.
backward_func (lambda): a to apply to the field value in the case of a backwards rule.
Will be applied after the forward function.
Optional, default is the identity function.
"""
if self.protocol_data is not None and field in self.protocol_data:
value = self.protocol_data[field]
# If value from YAML profile is a list, add each element
if type(value) == list:
# Value is a list
value = self.format_list(value, func)
else:
# Value is a single element
value = func(value)
# Build rule
rule = {}
value = Protocol.convert_value(value)
if not is_backward:
rule = {"template": template_rules["forward"], "match": value}
elif is_backward and "backward" in template_rules:
rule = {"template": template_rules["backward"], "match": backward_func(value)}
# Add rule to the list of rules
if rule:
self.rules["nft"].append(rule)
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Default parsing method.
Must be updated in the children class.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
return self.rules
from protocols.Protocol import Protocol
class Transport(Protocol):
# Class variables
layer = 4 # Protocol OSI layer
custom_parser = False # Whether the protocol has a custom parser
# Supported keys in YAML profile
supported_keys = [
"src-port",
"dst-port"
]
def parse(self, is_backward: bool = False, initiator: str = "") -> dict:
"""
Parse a layer 4 protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Add protocol match
protocol_match = {
"template": "meta l4proto {}",
"match": self.protocol_name
}
self.rules["nft"].append(protocol_match)
# Connection initiator is specified
if initiator:
# Template rules
template_rules = {
"src-port": {"forward": "ct original proto-src {}", "backward": "ct original proto-dst {}"},
"dst-port": {"forward": "ct original proto-dst {}", "backward": "ct original proto-src {}"}
}
if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward):
# Connection initiator is the source device
self.add_field("src-port", template_rules["src-port"], is_backward)
self.add_field("dst-port", template_rules["dst-port"], is_backward)
elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward):
# Connection initiator is the destination device
self.add_field("src-port", template_rules["dst-port"], is_backward)
self.add_field("dst-port", template_rules["src-port"], is_backward)
# Connection initiator is not specified
else:
# Handle source port
rules = {"forward": self.protocol_name + " sport {}", "backward": self.protocol_name + " dport {}"}
self.add_field("src-port", rules, is_backward)
# Handle destination port
rules = {"forward": self.protocol_name + " dport {}", "backward": self.protocol_name + " sport {}"}
self.add_field("dst-port", rules, is_backward)
return self.rules
from protocols.Protocol import Protocol
class arp(Protocol):
# Class variables
protocol_name = "arp" # Protocol name
layer = 3 # Protocol OSI layer
custom_parser = False # Whether the protocol has a custom parser
# Supported keys in YAML profile
supported_keys = [
"type", # ARP message type
"sha", # ARP source hardware address
"tha", # ARP target hardware address
"spa", # ARP source protocol address
"tpa" # ARP target protocol address
]
# Well-known addresses
mac_addrs = {
"gateway": "c0:56:27:73:46:0b",
"default": "00:00:00:00:00:00",
"broadcast": "ff:ff:ff:ff:ff:ff",
"phone": "3c:cd:5d:a2:a9:d7"
}
ip_addrs = {
"local": "192.168.1.0/24",
"gateway": "192.168.1.1",
"phone": "192.168.1.222"
}
def explicit_address(self, addr: str, type: str = "ipv4") -> str:
"""
Return the explicit version of an IPv4 or MAC address alias.
Example: "local" -> "192.168.0.0/16"
:param addr: IPv4 or MAC address alias to explicit.
:param type: Type of address (ipv4 or mac).
:return: Explicit IPv4 or MAC address.
:raises ValueError: If the address is not a well-known alias or an explicit address.
"""
if addr == "self":
# Address is "self"
return self.device[type]
# Address is not "self"
# Get dictionary of well-known addresses, based on type
addrs = None
if type == "ipv4":
addrs = self.ip_addrs
elif type == "mac":
addrs = self.mac_addrs
if addr in addrs:
# Address is a well-known address alias
return addrs[addr]
else:
# Address is an explicit address
return addr
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the ARP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Lambda function to explicit a self or a well-known MAC address
func_mac = lambda mac: self.device['mac'] if mac == "self" else ( self.mac_addrs[mac] if mac in self.mac_addrs else mac )
# Lambda function to explicit a self or a well-known IPv4 address
func_ip = lambda ip: self.device['ipv4'] if ip == "self" else ( self.ip_addrs[ip] if ip in self.ip_addrs else ip )
# Handle ARP message type
rules = {"forward": "arp operation {}", "backward": "arp operation {}"}
# Lambda function to flip the ARP type (for the backward rule)
backward_func = lambda arp_type: "reply" if arp_type == "request" else ( "request" if arp_type == "reply" else arp_type )
self.add_field("type", rules, is_backward, backward_func=backward_func)
# Handle ARP source hardware address
rules = {"forward": "arp saddr ether {}", "backward": "arp daddr ether {}"}
self.add_field("sha", rules, is_backward, func_mac)
# Handle ARP target hardware address
rules = {"forward": "arp daddr ether {}", "backward": "arp saddr ether {}"}
self.add_field("tha", rules, is_backward, func_mac)
# Handle ARP source protocol address
rules = {"forward": "arp saddr ip {}", "backward": "arp daddr ip {}"}
self.add_field("spa", rules, is_backward, func_ip)
# Handle ARP target protocol address
rules = {"forward": "arp daddr ip {}", "backward": "arp saddr ip {}"}
self.add_field("tpa", rules, is_backward, func_ip)
return self.rules
from protocols.Custom import Custom
class coap(Custom):
# Class variables
layer = 7 # Protocol OSI layer
protocol_name = "coap" # Protocol name
# Supported keys in YAML profile
supported_keys = [
"type",
"method",
"uri"
]
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the CoAP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Lambda functions to convert a CoAP type or method to its C representation (upper case and separated by underscores)
func_coap_type = lambda type: f"COAP_{type.upper().replace('-', '_')}"
func_coap_method = lambda method: f"HTTP_{method.upper().replace('-', '_')}"
# Handle CoAP message type
rule = {"forward": "coap_message.type == {}"}
self.add_field("type", rule, is_backward, func_coap_type)
# Handle CoAP method
rule = {"forward": "coap_message.method == {}"}
self.add_field("method", rule, is_backward, func_coap_method)
# Handle CoAP URI
rule = {"forward": "strcmp(coap_message.uri, \"{}\") == 0"}
self.add_field("uri", rule, is_backward)
return self.rules
from protocols.Custom import Custom
class dhcp(Custom):
# Class variables
layer = 7 # Protocol OSI layer
protocol_name = "dhcp" # Protocol name
# Supported keys in YAML profile
supported_keys = [
"type",
"client-mac"
]
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the DHCP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Handle DHCP message type
rules = {"forward": "dhcp_message.options.message_type == {}"}
# Lambda function to convert a DHCP type to its C representation (upper case)
func = lambda dhcp_type: f"DHCP_{dhcp_type.upper()}"
self.add_field("type", rules, is_backward, func)
# Handle DHCP client MAC address
rules = {"forward": "strcmp(mac_hex_to_str(dhcp_message.chaddr), \"{}\") == 0"}
# Lambda function to explicit a self MAC address
func = lambda mac: self.device['mac'] if mac == "self" else mac
self.add_field("client-mac", rules, is_backward, func)
return self.rules
from protocols.Custom import Custom
class dns(Custom):
# Class variables
layer = 7 # Protocol OSI layer
protocol_name = "dns" # Protocol name
WILDCARD = "$" # Wildcard character for domain names
# Supported keys in YAML profile
supported_keys = [
"type", # DNS query type
"domain-name" # DNS domain name
]
@staticmethod
def get_domain_name_rule(domain_name: str) -> dict:
"""
Retrieves the NFQueue rule to match a given domain name.
:param domain_name: Domain name to match.
:return: Dictionary containing the NFQueue rule to match the given domain name.
"""
if domain_name.startswith(dns.WILDCARD):
suffix = domain_name[len(dns.WILDCARD):]
return {
"template": f"dns_contains_suffix_domain_name(dns_message.questions, dns_message.header.qdcount, \"{{}}\", {len(suffix)})",
"match": suffix
}
else:
return {
"template": "dns_contains_full_domain_name(dns_message.questions, dns_message.header.qdcount, \"{}\")",
"match": domain_name
}
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the DNS protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Handle QR flag
qr_rules = {}
if "response" in self.protocol_data and self.protocol_data["response"]:
if is_backward:
qr_rules = {"template": "dns_message.header.qr == {}", "match": 0}
else:
qr_rules = {"template": "dns_message.header.qr == {}", "match": 1}
else:
if is_backward:
qr_rules = {"template": "dns_message.header.qr == {}", "match": 1}
else:
qr_rules = {"template": "dns_message.header.qr == {}", "match": 0}
self.rules["nfq"].append(qr_rules)
# Handle DNS query type
rule = "( dns_message.header.qdcount > 0 && dns_message.questions->qtype == {} )"
# Lambda function to convert an DNS query type to its C representation (upper case)
func = lambda dns_qtype: dns_qtype.upper()
rules = {"forward": rule, "backward": rule}
self.add_field("qtype", rules, is_backward, func)
# Handle DNS domain name
domain_name = self.protocol_data.get("domain-name", None)
if domain_name is not None:
domain_name_rule = {}
if isinstance(domain_name, list):
template = []
match = []
for dname in domain_name:
single_rule = dns.get_domain_name_rule(dname)
template.append(single_rule["template"])
match.append(single_rule["match"])
domain_name_rule = {"template": template, "match": match}
else:
domain_name_rule = dns.get_domain_name_rule(domain_name)
self.rules["nfq"].append(domain_name_rule)
return self.rules
from protocols.Custom import Custom
class http(Custom):
# Class variables
layer = 7 # Protocol OSI layer
protocol_name = "http" # Protocol name
# Supported keys in YAML profile
supported_keys = [
"method",
"uri",
"response"
]
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the HTTP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Request or response
http_type_rule = {}
if "response" in self.protocol_data and self.protocol_data["response"]:
if is_backward:
http_type_rule = {"template": "{}http_message.is_request", "match": ""}
else:
http_type_rule = {"template": "{}http_message.is_request", "match": "!"}
else:
if is_backward:
http_type_rule = {"template": "{}http_message.is_request", "match": "!"}
else:
http_type_rule = {"template": "{}http_message.is_request", "match": ""}
self.rules["nfq"].append(http_type_rule)
# Handle HTTP method
rule = {"forward": "http_message.method == {}"}
# Lambda function to convert an HTTP method to its C representation (upper case)
func = lambda http_method: f"HTTP_{http_method.upper()}"
self.add_field("method", rule, is_backward, func)
# Handle HTTP URI
# URI can take two forms:
# - Complete URI: exact string match
# - URI prefix: string match with the beginning of the URI
uri = self.protocol_data.get("uri", None)
if uri is not None:
length = len(uri) - 1 if uri.endswith("*") or uri.endswith("$") else len(uri) + 1
rule = {"forward": f"strncmp(http_message.uri, \"{{}}\", {length}) == 0"}
self.add_field("uri", rule, is_backward)
return self.rules
from protocols.Protocol import Protocol
class icmp(Protocol):
# Class variables
layer = 4 # Protocol OSI layer
protocol_name = "icmp" # Protocol name
l4proto = 1 # Layer 4 protocol number
custom_parser = False # Whether the protocol has a custom parser
# Supported keys in YAML profile
supported_keys = [
"type" # ICMP message type
]
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the ICMP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Add protocol match
protocol_match = {
"template": "meta l4proto {}",
"match": self.l4proto
}
self.rules["nft"].append(protocol_match)
# Handle ICMP message type
icmp_rule = f"{self.protocol_name} type {{}}"
rules = {"forward": icmp_rule, "backward": icmp_rule}
# Lambda function to flip the ICMP type (for the backward rule)
backward_func = lambda icmp_type: icmp_type.replace("request", "reply") if "request" in icmp_type else ( icmp_type.replace("reply", "request") if "reply" in icmp_type else icmp_type )
self.add_field("type", rules, is_backward, backward_func=backward_func)
return self.rules
from protocols.Protocol import Protocol
class icmpv6(Protocol):
# Class variables
layer = 4 # Protocol OSI layer
protocol_name = "icmpv6" # Protocol name
l4proto = 58 # Layer 4 protocol number
custom_parser = False # Whether the protocol has a custom parser
# IPv6 multicast groups
groups = {
"multicast": "ff02::/16",
"all-nodes": "ff02::1",
"all-routers": "ff02::2",
"all-mldv2-routers": "ff02::16",
"mdns": "ff02::fb",
"coap": "ff02::158"
}
# Supported keys in YAML profile
# For now, no support for ICMPv6 options, as the router does not support them
supported_keys = []
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the ICMP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Add protocol match
protocol_match = {
"template": "meta l4proto {}",
"match": self.l4proto
}
self.rules["nft"].append(protocol_match)
return self.rules
from protocols.Custom import Custom
class igmp(Custom):
# Class variables
layer = 4 # Protocol OSI layer
protocol_name = "igmp" # Protocol name
l4proto = 2 # Layer 4 protocol number
custom_parser = True # Whether the protocol has a custom parser
# Supported keys in YAML profile
supported_keys = [
"version",
'type',
'group'
]
# Well-known groups
groups = {
"all": "224.0.0.2",
"mdns": "224.0.0.251",
"ssdp": "239.255.255.250",
"coap": "224.0.1.187"
}
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the IGMP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Add protocol match
protocol_match = {
"template": "meta l4proto {}",
"match": igmp.l4proto
}
self.rules["nft"].append(protocol_match)
# Retrieve IGMP version
version = self.protocol_data.get("version", 2)
# Handle IGMP message type
message_type = self.protocol_data["type"]
message_type = f"V{version}_{{}}" if "report" in message_type else "{}"
rules = {"forward": f"igmp_message.type == {message_type}"}
# Lambda function to convert an IGMP type to its C representation (upper case and separated by underscores)
func = lambda igmp_type: igmp_type.upper().replace(" ", "_")
self.add_field("type", rules, is_backward, func)
# Handle IGMP group
if version == 3:
# IGMPv3: consider only the first group record's multicast address
rules = {"forward": "strcmp(ipv4_net_to_str((igmp_message.body.v3_membership_report.groups)->group_address), \"{}\") == 0"}
else:
# IGMPv1 and IGMPv2
rules = {"forward": "strcmp(ipv4_net_to_str(igmp_message.body.v2_message.group_address), \"{}\") == 0"}
# Lambda function to explicit the address of a well-known group
func = lambda igmp_group: self.groups[igmp_group] if igmp_group in self.groups else igmp_group
self.add_field("group", rules, is_backward, func)
return self.rules
from typing import Union
import ipaddress
from protocols.Protocol import Protocol
from protocols.igmp import igmp
class ip(Protocol):
# Class variables
layer = 3 # Protocol OSI layer
custom_parser = False # Whether the protocol has a custom parser
# Supported keys in YAML profile
supported_keys = [
"src",
"dst"
]
# Well-known addresses
addrs = {
"ipv4": {
"local": "192.168.0.0/16",
"external": "!= 192.168.0.0/16",
"gateway": "192.168.1.1",
"phone": "192.168.1.222",
"broadcast": "255.255.255.255",
"udp-broadcast": "192.168.1.255",
"igmpv3": "224.0.0.22",
**igmp.groups
},
"ipv6": {
"default": "::",
"local": ["fe80::/10", "fc00::/7"],
"gateway": "fddd:ed18:f05b::1",
"gateway-local": "fe80::c256:27ff:fe73:460b",
"phone": "fe80::db22:fbec:a6b4:44fe",
}
}
@staticmethod
def is_ip_static(addr: Union[str, list], version: str = "ipv4") -> bool:
"""
Check whether a (list of) string is a well-known IP alias or an explicit IP address.
:param addr: (list of) string to check.
:param version: IP version (ipv4 or ipv6). Default is "ipv4".
:return: True if the (list of) string is an IP address, False otherwise.
"""
if type(addr) == list:
# List of addresses
return all([ip.is_ip_static(a) for a in addr])
# Single address
if addr == "self" or addr in ip.addrs[version]:
# Address is a well-known alias
return True
# Address is not a well-known alias
try:
ipaddress.ip_address(addr)
return True
except ValueError:
# Address is not an explicit address
return False
def is_ip(self, addr: Union[str, list]) -> bool:
"""
Check whether a (list of) string is a well-known IP alias or an explicit IP address.
:param addr: (list of) string to check.
:return: True if the (list of) string is an IP address, False otherwise.
"""
if type(addr) == list:
# List of addresses
return all([self.is_ip(a) for a in addr])
# Single address
if addr == "self" or addr in self.addrs:
# Address is a well-known alias
return True
# Address is not a well-known alias
try:
ipaddress.ip_network(addr)
except ValueError:
# Address is not an explicit address or CIDR subnet
return False
else:
# Address is an explicit address or CIDR subnet
return True
def explicit_address(self, addr: Union[str,list]) -> str:
"""
Return the explicit version of an IP address alias,
or a list of IP address aliases.
Example: "local" -> "192.168.0.0/16"
:param addr: IP address alias(es) to explicit.
:return: Explicit IP address(es).
:raises ValueError: If the address is not a well-known alias or an explicit address.
"""
# First check if address(es) correspond(s) to well-known alias(es)
if not self.is_ip(addr):
# Address(es) is/are invalid
raise ValueError(f"Unknown address: {str(addr)}")
# Check if given address(es) is/are a list
if isinstance(addr, list):
# List of IP address aliases, process each of them
return self.format_list([self.explicit_address(a) for a in addr])
# Single IP address alias
# Address is valid
if addr == "self":
# Address is "self"
return self.device[self.protocol_name]
elif addr in self.addrs:
# Address is a well-known address alias
explicit = self.addrs[addr]
if type(explicit) == list:
# List of corresponding explicit addresses
return self.format_list(explicit)
else:
# Single corresponding explicit address
return explicit
else:
# Address is an explicit address
return addr
def add_addr_nfqueue(self, addr_dir: str, is_backward: bool = False) -> None:
"""
Add a new IP address match to the nfqueue accumulator.
:param addr_dir: Address direction to add the rule to (src or dst)
:param is_backward: Whether the field to add is for a backward rule.
"""
other_dir = "src" if addr_dir == "dst" else "dst"
version = int(self.protocol_name[3])
# Parts of the rules
domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = get_" + self.protocol_name + "_"
domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
domain_name_rule_suffix = "_addr}})"
ip_addr_rule_prefix = "compare_ip((ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
ip_addr_rule_suffix = "_addr(payload)}}, ip_str_to_net(\"{}\", " + str(version) + "))"
cached_ip_rule_suffix = "_addr}}, interactions_data[{}].cached_ip)"
# Template rules for a domain name
rules_domain_name = {
"forward": "( " + ip_addr_rule_prefix + addr_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + addr_dir + domain_name_rule_suffix + " )",
"backward": "( " + ip_addr_rule_prefix + other_dir + cached_ip_rule_suffix + " || " + domain_name_rule_prefix + other_dir + domain_name_rule_suffix + " )"
}
# Template rules for an IP address
rules_address = {
"forward": ip_addr_rule_prefix + addr_dir + ip_addr_rule_suffix,
"backward": ip_addr_rule_prefix + other_dir + ip_addr_rule_suffix
}
value = self.protocol_data[addr_dir]
rules = {}
# If value from YAML profile is a list, produce disjunction of all elements
if isinstance(value, list):
template = []
match = []
# Value is a list
for v in value:
is_ip = self.is_ip(v)
template_rules = rules_address if is_ip else rules_domain_name
func = self.explicit_address if is_ip else lambda x: x
match.append(func(v))
if not is_backward:
template.append(template_rules["forward"])
elif is_backward and "backward" in template_rules:
template.append(template_rules["backward"])
rules = {"template": template, "match": match}
else:
# Value is a single element
is_ip = self.is_ip(value)
template_rules = rules_address if is_ip else rules_domain_name
func = self.explicit_address if is_ip else lambda x: x
if not is_backward:
rules = {"template": template_rules["forward"], "match": func(value)}
elif is_backward and "backward" in template_rules:
rules = {"template": template_rules["backward"], "match": func(value)}
# Append rules
if rules:
self.rules["nfq"].append(rules)
def add_addr(self, addr_dir: str, is_backward: bool = False, initiator: str = "") -> None:
"""
Add a new IP address match to the accumulator, in two possible ways:
- If the address is a well-known alias or an explicit IP address, add an nftables match.
- If the address is a domain name, add an nfqueue match.
:param addr_dir: Address direction to add the rule to (src or dst)
:param is_backward: Whether the field to add is for a backward rule.
:param initiator: Optional, initiator of the connection (src or dst).
"""
other_dir = "src" if addr_dir == "dst" else "dst"
addr = self.protocol_data[addr_dir]
if self.is_ip(addr): # Source address is a well-known alias or an explicit IP address
tpl_addr_matches = {
"src": "saddr {}",
"dst": "daddr {}"
}
if initiator: # Connection initiator is specified
if (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward):
# Connection initiator is the source device
rules = {
"forward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}",
"backward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}"
}
elif (initiator == "src" and is_backward) or (initiator == "dst" and not is_backward):
# Connection initiator is the destination device
rules = {
"forward": f"ct original {self.nft_prefix} {tpl_addr_matches[other_dir]}",
"backward": f"ct original {self.nft_prefix} {tpl_addr_matches[addr_dir]}"
}
else: # Connection initiator is not specified
rules = {"forward": f"{self.nft_prefix} {tpl_addr_matches[addr_dir]}", "backward": f"{self.nft_prefix} {tpl_addr_matches[other_dir]}"}
self.add_field(addr_dir, rules, is_backward, self.explicit_address)
else: # Source address is potentially a domain name
self.add_addr_nfqueue(addr_dir, is_backward)
def parse(self, is_backward: bool = False, initiator: str = "") -> dict:
"""
Parse the IP (v4 or v6) protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
if "src" in self.protocol_data:
# Source address is specified
self.add_addr("src", is_backward, initiator)
if "dst" in self.protocol_data:
# Destination address is specified
self.add_addr("dst", is_backward, initiator)
return self.rules
from protocols.ip import ip
from protocols.igmp import igmp
class ipv4(ip):
# Class variables
protocol_name = "ipv4" # Protocol name
nft_prefix = "ip" # Prefix for nftables rules
# Well-known addresses
addrs = ip.addrs["ipv4"]
from protocols.ip import ip
from protocols.icmpv6 import icmpv6
class ipv6(ip):
# Class variables
protocol_name = "ipv6" # Protocol name
nft_prefix = "ip6" # Prefix for nftables rules
# Well-known addresses
addrs = {
**ip.addrs["ipv6"],
**icmpv6.groups
}
from protocols.dns import dns
class mdns(dns):
# Class variables
protocol_name = "mdns" # Protocol name
from protocols.Custom import Custom
class ssdp(Custom):
# Class variables
layer = 7 # Protocol OSI layer
protocol_name = "ssdp" # Protocol name
# Supported keys in YAML profile
supported_keys = [
"method",
"response"
]
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the SSDP protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Request or response
ssdp_type_rule = {}
if "response" in self.protocol_data and self.protocol_data["response"]:
if is_backward:
ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": ""}
else:
ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": "!"}
else:
if is_backward:
ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": "!"}
else:
ssdp_type_rule = {"template": "{}ssdp_message.is_request", "match": ""}
self.rules["nfq"].append(ssdp_type_rule)
# Handle SSDP method
rule = {"forward": "ssdp_message.method == {}"}
# Lambda function to convert an SSDP method to its C representation (upper case and separated by underscores)
func = lambda ssdp_method: f"SSDP_{ssdp_method.upper().replace('-', '_')}"
self.add_field("method", rule, is_backward, func)
return self.rules
from protocols.Transport import Transport
class tcp(Transport):
# Class variables
protocol_name = "tcp" # Protocol name
# Supported keys in YAML profile
supported_keys = Transport.supported_keys + ["initiated-by"]
from protocols.Transport import Transport
class udp(Transport):
# Class variables
protocol_name = "udp" # Protocol name
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter