Smart Home firewall
Profile-based Smart Home firewall
Policy.Policy Class Reference

Classes

class  NftType
 

Public Member Functions

None __init__ (self, str interaction_name, str policy_name, dict profile_data, dict device, bool is_backward=False, bool in_interaction=False, timeout=0)
 
bool __eq__ (self, object other)
 
bool __lt__ (self, object other)
 
int __hash__ (self)
 
bool is_transient (self)
 
bool is_periodic (self)
 
Tuple[any, any] get_field (self, str field)
 
Dict[str, str] parse_stat (self, str stat)
 
str build_nft_rule (self, int queue_num, LogType log_type=LogType.NONE, int log_group=100)
 
str get_nft_rule (self)
 
None parse (self)
 
Tuple[str, dict] get_domain_name_hosts (self)
 
def is_base_for_counter (self, str counter)
 
def is_backward_for_counter (self, str counter)
 
dict get_data_from_nfqueues (self, list nfqueues)
 
dict get_nft_match_stats (self)
 

Static Public Member Functions

Tuple[any, any] get_field_static (any var, str field, str parent_key="")
 
int parse_duration (str duration)
 

Public Attributes

 interaction_name
 
 name
 
 profile_data
 
 is_backward
 
 in_interaction
 
 device
 
 custom_parser
 
 nft_matches
 
 nft_stats
 
 nft_match
 
 queue_num
 
 nft_action
 
 nfq_matches
 
 counters
 
 is_device
 Check involved devices.
 
 other_host
 
 timeout
 
 is_bidirectional
 
 transient
 
 periodic
 
 one_off
 
 initiator
 

Static Public Attributes

dictionary stats_metadata
 

Detailed Description

Class which represents a single access control policy.

Constructor & Destructor Documentation

◆ __init__()

None Policy.Policy.__init__ (   self,
str  interaction_name,
str  policy_name,
dict  profile_data,
dict  device,
bool   is_backward = False,
bool   in_interaction = False,
  timeout = 0 
)
Initialize a new Policy object.

:param interaction_name: Name of the interaction this policy belongs to
:param policy_name: Name of the policy
:param profile_data: Dictionary containing the policy data from the YAML profile
:param device: Dictionary containing the device metadata from the YAML profile
:param is_backward: Whether the policy is backward (i.e. the source and destination are reversed)
:param in_interaction: Whether the policy is part of an interaction
:param timeout: Timeout value for the policy (defaults to 0)

Member Function Documentation

◆ __eq__()

bool Policy.Policy.__eq__ (   self,
object  other 
)
Check whether this Policy object is equal to another object.

:param other: object to compare to this Policy object
:return: True if the other object represents the same policy, False otherwise

◆ __hash__()

int Policy.Policy.__hash__ (   self)
Compute a hash value for this Policy object.

:return: hash value for this Policy object

◆ __lt__()

bool Policy.Policy.__lt__ (   self,
object  other 
)
Check whether this Policy object is less than another object.

:param other: object to compare to this Policy object
:return: True if this Policy object is less than the other object, False otherwise

◆ build_nft_rule()

str Policy.Policy.build_nft_rule (   self,
int  queue_num,
LogType   log_type = LogType.NONE,
int   log_group = 100 
)
Build and store the nftables match and action, as strings, for this policy.

:param queue_num: number of the nfqueue queue corresponding to this policy,
                  or a negative number if the policy is simply `accept`
:param log_type: type of logging to enable
:param log_group: log group number
:return: complete nftables rule for this policy
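
The relationship between `queue_num` and the resulting action can be sketched as follows. This is a minimal illustration, not the class's actual implementation: the helper name `compose_rule` is hypothetical, the match string is assumed to be already built, and logging (`log_type`, `log_group`) is left out.

```python
def compose_rule(nft_match: str, queue_num: int) -> str:
    """Append an nftables action to an already-built match string.

    Hypothetical helper: a negative queue number means the policy is a
    plain `accept`; otherwise packets are sent to the given nfqueue.
    """
    action = "accept" if queue_num < 0 else f"queue num {queue_num}"
    return f"{nft_match} {action}"


# Example match string, chosen only for illustration.
print(compose_rule("ip daddr 192.168.1.1 udp dport 53", 3))
print(compose_rule("ip daddr 192.168.1.1 udp dport 53", -1))
```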

◆ get_data_from_nfqueues()

dict Policy.Policy.get_data_from_nfqueues (   self,
list  nfqueues 
)
Retrieve the policy dictionary from the nfqueue list.

:param nfqueues: List of nfqueues
:return: dictionary containing the policy data,
         or None if the policy is not found

◆ get_domain_name_hosts()

Tuple[str, dict] Policy.Policy.get_domain_name_hosts (   self)
Retrieve the domain names and IP addresses for this policy, if any.

:return: tuple containing:
            - the IP family nftables match (`ip` or `ip6`)
            - a dictionary mapping the direction matches (`saddr` or `daddr`)
              to the corresponding domain names or IP addresses

◆ get_field()

Tuple[any, any] Policy.Policy.get_field (   self,
str  field 
)
Retrieve the value for a given field in the policy profile data.
Adapted from https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists.

:param field: Field to retrieve
:return: tuple containing the parent key and the value for the given field,
         or None if the field is not found

◆ get_field_static()

Tuple[any, any] Policy.Policy.get_field_static ( any  var,
str  field,
str   parent_key = "" 
)
static
Retrieve the parent key and value for a given field in a dict.
Adapted from https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists.

:param var: Data structure to search in
:param field: Field to retrieve
:param parent_key: Parent key of the current data structure
:return: tuple containing the parent key and the value for the given field,
         or None if the field is not found
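
A recursive search of this kind could look like the sketch below. It is an assumption about the general approach (depth-first traversal of nested dicts and lists, returning the first hit), not a copy of the class's code; the function name `find_field` and the sample profile are made up for illustration.

```python
from typing import Any, Optional, Tuple


def find_field(var: Any, field: str, parent_key: str = "") -> Optional[Tuple[Any, Any]]:
    """Return (parent_key, value) for the first occurrence of `field`, or None."""
    if isinstance(var, dict):
        for key, value in var.items():
            if key == field:
                return parent_key, value
            # Descend into the value, remembering the key we came through.
            found = find_field(value, field, key)
            if found is not None:
                return found
    elif isinstance(var, list):
        for item in var:
            # List items keep the parent key of the list itself.
            found = find_field(item, field, parent_key)
            if found is not None:
                return found
    return None


# Hypothetical profile data, not taken from a real device profile.
profile = {"protocols": {"udp": {"dst-port": 53}, "ipv4": {"src": "self"}}}
print(find_field(profile, "dst-port"))
print(find_field(profile, "missing"))
```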

◆ get_nft_match_stats()

dict Policy.Policy.get_nft_match_stats (   self)
Retrieve this policy's stats which correspond to an nftables match.

:return: dictionary containing the policy match statistics

◆ get_nft_rule()

str Policy.Policy.get_nft_rule (   self)
Retrieve the complete nftables rule, composed of the complete nftables match
and the action, for this policy.

:return: complete nftables rule for this policy

◆ is_backward_for_counter()

def Policy.Policy.is_backward_for_counter (   self,
str  counter 
)
Check if the policy is the backward policy for a given counter.

:param counter: Counter to check (packet-count or duration)
:return: True if the policy is the backward policy for the given counter, False otherwise

◆ is_base_for_counter()

def Policy.Policy.is_base_for_counter (   self,
str  counter 
)
Check if the policy is the base policy for a given counter.

:param counter: Counter to check (packet-count or duration)
:return: True if the policy is the base policy for the given counter, False otherwise

◆ is_periodic()

bool Policy.Policy.is_periodic (   self)
Check whether the policy represents a periodic pattern.

:return: True if this policy represents a periodic pattern, False otherwise

◆ is_transient()

bool Policy.Policy.is_transient (   self)
Check whether the policy represents a transient pattern.

:return: True if this policy represents a transient pattern, False otherwise

◆ parse()

None Policy.Policy.parse (   self)
Parse the policy and populate the related instance variables.

◆ parse_duration()

int Policy.Policy.parse_duration ( str  duration)
static
Parse the duration statistic value, and convert it to microseconds.
This value has the form "<value> <unit>".

:param duration: Duration value to parse
:return: duration value in microseconds
:raises ValueError: if the duration unit is invalid (not in the list of supported units, i.e. seconds, milliseconds, or microseconds)
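
The conversion described above can be sketched as follows. The unit spellings (`s`, `ms`, `us`) are an assumption made for this example; the documentation only states that seconds, milliseconds, and microseconds are supported, so the real method may expect different unit strings. The function name is also hypothetical.

```python
# Assumed unit spellings; the actual class may accept different ones.
UNIT_TO_MICROSECONDS = {
    "s": 1_000_000,
    "ms": 1_000,
    "us": 1,
}


def parse_duration_sketch(duration: str) -> int:
    """Convert a "<value> <unit>" string to an integer number of microseconds."""
    value, unit = duration.split()
    if unit not in UNIT_TO_MICROSECONDS:
        raise ValueError(f"Invalid duration unit: {unit}")
    return int(float(value) * UNIT_TO_MICROSECONDS[unit])


print(parse_duration_sketch("2 s"))
print(parse_duration_sketch("5 ms"))
```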

◆ parse_stat()

Dict[str, str] Policy.Policy.parse_stat (   self,
str  stat 
)
Parse a single statistic.
Add the corresponding counters and nftables matches.

:param stat: Statistic to handle
:return: parsed stat, with the form {"template": ..., "match": ...}

Member Data Documentation

◆ stats_metadata

dictionary Policy.Policy.stats_metadata
static
Initial value:
= {
"rate": {"nft_type": NftType.MATCH, "counter": False, "template": "limit rate {}"},
"packet-size": {"nft_type": NftType.MATCH, "counter": False, "template": "ip length {}"},
"packet-count": {"counter": True},
"duration": {"counter": True}
}
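
As an illustration of how the `template` entries might be used, the sketch below fills a template with a stat value to obtain an nftables match fragment. The `NftType` enum here is a stand-in with only the `MATCH` member that appears above, and `render_match` is a hypothetical helper, not a method of the class.

```python
from enum import Enum


class NftType(Enum):
    """Stand-in for Policy.NftType; only the member shown above is defined."""
    MATCH = 1


stats_metadata = {
    "rate": {"nft_type": NftType.MATCH, "counter": False, "template": "limit rate {}"},
    "packet-size": {"nft_type": NftType.MATCH, "counter": False, "template": "ip length {}"},
    "packet-count": {"counter": True},
    "duration": {"counter": True},
}


def render_match(stat: str, value: str):
    """Return the nftables match fragment for a stat, or None for counter-only stats."""
    template = stats_metadata[stat].get("template")
    if template is None:
        return None
    return template.format(value)


print(render_match("rate", "10/second"))
print(render_match("packet-size", "100"))
print(render_match("duration", "5 s"))
```

Stats flagged with `"counter": True` carry no template in this table, which is why the helper returns `None` for them.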

The documentation for this class was generated from the following file: