|
None | __init__ (self, str interaction_name, str policy_name, dict profile_data, dict device, bool is_backward=False, bool in_interaction=False, timeout=0) |
|
bool | __eq__ (self, object other) |
|
bool | __lt__ (self, object other) |
|
int | __hash__ (self) |
|
bool | is_transient (self) |
|
bool | is_periodic (self) |
|
Tuple[any, any] | get_field (self, str field) |
|
Dict[str, str] | parse_stat (self, str stat) |
|
str | build_nft_rule (self, int queue_num, LogType log_type=LogType.NONE, int log_group=100) |
|
str | get_nft_rule (self) |
|
None | parse (self) |
|
Tuple[str, dict] | get_domain_name_hosts (self) |
|
def | is_base_for_counter (self, str counter) |
|
def | is_backward_for_counter (self, str counter) |
|
dict | get_data_from_nfqueues (self, list nfqueues) |
|
dict | get_nft_match_stats (self) |
|
|
| interaction_name |
|
| name |
|
| profile_data |
|
| is_backward |
|
| in_interaction |
|
| device |
|
| custom_parser |
|
| nft_matches |
|
| nft_stats |
|
| nft_match |
|
| queue_num |
|
| nft_action |
|
| nfq_matches |
|
| counters |
|
| is_device |
| Check involved devices.
|
|
| other_host |
|
| timeout |
|
| is_bidirectional |
|
| transient |
|
| periodic |
|
| one_off |
|
| initiator |
|
|
dictionary | stats_metadata |
|
Class which represents a single access control policy.
◆ __init__()
None Policy.Policy.__init__ |
( |
|
self, |
|
|
str |
interaction_name, |
|
|
str |
policy_name, |
|
|
dict |
profile_data, |
|
|
dict |
device, |
|
|
bool |
is_backward = False , |
|
|
bool |
in_interaction = False , |
|
|
|
timeout = 0 |
|
) |
| |
Initialize a new Policy object.
:param name: Name of the policy
:param profile_data: Dictionary containing the policy data from the YAML profile
:param device: Dictionary containing the device metadata from the YAML profile
:param is_backward: Whether the policy is backwards (i.e. the source and destination are reversed)
:param in_interaction: Whether the policy is part of an interaction
:param logging: Whether to enable logging
◆ __eq__()
bool Policy.Policy.__eq__ |
( |
|
self, |
|
|
object |
other |
|
) |
| |
Check whether this Policy object is equal to another object.
:param other: object to compare to this Policy object
:return: True if the other object represents the same policy, False otherwise
◆ __hash__()
int Policy.Policy.__hash__ |
( |
|
self | ) |
|
Compute a hash value for this Policy object.
:return: hash value for this Policy object
◆ __lt__()
bool Policy.Policy.__lt__ |
( |
|
self, |
|
|
object |
other |
|
) |
| |
Check whether this Policy object is less than another object.
:param other: object to compare to this Policy object
:return: True if this Policy object is less than the other object, False otherwise
◆ build_nft_rule()
str Policy.Policy.build_nft_rule |
( |
|
self, |
|
|
int |
queue_num, |
|
|
LogType |
log_type = LogType.NONE , |
|
|
int |
log_group = 100 |
|
) |
| |
Build and store the nftables match and action, as strings, for this policy.
:param queue_num: number of the nfqueue queue corresponding to this policy,
or a negative number if the policy is simply `accept`
:param log_type: type of logging to enable
:param log_group: log group number
:return: complete nftables rule for this policy
◆ get_data_from_nfqueues()
dict Policy.Policy.get_data_from_nfqueues |
( |
|
self, |
|
|
list |
nfqueues |
|
) |
| |
Retrieve the policy dictionary from the nfqueue list.
:param nfqueues: List of nfqueues
:return: dictionary containing the policy data,
or None if the policy is not found
◆ get_domain_name_hosts()
Tuple[str, dict] Policy.Policy.get_domain_name_hosts |
( |
|
self | ) |
|
Retrieve the domain names and IP addresses for this policy, if any.
:return: tuple containing:
- the IP family nftables match (`ip` or `ip6`)
- a dictionary containing a mapping between the direction matches (`saddr` or `daddr`)
and the corresponding domain names or ip addresses
◆ get_field()
Tuple[any, any] Policy.Policy.get_field |
( |
|
self, |
|
|
str |
field |
|
) |
| |
Retrieve the value for a given field in the policy profile data.
Adapted from https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists.
:param field: Field to retrieve
:return: tuple containing the parent key and the value for the given field,
or None if the field is not found
◆ get_field_static()
Tuple[any, any] Policy.Policy.get_field_static |
( |
any |
var, |
|
|
str |
field, |
|
|
str |
parent_key = "" |
|
) |
| |
|
static |
Retrieve the parent key and value for a given field in a dict.
Adapted from https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists.
:param var: Data structure to search in
:param field: Field to retrieve
:param parent_key: Parent key of the current data structure
:return: tuple containing the parent key and the value for the given field,
or None if the field is not found
◆ get_nft_match_stats()
dict Policy.Policy.get_nft_match_stats |
( |
|
self | ) |
|
Retrieve this policy's stats which correspond to an NFTables match.
:return: dictionary containing the policy match statistics
◆ get_nft_rule()
str Policy.Policy.get_nft_rule |
( |
|
self | ) |
|
Retrieve the complete nftables rule, composed of the complete nftables match
and the action, for this policy.
:return: complete nftables rule for this policy
◆ is_backward_for_counter()
def Policy.Policy.is_backward_for_counter |
( |
|
self, |
|
|
str |
counter |
|
) |
| |
Check if the policy is the backward policy for a given counter.
:param counter: Counter to check (packet-count or duration)
:return: True if the policy is the backward policy for the given counter and direction, False otherwise
◆ is_base_for_counter()
def Policy.Policy.is_base_for_counter |
( |
|
self, |
|
|
str |
counter |
|
) |
| |
Check if the policy is the base policy for a given counter.
:param counter: Counter to check (packet-count or duration)
:return: True if the policy is the base policy for the given counter and direction, False otherwise
◆ is_periodic()
bool Policy.Policy.is_periodic |
( |
|
self | ) |
|
Check whether the policy represents a periodic pattern.
:return: True if this policy represents a periodic pattern, False otherwise
◆ is_transient()
bool Policy.Policy.is_transient |
( |
|
self | ) |
|
Check whether the policy represents a transient pattern.
:return: True if this policy represents a transient pattern, False otherwise
◆ parse()
None Policy.Policy.parse |
( |
|
self | ) |
|
Parse the policy and populate the related instance variables.
◆ parse_duration()
int Policy.Policy.parse_duration |
( |
str |
duration | ) |
|
|
static |
Parse the duration statistic value, and convert it to microseconds.
This value has the form "<value> <unit>".
:param duration: Duration value to parse
:return: duration value in microseconds
:raises ValueError: if the duration unit is invalid (not in the list of supported units, i.e. seconds, milliseconds, or microseconds)
◆ parse_stat()
Dict[str, str] Policy.Policy.parse_stat |
( |
|
self, |
|
|
str |
stat |
|
) |
| |
Parse a single statistic.
Add the corresponding counters and nftables matches.
:param stat: Statistic to handle
:return: parsed stat, with the form {"template": ..., "match": ...}
◆ stats_metadata
dictionary Policy.Policy.stats_metadata |
|
static |
Initial value:= {
"rate": {"nft_type": NftType.MATCH, "counter": False, "template": "limit rate {}"},
"packet-size": {"nft_type": NftType.MATCH, "counter": False, "template": "ip length {}"},
"packet-count": {"counter": True},
"duration": {"counter": True}
}
The documentation for this class was generated from the following file: