Skip to content
Extraits de code Groupes Projets
dns_unbound_cache_reader.py 7,16 ko
Newer Older
  • Learn to ignore specific revisions
  • from enum import Enum, IntEnum
    import re
    
    import subprocess
    from fabric import Connection, Config
    
    
    ## Global variables
    
    # Localhost IP addresses
    
    localhost = [
        "localhost",
    
        "127.0.0.1",
        "::1",
        "0000:0000:0000:0000:0000:0000:0000:0001"
    
    # Shell command to get the DNS cache
    
    cmd = "unbound-control dump_cache"
    
    # Strings to skip in the DNS cache
    to_skip = (
        ";",
        "START",
        "END",
        "EOF"
    )
    # Regex patterns
    
    pattern_line      = r"^([a-zA-Z0-9._-]+)\s+(\d+)\s+IN\s+([A-Z]+)\s+(.+)$"                   # Generic DNS cache line
    
    pattern_ipv4_byte = r"(25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])"                          # Single byte from an IPv4 address
    pattern_ptr       = (pattern_ipv4_byte + r"\.") * 3 + pattern_ipv4_byte + r".in-addr.arpa"  # Reverse DNS lookup qname
    pattern_srv       = r"^(\d+)\s+(\d+)\s+(\d+)\s+([a-zA-Z0-9.-]+)$"                           # SRV record target
    
    
    class DnsCacheSection(Enum):
        """
        Enum class for the strings indicating the relevant sections in the DNS cache.
        """
        START_RRSET = "START_RRSET_CACHE"
        END_RRSET   = "END_RRSET_CACHE"
    
    
    class DnsRtype(IntEnum):
        """
        Enum class for the DNS resource record types.
        """
    
        A     = 1   # IPv4 address
        CNAME = 5   # Canonical name
        PTR   = 12  # Domain name pointer
        AAAA  = 28  # IPv6 address
        SRV   = 33  # Service locator
    
    
    
    class DnsTableKeys(Enum):
        """
        Enum class for the allowed dictionary keys.
        """
    
    def update_dns_table(
            dns_table: dict = {},
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
            host: str = "127.0.0.1",
            file: str = None
        ) -> dict:
    
        Update the given DNS table by reading the current DNS cache.
    
            dns_table (dict): Dictionary containing the current DNS table.
    
            host (str): IP address of the Unbound DNS server. Default is localhost.
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
            file (str): Path to a file containing the Unbound DNS cache. Default is None.
                        If specified, the function reads the cache from the file instead of the server.
    
        Returns:
            dict: Updated DNS table.
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
        ### Get DNS cache ###
        dns_cache = None
    
        if file is None:
            # Get DNS cache from Unbound
    
            if host in localhost:
                ## Unbound runs on localhost
    
                proc = subprocess.run(cmd.split(), capture_output=True, text=True)
                dns_cache = proc.stdout.strip().split("\n")
                del proc
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
    
            else:
                ## Unbound runs on a remote host
                # SSH connection with remote host
                ssh_config = Config(overrides={"run": {"hide": True}})
    
                with Connection(host, config=ssh_config) as remote:
                    # Get the DNS cache
                    result = remote.run(cmd, warn=True)
                    if not result.failed:
                        dns_cache = result.stdout.strip().split("\n")
                    # Free resources
                    del result
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
            # Read DNS cache from file
            with open(file, "r") as f:
                dns_cache = f.read().strip().split("\n")
    
    
        ### Parse DNS cache ###
    
        # Find start and end indices of RRSET section
        try:
            start_idx = dns_cache.index(DnsCacheSection.START_RRSET.value)
            end_idx   = dns_cache.index(DnsCacheSection.END_RRSET.value)
        except ValueError:
            start_idx = 0
            end_idx   = len(dns_cache)
    
        # Loop through the RRSET section
        for line in dns_cache[start_idx+1:end_idx]:
    
            # Lines containing metadata, skip
            if line.startswith(to_skip):
                continue
    
            # Parse line with regex
            match = re.match(pattern_line, line)
            
            # No regex match, skip line
            if not match:
                continue
    
    
            qname  = match.group(1)
            if qname.endswith("."):
                qname = qname[:-1]
    
            rtype = match.group(3)
            rdata = match.group(4)
            if rdata.endswith("."):
                rdata = rdata[:-1]
    
            # rtype not in allowed list, skip line
            if rtype not in DnsRtype._member_names_:
                continue
    
    
            ## Parse supported records
    
            # A (IPv4) and AAAA (IPv6) records
            if rtype == DnsRtype.A.name or rtype == DnsRtype.AAAA.name:
                ip = rdata
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
                if DnsTableKeys.IP.name in dns_table:
    
                    dns_table[DnsTableKeys.IP.name][ip] = qname
    
                    dns_table[DnsTableKeys.IP.name] = {ip: qname}
    
            # CNAME records
            if rtype == DnsRtype.CNAME.name:
                cname = rdata
                if DnsTableKeys.ALIAS.name in dns_table:
                    dns_table[DnsTableKeys.ALIAS.name][cname] = qname
                else:
                    dns_table[DnsTableKeys.ALIAS.name] = {cname: qname}
    
            # SRV records
            if rtype == DnsRtype.SRV.name:
                # Parse target service
                match_srv = re.match(pattern_srv, rdata)
                if not match_srv:
                    continue
                service = match_srv.group(4)
                if service.endswith("."):
                    service = service[:-1]
                if DnsTableKeys.ALIAS.name in dns_table:
                    dns_table[DnsTableKeys.ALIAS.name][service] = qname
                else:
                    dns_table[DnsTableKeys.ALIAS.name] = {service: qname}
    
    
            # PTR records
            if rtype == DnsRtype.PTR.name:
    
                match_ptr = re.match(pattern_ptr, qname)
    
                if match_ptr:
                    # PTR record is a reverse DNS lookup
                    ip = ".".join(reversed(match_ptr.groups()))
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
                    if ip not in dns_table.get(DnsTableKeys.IP.name, {}):
                        if DnsTableKeys.IP.name in dns_table:
                            dns_table[DnsTableKeys.IP.name][ip] = rdata
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
                            dns_table[DnsTableKeys.IP.name] = {ip: rdata}
    
                else:
                    # PTR record contains generic RDATA
    
                    ptr = rdata
                    if DnsTableKeys.ALIAS.name in dns_table:
                        dns_table[DnsTableKeys.ALIAS.name][qname] = ptr
    
                        dns_table[DnsTableKeys.ALIAS.name] = {qname: ptr}
    
    
        ## Post-processing
        # Replace all cnames with aliases
        if DnsTableKeys.IP.name in dns_table and DnsTableKeys.ALIAS.name in dns_table:
            for ip, cname in dns_table[DnsTableKeys.IP.name].items():
                if cname in dns_table[DnsTableKeys.ALIAS.name]:
                    dns_table[DnsTableKeys.IP.name][ip] = dns_table[DnsTableKeys.ALIAS.name][cname]
    
    François De Keersmaeker's avatar
    François De Keersmaeker a validé
        return dns_table
    
    
    
    
    def read_dns_cache(
            host: str = "127.0.0.1",
            file: str = None
        ) -> dict:
        """
        Read the Unbound DNS cache and return it as a dictionary,
        in the format:
            {
                DnsTableKeys.IP: {
                    ip_address: domain_name,
                    ...
                },
                DnsTableKeys.ALIAS: {
                    canonical_name: alias,
                    ...
                }
            }
    
        Args:
            host (str): IP address of the Unbound DNS server. Default is localhost.
            file (str): Path to a file containing the Unbound DNS cache. Default is None.
                        If specified, the function reads the cache from the file instead of the server.
        Returns:
            dict: Dictionary containing the DNS table read from the cache.
        """
        return update_dns_table({}, host, file)