diff --git a/.gitignore b/.gitignore index d1eba1603ad6581ddc3bd48665ac40b764f8c128..712e7d29ef61f97c040c83aea67b58dc61c1d820 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,7 @@ build bin +devices + # Python cache __pycache__ diff --git a/CMakeLists.txt b/CMakeLists.txt index 50f80e22b91c9eb913bbf5663e252003e4d891c8..08bb192c35fbbb7f9fbd898c40228c1478953b43 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ set(PARSERS header dns dhcp http igmp ssdp coap) # Subdirectories containing code add_subdirectory(src) +add_subdirectory(devices) IF( NOT OPENWRT_CROSSCOMPILING ) add_subdirectory(test) ENDIF() diff --git a/src/translator/arg_types.py b/src/translator/arg_types.py new file mode 100644 index 0000000000000000000000000000000000000000..589e4998dadf6bf9b5107fd4436d4d66f1ea9ed6 --- /dev/null +++ b/src/translator/arg_types.py @@ -0,0 +1,51 @@ +""" +Argparse types used by the translator. +""" + +import os +import argparse + + +def uint16(arg: str) -> int: + """ + Custom type for argparse, + to check whether an argument is an unsigned 16-bit integer, + i.e. an integer between 0 and 65535. + + :param arg: given argument + :return: the argument, as an `int` if it is an unsigned 16-bit integer + :raises argparse.ArgumentTypeError: if the argument is not an unsigned 16-bit integer + """ + result = int(arg) + if result < 0 or result > 65535: + raise argparse.ArgumentTypeError(f"\"{arg}\" is not an unsigned 16-bit integer (must be between 0 and 65535)") + return result + + +def proba(arg: str) -> float: + """ + Custom type for argparse, + to check whether an argument is a valid probability, + i.e. a float between 0 and 1. + + :param arg: given argument + :return: the argument, as a `float`, if it is a valid probability + :raises argparse.ArgumentTypeError: if the argument is not a valid probability + """ + result = float(arg) + if result < 0 or result > 1: + raise argparse.ArgumentTypeError(f"\"{arg}\" is not a valid probability (must be a float between 0 and 1)") + return result + + +def directory(arg: str) -> str: + """ + Argparse type for an existing directory path. + + :param arg: given argument + :return: absolute path to an existing directory + :raises argparse.ArgumentTypeError: if the given argument is not an existing directory + """ + if not os.path.isdir(arg): + raise argparse.ArgumentTypeError(f"\"{arg}\" is not an existing directory") + return os.path.abspath(arg) diff --git a/src/translator/translator.py b/src/translator/translator.py index 4b580c8ceac538544e24bc0e4d326d743db63c60..336aad1ccccf2280001e33593f454112ced38f54 100644 --- a/src/translator/translator.py +++ b/src/translator/translator.py @@ -18,46 +18,14 @@ script_path = Path(os.path.abspath(__file__)) script_dir = script_path.parents[0] sys.path.insert(0, os.path.join(script_dir, "protocols")) -# Import custom classes +# Import custom modules +from arg_types import uint16, proba, directory from LogType import LogType from Policy import Policy from NFQueue import NFQueue from pyyaml_loaders import IncludeLoader -##### Custom Argparse types ##### - -def uint16(value: str) -> int: - """ - Custom type for argparse, - to check whether a value is an unsigned 16-bit integer, - i.e. an integer between 0 and 65535. - - :param value: value to check - :return: the value, if it is an unsigned 16-bit integer - :raises argparse.ArgumentTypeError: if the value is not an unsigned 16-bit integer - """ - result = int(value) - if result < 0 or result > 65535: - raise argparse.ArgumentTypeError(f"{value} is not an unsigned 16-bit integer (must be between 0 and 65535)") - return result - - -def proba(value: str) -> float: - """ - Custom type for argparse, - to check whether a value is a valid probability, - i.e. a float between 0 and 1. - - :param value: value to check - :return: the value, if it is a valid probability - :raises argparse.ArgumentTypeError: if the value is not a valid probability - """ - result = float(value) - if result < 0 or result > 1: - raise argparse.ArgumentTypeError(f"{value} is not a valid probability (must be a float between 0 and 1)") - return result - ##### Custom Jinja2 filters ##### @@ -161,23 +129,28 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description=description) parser.add_argument("profile", type=str, help="Path to the device YAML profile") parser.add_argument("-q", "--nfqueue", type=uint16, default=0, help="NFQueue start index for this profile's policies (must be an integer between 0 and 65535)") + parser.add_argument("-o", "--output", type=directory, help="Output directory for the generated files") # Verdict modes parser.add_argument("-r", "--rate", type=int, help="Rate limit, in packets/second, to apply to matched traffic, instead of a binary verdict. Cannot be used with dropping probability.") parser.add_argument("-p", "--drop-proba", type=proba, help="Dropping probability to apply to matched traffic, instead of a binary verdict. Cannot be used with rate limiting.") + # Netfilter logging parser.add_argument("-l", "--log-type", type=lambda log_type: LogType[log_type], choices=list(LogType), default=LogType.NONE, help="Type of packet logging to be used") parser.add_argument("-g", "--log-group", type=uint16, default=100, help="Log group number (must be an integer between 0 and 65535)") parser.add_argument("-t", "--test", action="store_true", help="Test mode: use VM instead of router") args = parser.parse_args() + ## Argument validation + # Retrieve device profile's path + device_path = os.path.abspath(os.path.dirname(args.profile)) + if args.output is None: + args.output = device_path # Verify verdict mode if args.rate is not None and args.drop_proba is not None: parser.error("Arguments --rate and --drop-proba are mutually exclusive") - # Set default value for drop probability args.drop_proba = 1.0 if args.drop_proba is None else args.drop_proba - # Retrieve device profile's path - device_path = os.path.abspath(os.path.dirname(args.profile)) + # Jinja2 loader loader = jinja2.FileSystemLoader(searchpath=f"{script_dir}/templates") @@ -251,7 +224,7 @@ if __name__ == "__main__": "log_group": args.log_group, "test": args.test } - env.get_template("firewall.nft.j2").stream(nft_dict).dump(f"{device_path}/firewall.nft") + env.get_template("firewall.nft.j2").stream(nft_dict).dump(f"{args.output}/firewall.nft") # If needed, create NFQueue-related files num_threads = len([q for q in global_accs["nfqueues"] if q.queue_num >= 0]) @@ -280,14 +253,14 @@ if __name__ == "__main__": main = env.get_template("main.c.j2").render(main_dict) # Write policy C file - with open(f"{device_path}/nfqueues.c", "w+") as fw: + with open(f"{args.output}/nfqueues.c", "w+") as fw: fw.write(header) fw.write(callback) fw.write(main) # Create CMake file cmake_dict = {"device": device["name"]} - env.get_template("CMakeLists.txt.j2").stream(cmake_dict).dump(f"{device_path}/CMakeLists.txt") + env.get_template("CMakeLists.txt.j2").stream(cmake_dict).dump(f"{args.output}/CMakeLists.txt") print(f"Done translating {args.profile}.")