Source code for internetnl_scan.utils

# -*- coding: utf-8 -*-
"""
Utilities for the internetnl tool
"""
import getpass
import logging
import ssl
import sys
from pathlib import Path
from urllib.parse import urlparse

import keyring
import pandas as pd
import requests

try:
    import requests_kerberos_proxy
except ImportError:
    requests_kerberos_proxy = None
else:
    try:
        from requests_kerberos_proxy.util import get_session
    except ImportError as err:
        raise ImportError(
            "Module 'request_kerberos_proxy' was found but 'get_session' could not be imported"
        )
from requests.auth import HTTPBasicAuth
from tldextract import tldextract
from tqdm import tqdm

_logger = logging.getLogger("internetnl-scan")


[docs] class Credentials(object): """stores the user credentials in a key ring""" def __init__(self, service_name="Internet.nl"): self.service_name = service_name self.username = None self.password = None self.http_auth = None self._credentials = None self.get_credentials()
[docs] def get_credentials(self): """Get the user credentials, either via cli, or via keyring""" self._credentials = keyring.get_credential(self.service_name, None) if self._credentials is None: _logger.debug("Get credentials from cli") self.username = input("Username: ") self.password = getpass.getpass() keyring.set_password( service_name=self.service_name, username=self.username, password=self.password, ) else: _logger.debug("Get credentials from keyring") self.username = self._credentials.username self.password = self._credentials.password self.http_auth = HTTPBasicAuth(self.username, self.password)
[docs] def reset_credentials(self): """in case of login failure: reset the stored credentials""" keyring.delete_password(service_name=self.service_name, username=self.username)
[docs] def response_to_dataframe(response): """ Convert the Internet.nl response to pandas dataframe Args: response: the returned response ot the Internet.nl API Returns: Pandas dataframe """ result = response.json() all_scans = result["requests"] all_scans = [pd.DataFrame.from_dict(scan, orient="index").T for scan in all_scans] scans_df = pd.concat(all_scans).reset_index().drop("index", axis=1) return scans_df
def _flatten_dict(current_key, current_value, new_dict): """ Given the current key and value of a dict, set the value as a string or as a dict and create a new key based on the current key and dict key Args: current_key (str): the current key string current_value (str): the current key value new_dict (dict): a new dictionary with the new keys which is modified in place """ if isinstance(current_value, dict): for key, value in current_value.items(): new_key = "_".join([current_key, key]) _flatten_dict(new_key, value, new_dict) else: new_dict[current_key] = current_value # noinspection GrazieInspection
[docs] def scan_result_to_dataframes(domains): """ Convert a dict internet.nl scans to a flat dictionary with on entry per result type Args: domains: dict keys are the urls, values are the nested json results Returns: dict with four tables """ tables = dict() _logger.info("Converting the results to a dataframe") for domain, properties in tqdm(domains.items()): for table_key, table_prop in properties.items(): if table_key not in tables.keys(): tables[table_key] = dict() if isinstance(table_prop, dict): new_dict = dict() for prop_key, prop_val in table_prop.items(): _flatten_dict(prop_key, prop_val, new_dict) tables[table_key][domain] = new_dict else: tables[table_key][domain] = table_prop # convert the dictionaries to a pandas data frames for table_key, table_prop in tables.items(): tables[table_key] = pd.DataFrame.from_dict(table_prop, orient="index") return tables
[docs] def make_cache_file_name(directory, scan_id, scan_type): """build the cache file name""" cache_file_name = f"{scan_id}_{scan_type}.pkl" return directory / Path(cache_file_name)
[docs] def query_yes_no(question, default_answer="no"): """Ask a yes/no question via raw_input() and return their answer. Parameters ---------- question : str A question to ask the user default_answer : str, optional A default answer that is given when only return is hit. Default to 'no' Returns ------- str: "yes" or "no", depending on the input of the user """ valid = {"yes": "yes", "y": "yes", "ye": "yes", "no": "no", "n": "no"} if not default_answer: prompt = " [y/n] " elif default_answer == "yes": prompt = " [Y/n] " elif default_answer == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default_answer) while 1: # sys.stdout.write(question + prompt) _logger.warning(question + prompt) choice = input().lower() if default_answer is not None and choice == "": return default_answer elif choice in list(valid.keys()): return valid[choice] else: sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
[docs] def clean_list_of_urls(urls_to_scan: list): """cleans up the urls in a list""" new_url_list = list() for url in urls_to_scan: clean_url, suffix = get_clean_url(url) if clean_url is not None and clean_url not in new_url_list: new_url_list.append(clean_url) return new_url_list
[docs] def remove_sub_domain(url: str) -> str: """remove www or any other subdomain from the url""" if requests_kerberos_proxy is None: session = requests.Session() else: session = get_session() tld = tldextract.extract(url, session=session) domain_and_suffix = ".".join([tld.domain, tld.suffix]) return domain_and_suffix
[docs] def remove_sub_domains(urls_to_scan: list) -> list: """remove www or any other subdomain from the url""" new_url_list = list() for url in urls_to_scan: domain_and_suffix = remove_sub_domain(url) new_url_list.append(domain_and_suffix) return new_url_list
[docs] def get_clean_url(url, cache_dir=None): """ Turns an url into a clean url and adds it Args: url (str): url to clean cache_dir (str): directory name in case the tld cached data needs to be read Returns: str, str: cleaned url, the suffix """ clean_url = url suffix = None if cache_dir is not None: extract = tldextract.TLDExtract(cache_dir=cache_dir) session = None else: extract = tldextract.extract if requests_kerberos_proxy is None: session = requests.Session() else: session = get_session() try: url = url.strip() except AttributeError: pass else: try: tld = extract(url, session=session) except TypeError as type_err: _logger.debug(f"{type_err}Type error occurred for {url}") except ssl.SSLEOFError as ssl_err: _logger.debug(f"{ssl_err}\nSSLEOF error occurred for {url}") except requests.exceptions.SSLError as req_err: _logger.debug(f"{req_err}\nSSLError error occurred for {url}") else: if tld.subdomain == "" and tld.domain == "" and tld.suffix == "": clean_url = None elif tld.subdomain == "" and tld.suffix == "": clean_url = None elif tld.subdomain == "" and tld.domain == "": clean_url = None elif tld.domain == "" and tld.suffix == "": clean_url = None elif tld.subdomain == "": clean_url = ".".join([tld.domain, tld.suffix]) elif tld.suffix == "": clean_url = ".".join([tld.subdomain, tld.domain]) elif tld.domain == "": clean_url = ".".join([tld.subdomain, tld.suffix]) else: clean_url = ".".join([tld.subdomain, tld.domain, tld.suffix]) if clean_url is not None: if " " in clean_url: _logger.debug( f"{clean_url} cannot be real url with space. skipping" ) clean_url = None else: # We hebben een url gevonden. Maak hem met kleine letters en sla de suffix op clean_url = clean_url.lower() suffix = tld.suffix.lower() return clean_url, suffix
[docs] def validate_url(url_to_check: str) -> bool: """ Test if a string is a valid url Args: url_to_check (str): Url to check if it is a valid url Returns: bool: True if url is valid """ try: result = urlparse(url_to_check) except AttributeError: return False else: return result
[docs] def get_urls_from_domain_file( domain_file: str | Path, url_column_key: str = None, sep: str = ",", column_number: int = 0, ) -> list: """ Get urls from a file name Args: domain_file (str): the file name to be read url_column_key (str, optional): The name of the column containing the url values. Defaults to None, meaning that the file does not have a header sep (str, optional): The separator of the file column_number (int, optional): The column number to read in case no header is given Returns: list: list of cleaned url's """ _logger.info(f"Reading urls from {domain_file}") if url_column_key is not None: # if a key name is given, use that column urls_df = pd.read_csv(domain_file, sep=sep) # remove the white spaces from the column names urls_df.columns = [col.strip() for col in urls_df.columns] dirty_urls = urls_df[url_column_key].to_list() else: # read the file including the header and pick the first column urls_df = pd.read_csv(domain_file, sep=sep, header=None) dirty_urls = urls_df[column_number].to_list() # remove leading white spaces and None line's urls = [] for url_to_clean in dirty_urls: try: clean_url = url_to_clean.strip() except AttributeError: # remove all empty and non-valid URL's _logger.debug(f"Skipping empty url") else: if validate_url(clean_url): urls.append(clean_url) else: _logger.debug(f"Skipping invalid url {clean_url}") return urls