Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from Custom import Custom
class tls(Custom):
# Class variables
layer = 5 # Protocol OSI layer (arbitrary)
protocol_name = "tls" # Protocol name
supported_keys = [
"content-type",
"handshake-type",
"tls-version",
"session-id"
]
def parse(self, is_backward: bool = False, initiator: str = "src") -> dict:
"""
Parse the TLS protocol.
:param is_backward (optional): Whether the protocol must be parsed for a backward rule.
Optional, default is `False`.
:param initiator (optional): Connection initiator (src or dst).
Optional, default is "src".
:return: Dictionary containing the (forward and backward) nftables and nfqueue rules for this policy.
"""
# Handle TLS content type
content_type = self.protocol_data.get("content-type", None)
if content_type is not None:
rule = {"forward": "tls_packet->messages->message.content_type == {}"}
self.add_field("content-type", rule, is_backward)
# Handle TLS handshake type
handshake_type = self.protocol_data.get("handshake-type", None)
if handshake_type is not None:
if not ',' in str(handshake_type):
rule = {"forward": "tls_packet != NULL && tls_packet->messages != NULL && tls_packet->messages->message.handshake_type == {}"}
else:
if '[' in str(handshake_type): # in case format in profile is '[ {n°}, {n°}, ... ]'
handshake_type = handshake_type.replace('[', '').replace(']', '')
lst = [int(x.strip(), 16 if x.strip().startswith('0x') else 10) for x in handshake_type.split(',')]
conditions = ["tls_packet != NULL"]
current_chain = "tls_packet->messages"
conditions.append(f"{current_chain} != NULL")
for i, value in enumerate(lst):
if i > 0:
current_chain = f"{current_chain}->next"
conditions.append(f"{current_chain} != NULL")
if value == 20: # change cipher spec message or much less likely finished handshake encryted message
conditions.append(f"( {current_chain}->message.content_type == {value} || {current_chain}->message.handshake_type == {value} )")
else:
conditions.append(f"{current_chain}->message.handshake_type == {value}")
string = " && ".join(conditions)
rule = {"forward": string}
self.add_field("handshake-type", rule, is_backward)
# Handle TLS version
tls_version = self.protocol_data.get("tls-version", None)
if tls_version is not None:
if '"' in str(tls_version): # in case written in profile as string, is float otherwise
tls_version = tls_version.replace('"', '')
rule = {"forward": "tls_packet->messages->message.tls_version == {}"}
func = lambda tls_version: str(769 + int(str(tls_version)[-1])) # 769 = 0x0301
self.add_field("tls-version", rule, is_backward, func)
# Handle TLS session ID
session_id = self.protocol_data.get("session-id", None)
if session_id is not None:
rule = {"forward": "tls_packet->messages->message.session_id_present == {}"}
self.add_field("session-id", rule, is_backward)
return self.rules