Smart Home firewall
Profile-based Smart Home firewall
translator Namespace Reference

Functions

int uint16 (str value)
 Custom argparse type that accepts only unsigned 16-bit integers.
 
bool is_list (any value)
 Custom Jinja2 filter that checks whether a value is a list.
 
str debug (any value)
 
None flatten_policies (str single_policy_name, dict single_policy, dict acc={})
 Flatten a nested single policy into a list of single policies.
 
Tuple[Policy, bool] parse_policy (dict policy_data, dict interaction_data, dict global_accs, int policies_count, bool is_interaction=False, LogType log_type=LogType.NONE, int log_group=100)
 

Variables

 script_name = os.path.basename(__file__)
 
 script_path = Path(os.path.abspath(__file__))
 
 script_dir = script_path.parents[0]
 
string description = "Translate a device YAML profile to the corresponding pair of NFTables firewall script and NFQueue C source code."
 
 parser = argparse.ArgumentParser(description=description)
 
 args = parser.parse_args()
 
 device_path = os.path.abspath(os.path.dirname(args.profile))
 
 loader = jinja2.FileSystemLoader(searchpath=f"{script_dir}/templates")
 
 env = jinja2.Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
 
int nfq_id_inc = 10
 
 profile = yaml.load(f, IncludeLoader)
 
 device = profile["device-info"]
 
 nfq_id_base = args.nfq_id_base
 
dictionary global_accs
 
 profile_data = profile["single-policies"][policy_name]
 
dictionary policy_data
 
dictionary interaction_data
 
 is_backward = profile_data.get("bidirectional", False)
 
int policies_count = 2 if is_backward else 1
 
 policy
 
 new_nfq
 
dictionary policy_data_backward
 
 policy_backward
 
 interaction_policy = profile["interactions"][interaction_policy_name]
 
dictionary single_policies = {}
 
bool update_nfq_id_base = False
 
 timeout = profile_data["timeout"]
 
 single_policy
 
dictionary header_dict
 
 header = env.get_template("header.c.j2").render(header_dict)
 
dictionary callback_dict
 
 callback = env.get_template("callback.c.j2").render(callback_dict)
 
dictionary main_dict
 
 main = env.get_template("main.c.j2").render(main_dict)
 
dictionary nft_dict
 
dictionary cmake_dict = {"device": device["name"]}
 

Detailed Description

Translate a device YAML profile to the corresponding pair
of NFTables firewall script and NFQueue C source code.

Function Documentation

◆ debug()

str translator.debug ( any  value)
Custom filter for Jinja2, to print a value.

:param value: value to print
:return: an empty string
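
A minimal sketch of such a filter, assuming the value is printed to standard output (the reference above does not say where it goes). Returning an empty string means the filter adds no text to the rendered template.

```python
def debug(value) -> str:
    """Print a value during rendering and contribute nothing to the output."""
    # Assumption: the value is written to standard output.
    print(value)
    return ""


# Direct call outside any template: prints the list and returns "".
result = debug(["tcp", "udp"])
```

In Jinja2, a function like this would typically be exposed to templates by adding it to the environment's `filters` dictionary, for example `env.filters["debug"] = debug`.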

◆ flatten_policies()

None translator.flatten_policies ( str  single_policy_name,
dict  single_policy,
dict   acc = {} 
)

Flatten a nested single policy into a list of single policies.

:param single_policy_name: Name of the single policy to be flattened
:param single_policy: Single policy to be flattened
:param acc: Accumulator for the flattened policies
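
The reference does not describe the shape of a nested policy, so the following is only a sketch under stated assumptions: a dictionary containing a `protocols` key is treated as a leaf policy, and any other dictionary is treated as a group of named sub-policies. Both rules are illustrative guesses, not the translator's confirmed logic. The sketch also takes `acc` as a required argument, because a `{}` default is created once in Python and shared between calls.

```python
def flatten_policies(single_policy_name: str, single_policy: dict, acc: dict) -> None:
    """Collect leaf policies from a nested policy into acc, keyed by name."""
    # Assumption: a dictionary with a "protocols" key is a leaf policy.
    if "protocols" in single_policy:
        acc[single_policy_name] = single_policy
        return
    # Otherwise treat every entry as a named sub-policy and recurse.
    for sub_name, sub_policy in single_policy.items():
        flatten_policies(sub_name, sub_policy, acc)


# Hypothetical nested profile fragment, for illustration only.
nested = {
    "dns": {"protocols": {"dns": {}}},
    "web": {
        "http": {"protocols": {"http": {}}},
        "https": {"protocols": {"tcp": {}}},
    },
}
flat = {}
flatten_policies("root", nested, flat)
print(sorted(flat))  # ['dns', 'http', 'https']
```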

◆ is_list()

bool translator.is_list ( any  value)

Custom filter for Jinja2, to check whether a value is a list.

:param value: value to check
:return: True if value is a list, False otherwise
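
A minimal sketch of this check, assuming it is a plain `isinstance` test against `list`. Under that assumption, tuples and strings are not reported as lists.

```python
def is_list(value) -> bool:
    """Return True only for list instances."""
    # Assumption: only the built-in list type (and its subclasses) counts.
    return isinstance(value, list)


print(is_list(["tcp", "udp"]))  # True
print(is_list("tcp"))           # False
```

Once registered on a Jinja2 environment, for example with `env.filters["is_list"] = is_list`, a template could branch on it with `{% if value | is_list %}`.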

◆ parse_policy()

Tuple[Policy, bool] translator.parse_policy ( dict  policy_data,
dict  interaction_data,
dict  global_accs,
int  policies_count,
bool   is_interaction = False,
LogType   log_type = LogType.NONE,
int   log_group = 100 
)
Parse a policy.

:param policy_data: Dictionary containing all the necessary data to create a Policy object
:param interaction_data: Dictionary containing the interaction data that must be updated when parsing this policy
:param global_accs: Dictionary containing the global accumulators
:param policies_count: Number of policies in the interaction
:param is_interaction: Whether this policy is part of an interaction policy or not
:param log_type: Type of packet logging to be used
:param log_group: Log group ID to be used
:return: the parsed policy, as a `Policy` object, and a boolean indicating whether a new NFQueue was created

◆ uint16()

int translator.uint16 ( str  value)

Custom type for argparse,
to check whether a value is an unsigned 16-bit integer,
i.e. an integer between 0 and 65535.

:param value: value to check
:return: the value, if it is an unsigned 16-bit integer
:raises argparse.ArgumentTypeError: if the value is not an unsigned 16-bit integer
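
A minimal sketch of a type function with this contract. The error messages and the `--nfq-id-base` option name are assumptions chosen for illustration.

```python
import argparse


def uint16(value: str) -> int:
    """Convert value to an int and require 0 <= value <= 65535."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"{value!r} is not an integer") from None
    if not 0 <= number <= 65535:
        raise argparse.ArgumentTypeError(f"{number} is not in the range 0..65535")
    return number


# Hypothetical option name, for illustration only.
parser = argparse.ArgumentParser()
parser.add_argument("--nfq-id-base", type=uint16, default=0)
args = parser.parse_args(["--nfq-id-base", "1000"])
print(args.nfq_id_base)  # 1000
```

When the function is passed as `type=`, argparse reports an `ArgumentTypeError` as a normal command-line usage error instead of a traceback.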

Variable Documentation

◆ callback_dict

dictionary translator.callback_dict
Initial value:
= {
    "nft_table": f"bridge {device['name']}",
    "interactions": global_accs["interactions"],
    "nfqueues": global_accs["nfqueues"]
}

◆ header_dict

dictionary translator.header_dict
Initial value:
= {
    "device": device["name"],
    "custom_parsers": global_accs["custom_parsers"],
    "num_threads": len(global_accs["nfqueues"]),
    "interactions": global_accs["interactions"]
}

◆ interaction_data

dictionary translator.interaction_data
Initial value:
= {
    "policy_idx": -1,
    "max_state": -1,
    "nfq_id_base": nfq_id_base,
    "nfq_id_offset": 0,
    "policies": []
}

◆ main_dict

dictionary translator.main_dict
Initial value:
= {
    "interactions": global_accs["interactions"],
    "custom_parsers": global_accs["custom_parsers"],
    "nfqueues": global_accs["nfqueues"]
}

◆ nft_dict

dictionary translator.nft_dict
Initial value:
= {
    "device": device,
    "other_hosts": global_accs["other_hosts"],
    "nfqueues": global_accs["nfqueues"],
    "log_type": args.log_type,
    "log_group": args.log_group,
    "test": args.test
}

◆ policy_data

dictionary translator.policy_data
Initial value:
= {
    "interaction_name": "single",
    "policy_name": policy_name,
    "profile_data": profile_data,
    "device": device,
    "is_backward": False,
    "in_interaction": False
}

◆ policy_data_backward

dictionary translator.policy_data_backward
Initial value:
= {
    "interaction_name": "single",
    "policy_name": f"{policy_name}-backward",
    "profile_data": profile_data,
    "device": device,
    "is_backward": True,
    "in_interaction": False
}
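
The forward and backward dictionaries differ only in `policy_name` and `is_backward`, and `policies_count` is 2 only when the profile entry sets `bidirectional`. A minimal sketch of that relationship follows. The helper `build_policy_data` and the sample device are hypothetical and not part of the translator.

```python
def build_policy_data(policy_name: str, profile_data: dict, device: dict) -> list:
    """Return the forward policy data, plus a backward copy if bidirectional."""
    forward = {
        "interaction_name": "single",
        "policy_name": policy_name,
        "profile_data": profile_data,
        "device": device,
        "is_backward": False,
        "in_interaction": False,
    }
    policies = [forward]
    if profile_data.get("bidirectional", False):
        # Same data, with the name suffixed and the direction flag flipped.
        backward = dict(forward, policy_name=f"{policy_name}-backward", is_backward=True)
        policies.append(backward)
    return policies


# Hypothetical inputs, for illustration only.
device = {"name": "example-device"}
both = build_policy_data("dns", {"bidirectional": True}, device)
print([p["policy_name"] for p in both])  # ['dns', 'dns-backward']
```

The length of the returned list plays the role of `policies_count`: 2 for a bidirectional policy, 1 otherwise.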