Skip to content
Extraits de code Groupes Projets
Valider c240899a rédigé par François De Keersmaeker's avatar François De Keersmaeker
Parcourir les fichiers

Translator: small refactor, added arg output

parent 07efd878
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
......@@ -6,5 +6,7 @@
build
bin
devices
# Python cache
__pycache__
......@@ -37,6 +37,7 @@ set(PARSERS header dns dhcp http igmp ssdp coap)
# Subdirectories containing code
add_subdirectory(src)
add_subdirectory(devices)
IF( NOT OPENWRT_CROSSCOMPILING )
add_subdirectory(test)
ENDIF()
"""
Argparse types used by the translator.
"""
import os
import argparse
def uint16(arg: str) -> int:
"""
Custom type for argparse,
to check whether an argument is an unsigned 16-bit integer,
i.e. an integer between 0 and 65535.
:param arg: given argument
:return: the argument, as an `int` if it is an unsigned 16-bit integer
:raises argparse.ArgumentTypeError: if the argument is not an unsigned 16-bit integer
"""
result = int(arg)
if result < 0 or result > 65535:
raise argparse.ArgumentTypeError(f"\"{arg}\" is not an unsigned 16-bit integer (must be between 0 and 65535)")
return result
def proba(arg: str) -> float:
"""
Custom type for argparse,
to check whether an argument is a valid probability,
i.e. a float between 0 and 1.
:param arg: given argument
:return: the argument, as a `float`, if it is a valid probability
:raises argparse.ArgumentTypeError: if the argument is not a valid probability
"""
result = float(arg)
if result < 0 or result > 1:
raise argparse.ArgumentTypeError(f"\"{arg}\" is not a valid probability (must be a float between 0 and 1)")
return result
def directory(arg: str) -> str:
"""
Argparse type for an existing directory path.
:param arg: given argument
:return: absolute path to an existing directory
:raises argparse.ArgumentTypeError: if the given argument is not an existing directory
"""
if not os.path.isdir(arg):
raise argparse.ArgumentTypeError(f"\"{arg}\" is not an existing directory")
return os.path.abspath(arg)
......@@ -18,46 +18,14 @@ script_path = Path(os.path.abspath(__file__))
script_dir = script_path.parents[0]
sys.path.insert(0, os.path.join(script_dir, "protocols"))
# Import custom classes
# Import custom modules
from arg_types import uint16, proba, directory
from LogType import LogType
from Policy import Policy
from NFQueue import NFQueue
from pyyaml_loaders import IncludeLoader
##### Custom Argparse types #####
def uint16(value: str) -> int:
"""
Custom type for argparse,
to check whether a value is an unsigned 16-bit integer,
i.e. an integer between 0 and 65535.
:param value: value to check
:return: the value, if it is an unsigned 16-bit integer
:raises argparse.ArgumentTypeError: if the value is not an unsigned 16-bit integer
"""
result = int(value)
if result < 0 or result > 65535:
raise argparse.ArgumentTypeError(f"{value} is not an unsigned 16-bit integer (must be between 0 and 65535)")
return result
def proba(value: str) -> float:
"""
Custom type for argparse,
to check whether a value is a valid probability,
i.e. a float between 0 and 1.
:param value: value to check
:return: the value, if it is a valid probability
:raises argparse.ArgumentTypeError: if the value is not a valid probability
"""
result = float(value)
if result < 0 or result > 1:
raise argparse.ArgumentTypeError(f"{value} is not a valid probability (must be a float between 0 and 1)")
return result
##### Custom Jinja2 filters #####
......@@ -161,23 +129,28 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description=description)
parser.add_argument("profile", type=str, help="Path to the device YAML profile")
parser.add_argument("-q", "--nfqueue", type=uint16, default=0, help="NFQueue start index for this profile's policies (must be an integer between 0 and 65535)")
parser.add_argument("-o", "--output", type=directory, help="Output directory for the generated files")
# Verdict modes
parser.add_argument("-r", "--rate", type=int, help="Rate limit, in packets/second, to apply to matched traffic, instead of a binary verdict. Cannot be used with dropping probability.")
parser.add_argument("-p", "--drop-proba", type=proba, help="Dropping probability to apply to matched traffic, instead of a binary verdict. Cannot be used with rate limiting.")
# Netfilter logging
parser.add_argument("-l", "--log-type", type=lambda log_type: LogType[log_type], choices=list(LogType), default=LogType.NONE, help="Type of packet logging to be used")
parser.add_argument("-g", "--log-group", type=uint16, default=100, help="Log group number (must be an integer between 0 and 65535)")
parser.add_argument("-t", "--test", action="store_true", help="Test mode: use VM instead of router")
args = parser.parse_args()
## Argument validation
# Retrieve device profile's path
device_path = os.path.abspath(os.path.dirname(args.profile))
if args.output is None:
args.output = device_path
# Verify verdict mode
if args.rate is not None and args.drop_proba is not None:
parser.error("Arguments --rate and --drop-proba are mutually exclusive")
# Set default value for drop probability
args.drop_proba = 1.0 if args.drop_proba is None else args.drop_proba
# Retrieve device profile's path
device_path = os.path.abspath(os.path.dirname(args.profile))
# Jinja2 loader
loader = jinja2.FileSystemLoader(searchpath=f"{script_dir}/templates")
......@@ -251,7 +224,7 @@ if __name__ == "__main__":
"log_group": args.log_group,
"test": args.test
}
env.get_template("firewall.nft.j2").stream(nft_dict).dump(f"{device_path}/firewall.nft")
env.get_template("firewall.nft.j2").stream(nft_dict).dump(f"{args.output}/firewall.nft")
# If needed, create NFQueue-related files
num_threads = len([q for q in global_accs["nfqueues"] if q.queue_num >= 0])
......@@ -280,14 +253,14 @@ if __name__ == "__main__":
main = env.get_template("main.c.j2").render(main_dict)
# Write policy C file
with open(f"{device_path}/nfqueues.c", "w+") as fw:
with open(f"{args.output}/nfqueues.c", "w+") as fw:
fw.write(header)
fw.write(callback)
fw.write(main)
# Create CMake file
cmake_dict = {"device": device["name"]}
env.get_template("CMakeLists.txt.j2").stream(cmake_dict).dump(f"{device_path}/CMakeLists.txt")
env.get_template("CMakeLists.txt.j2").stream(cmake_dict).dump(f"{args.output}/CMakeLists.txt")
print(f"Done translating {args.profile}.")
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter