Skip to content
Extraits de code Groupes Projets
Valider bafac511 rédigé par François De Keersmaeker's avatar François De Keersmaeker
Parcourir les fichiers

Merge branch 'main' of github.com:smart-home-network-security/profile-translator-blocklist

parents 178844da 11af9ee7
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
""" """
Custom Jinja2 filters for the `profile-translator` package. Jinja2-related functions.
""" """
import jinja2
def is_list(value: any) -> bool: def is_list(value: any) -> bool:
""" """
Custom filter for Jinja2, to check whether a value is a list. Custom filter for Jinja2, to check whether a value is a list.
...@@ -21,3 +24,25 @@ def debug(value: any) -> str: ...@@ -21,3 +24,25 @@ def debug(value: any) -> str:
""" """
print(str(value)) print(str(value))
return "" return ""
def create_jinja_env(package: str) -> jinja2.Environment:
"""
Create a Jinja2 environment with custom filters.
Args:
package (str): package name
Returns:
Jinja2 environment
"""
# Create Jinja2 environment
loader = jinja2.PackageLoader(package, "templates")
env = jinja2.Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
# Add custom Jinja2 filters
env.filters["debug"] = debug
env.filters["is_list"] = is_list
env.filters["any"] = any
env.filters["all"] = all
return env
...@@ -13,7 +13,7 @@ import jinja2 ...@@ -13,7 +13,7 @@ import jinja2
from typing import Tuple from typing import Tuple
# Custom modules # Custom modules
from .arg_types import uint16, proba, directory from .arg_types import uint16, proba, directory
from .jinja_filters import debug, is_list from .jinja_utils import create_jinja_env
from .LogType import LogType from .LogType import LogType
from .Policy import Policy from .Policy import Policy
from .NFQueue import NFQueue from .NFQueue import NFQueue
...@@ -23,16 +23,6 @@ from pyyaml_loaders import IncludeLoader ...@@ -23,16 +23,6 @@ from pyyaml_loaders import IncludeLoader
# Package name # Package name
package = importlib.import_module(__name__).__name__.rpartition(".")[0] package = importlib.import_module(__name__).__name__.rpartition(".")[0]
## Jinja2 config
loader = jinja2.PackageLoader(package, "templates")
env = jinja2.Environment(loader=loader, trim_blocks=True, lstrip_blocks=True)
# Add custom Jinja2 filters
env.filters["debug"] = debug
env.filters["is_list"] = is_list
env.filters["any"] = any
env.filters["all"] = all
##### FUNCTIONS ##### ##### FUNCTIONS #####
...@@ -185,6 +175,15 @@ def write_firewall( ...@@ -185,6 +175,15 @@ def write_firewall(
args = validate_args(output_dir=output_dir, drop_proba=drop_proba) args = validate_args(output_dir=output_dir, drop_proba=drop_proba)
drop_proba = args["drop_proba"] drop_proba = args["drop_proba"]
# Jinja2 environment
templates = {}
env = create_jinja_env(package)
templates["firewall.nft"] = env.get_template("firewall.nft.j2")
templates["header.c"] = env.get_template("header.c.j2")
templates["callback.c"] = env.get_template("callback.c.j2")
templates["main.c"] = env.get_template("main.c.j2")
templates["CMakeLists.txt"] = env.get_template("CMakeLists.txt.j2")
# Create nftables script # Create nftables script
nft_dict = { nft_dict = {
"device": device, "device": device,
...@@ -194,7 +193,7 @@ def write_firewall( ...@@ -194,7 +193,7 @@ def write_firewall(
"log_group": log_group, "log_group": log_group,
"test": test "test": test
} }
env.get_template("firewall.nft.j2").stream(nft_dict).dump(os.path.join(output_dir, "firewall.nft")) templates["firewall.nft"].stream(nft_dict).dump(os.path.join(output_dir, "firewall.nft"))
# If needed, create NFQueue-related files # If needed, create NFQueue-related files
num_threads = len([q for q in global_accs["nfqueues"] if q.queue_num >= 0]) num_threads = len([q for q in global_accs["nfqueues"] if q.queue_num >= 0])
...@@ -207,20 +206,20 @@ def write_firewall( ...@@ -207,20 +206,20 @@ def write_firewall(
"drop_proba": drop_proba, "drop_proba": drop_proba,
"num_threads": num_threads, "num_threads": num_threads,
} }
header = env.get_template("header.c.j2").render(header_dict) header = templates["header.c"].render(header_dict)
callback_dict = { callback_dict = {
"nft_table": f"bridge {device['name']}", "nft_table": f"bridge {device['name']}",
"nfqueues": global_accs["nfqueues"], "nfqueues": global_accs["nfqueues"],
"drop_proba": drop_proba "drop_proba": drop_proba
} }
callback = env.get_template("callback.c.j2").render(callback_dict) callback = templates["callback.c"].render(callback_dict)
main_dict = { main_dict = {
"custom_parsers": global_accs["custom_parsers"], "custom_parsers": global_accs["custom_parsers"],
"nfqueues": global_accs["nfqueues"], "nfqueues": global_accs["nfqueues"],
"domain_names": global_accs["domain_names"], "domain_names": global_accs["domain_names"],
"num_threads": num_threads "num_threads": num_threads
} }
main = env.get_template("main.c.j2").render(main_dict) main = templates["main.c"].render(main_dict)
# Write policy C file # Write policy C file
with open(os.path.join(output_dir, "nfqueues.c"), "w+") as fw: with open(os.path.join(output_dir, "nfqueues.c"), "w+") as fw:
...@@ -235,7 +234,7 @@ def write_firewall( ...@@ -235,7 +234,7 @@ def write_firewall(
"custom_parsers": global_accs["custom_parsers"], "custom_parsers": global_accs["custom_parsers"],
"domain_names": global_accs["domain_names"] "domain_names": global_accs["domain_names"]
} }
env.get_template("CMakeLists.txt.j2").stream(cmake_dict).dump(os.path.join(output_dir, "CMakeLists.txt")) templates["CMakeLists.txt"].stream(cmake_dict).dump(os.path.join(output_dir, "CMakeLists.txt"))
def translate_policy( def translate_policy(
......
from setuptools import setup, find_packages
setup(
name='profile_translator_blocklist',
version='0.3.0',
author='François De Keersmaeker',
author_email='francois.dekeersmaeker@uclouvain.be',
description='Translate IoT YAML profiles to NFTables / NFQueue files for a block-list firewall.',
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
url='https://github.com/smart-home-network-security/profile-translator-blocklist',
license='GPLv3+',
packages=find_packages(),
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
'Operating System :: OS Independent'
],
python_requires='>=3.8',
install_requires=[
"PyYAML",
"Jinja2",
"pyyaml-loaders"
],
package_data={
'profile_translator_blocklist': ['templates/*']
},
include_package_data=True
)
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter