Newer
Older
from typing import Union
import ipaddress
from Protocol import Protocol
from igmp import igmp
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class ip(Protocol):
    """
    Shared rule-generation logic for the IP (v4 or v6) protocol.

    Turns the `src` / `dst` fields of a YAML profile into either nftables
    matches (well-known aliases, explicit addresses, CIDR subnets) or
    nfqueue matches (any other value, treated as a potential domain name).

    NOTE(review): the instance methods use `self.addrs` as a flat
    alias -> address mapping and read the version digit from
    `self.protocol_name[3]`, so they presumably run on per-version
    subclasses that override `addrs` and set `protocol_name` to
    "ipv4" / "ipv6" -- confirm against the subclasses.
    NOTE(review): `self.device`, `self.protocol_name`, `self.protocol_data`,
    `self.rules`, `self.nft_prefix`, `self.format_list` and `self.add_field`
    are not defined in this class; presumably inherited from `Protocol`.
    """

    # Class variables
    layer = 3               # Protocol OSI layer
    custom_parser = False   # Whether the protocol has a custom parser

    # Supported keys in YAML profile
    supported_keys = [
        "src",
        "dst"
    ]

    # Well-known addresses, indexed by IP version then by alias
    addrs = {
        "ipv4": {
            "local": "192.168.0.0/16",
            "external": "!= 192.168.0.0/16",
            "gateway": "192.168.1.1",
            "phone": "192.168.1.222",
            "broadcast": "255.255.255.255",
            "udp-broadcast": "192.168.1.255",
            "igmpv3": "224.0.0.22",
            **igmp.groups
        },
        "ipv6": {
            "default": "::",
            "local": ["fe80::/10", "fc00::/7"],
            "gateway": "fddd:ed18:f05b::1",
            "gateway-local": "fe80::c256:27ff:fe73:460b",
            "phone": "fe80::db22:fbec:a6b4:44fe",
        }
    }

    @staticmethod
    def is_ip_static(addr: Union[str, list], version: str = "ipv4") -> bool:
        """
        Check whether a (list of) string is a well-known IP alias or an explicit IP address.

        Unlike `is_ip`, this variant validates with `ipaddress.ip_address`,
        so CIDR subnets are not accepted as explicit addresses.

        :param addr: (list of) string to check.
        :param version: IP version (ipv4 or ipv6). Default is "ipv4".
        :return: True if the (list of) string is an IP address, False otherwise.
        """
        if isinstance(addr, list):
            # List of addresses: every element must be valid for the SAME version
            # (the version must be forwarded, otherwise IPv6 aliases are looked up
            # in the IPv4 table).
            return all(ip.is_ip_static(a, version) for a in addr)

        # Single address
        if addr == "self" or addr in ip.addrs[version]:
            # Address is a well-known alias
            return True

        # Address is not a well-known alias
        try:
            ipaddress.ip_address(addr)
        except ValueError:
            # Address is not an explicit address
            return False
        return True

    def is_ip(self, addr: Union[str, list]) -> bool:
        """
        Check whether a (list of) string is a well-known IP alias,
        an explicit IP address, or a CIDR subnet.

        :param addr: (list of) string to check.
        :return: True if the (list of) string is an IP address, False otherwise.
        """
        if isinstance(addr, list):
            # List of addresses
            return all(self.is_ip(a) for a in addr)

        # Single address
        if addr == "self" or addr in self.addrs:
            # Address is a well-known alias
            return True

        # Address is not a well-known alias
        try:
            # ip_network accepts both single addresses and CIDR subnets
            ipaddress.ip_network(addr)
        except ValueError:
            # Address is not an explicit address or CIDR subnet
            return False
        # Address is an explicit address or CIDR subnet
        return True

    def explicit_address(self, addr: Union[str, list]) -> str:
        """
        Return the explicit version of an IP address alias,
        or a list of IP address aliases.
        Example: "local" -> "192.168.0.0/16"

        :param addr: IP address alias(es) to explicit.
        :return: Explicit IP address(es). Lists are rendered through `self.format_list`.
        :raises ValueError: If the address is not a well-known alias or an explicit address.
        """
        # First check if address(es) correspond(s) to well-known alias(es)
        if not self.is_ip(addr):
            # Address(es) is/are invalid
            raise ValueError(f"Unknown address: {str(addr)}")

        # Check if given address(es) is/are a list
        if isinstance(addr, list):
            # List of IP address aliases, process each of them
            return self.format_list([self.explicit_address(a) for a in addr])

        # Single, valid IP address alias
        if addr == "self":
            # Address is "self": use the device's own address for this protocol
            return self.device[self.protocol_name]

        if addr in self.addrs:
            # Address is a well-known address alias
            explicit = self.addrs[addr]
            if isinstance(explicit, list):
                # List of corresponding explicit addresses
                return self.format_list(explicit)
            # Single corresponding explicit address
            return explicit

        # Address is already an explicit address
        return addr

    def add_addr_nfqueue(self, addr_dir: str, is_backward: bool = False) -> None:
        """
        Add a new IP address match to the nfqueue accumulator.

        Each value that passes `is_ip` gets an address-comparison template and
        its explicit address as match; any other value gets a DNS-lookup
        template and is kept verbatim as match.

        :param addr_dir: Address direction to add the rule to (src or dst)
        :param is_backward: Whether the field to add is for a backward rule.
        """
        other_dir = "src" if addr_dir == "dst" else "dst"
        # Version digit is the 4th character of the protocol name (e.g. "ipv4" -> 4)
        version = int(self.protocol_name[3])

        # Parts of the rules (braces are doubled because the templates are
        # presumably filled in later with str.format -- confirm against caller)
        domain_name_rule_prefix = "dns_entry_contains(dns_map_get(dns_map, \"{}\"), (ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
        domain_name_rule_suffix = "_addr}})"
        ip_addr_rule_prefix = "compare_ip((ip_addr_t) {{.version = " + str(version) + ", .value." + self.protocol_name + " = "
        ip_addr_rule_suffix = "_addr(payload)}}, ip_str_to_net(\"{}\", " + str(version) + "))"

        # Template rules for a domain name
        rules_domain_name = {
            "forward": "( " + domain_name_rule_prefix + addr_dir + domain_name_rule_suffix + " )",
            "backward": "( " + domain_name_rule_prefix + other_dir + domain_name_rule_suffix + " )"
        }

        # Template rules for an IP address
        rules_address = {
            "forward": ip_addr_rule_prefix + addr_dir + ip_addr_rule_suffix,
            "backward": ip_addr_rule_prefix + other_dir + ip_addr_rule_suffix
        }

        # Both template dictionaries define both directions, so the direction
        # key can be selected once up front.
        direction = "backward" if is_backward else "forward"
        value = self.protocol_data[addr_dir]

        if isinstance(value, list):
            # Value is a list: collect one template/match pair per element
            template = []
            match = []
            for v in value:
                if self.is_ip(v):
                    template.append(rules_address[direction])
                    match.append(self.explicit_address(v))
                else:
                    template.append(rules_domain_name[direction])
                    match.append(v)
            rules = {"template": template, "match": match}
        elif self.is_ip(value):
            # Single alias / explicit address
            rules = {"template": rules_address[direction], "match": self.explicit_address(value)}
        else:
            # Single potential domain name, kept verbatim
            rules = {"template": rules_domain_name[direction], "match": value}

        # Append rules
        self.rules["nfq"].append(rules)

    def add_addr(self, addr_dir: str, is_backward: bool = False, initiator: str = "") -> None:
        """
        Add a new IP address match to the accumulator, in two possible ways:
            - If the address is a well-known alias or an explicit IP address, add an nftables match.
            - If the address is a domain name, add an nfqueue match.

        :param addr_dir: Address direction to add the rule to (src or dst)
        :param is_backward: Whether the field to add is for a backward rule.
        :param initiator: Optional, initiator of the connection (src or dst).
        :raises ValueError: If `initiator` is given but is neither "src" nor "dst".
        """
        other_dir = "src" if addr_dir == "dst" else "dst"
        addr = self.protocol_data[addr_dir]

        if not self.is_ip(addr):
            # Address is potentially a domain name
            self.add_addr_nfqueue(addr_dir, is_backward)
            return

        # Address is a well-known alias or an explicit IP address
        tpl_addr_matches = {
            "src": "saddr {}",
            "dst": "daddr {}"
        }
        same_dir_match = f"{self.nft_prefix} {tpl_addr_matches[addr_dir]}"
        other_dir_match = f"{self.nft_prefix} {tpl_addr_matches[other_dir]}"

        if not initiator:
            # Connection initiator is not specified: plain address match
            rules = {"forward": same_dir_match, "backward": other_dir_match}
        elif initiator not in ("src", "dst"):
            # Fail explicitly instead of leaving `rules` unbound
            raise ValueError(f"Unknown connection initiator: {initiator}")
        elif (initiator == "src" and not is_backward) or (initiator == "dst" and is_backward):
            # Connection initiator is the source device
            rules = {
                "forward": f"ct original {same_dir_match}",
                "backward": f"ct original {other_dir_match}"
            }
        else:
            # Connection initiator is the destination device
            rules = {
                "forward": f"ct original {other_dir_match}",
                "backward": f"ct original {same_dir_match}"
            }

        self.add_field(addr_dir, rules, is_backward, self.explicit_address)

    def parse(self, is_backward: bool = False, initiator: str = "") -> dict:
        """
        Parse the IP (v4 or v6) protocol.

        :param is_backward (optional): Whether the protocol must be parsed for a backward rule.
                                       Optional, default is `False`.
        :param initiator (optional): Connection initiator (src or dst).
                                     Optional, default is "" (not specified).
        :return: `self.rules`, the accumulator holding the nftables and nfqueue rules for this policy.
        """
        if "src" in self.protocol_data:
            # Source address is specified
            self.add_addr("src", is_backward, initiator)
        if "dst" in self.protocol_data:
            # Destination address is specified
            self.add_addr("dst", is_backward, initiator)
        return self.rules