Source code for internetnl_scan.utils

import getpass
import logging
import ssl
import sys
from pathlib import Path

from urllib.parse import urlparse

import keyring
import pandas as pd
import requests
from urllib3.util import url

try:
    import requests_kerberos_proxy
except ImportError:
    requests_kerberos_proxy = None
else:
    try:
        from requests_kerberos_proxy.util import get_session
    except ImportError as err:
        raise ImportError(
            "Module 'request_kerberos_proxy' was found but 'get_session' could not be imported"
        )
from requests.auth import HTTPBasicAuth
from tldextract import tldextract
from tqdm import tqdm

_logger = logging.getLogger("internetnl-scan")


[docs] class Credentials(object): """stores the user credentials in a key ring""" def __init__(self, service_name="Internet.nl"): self.service_name = service_name self.username = None self.password = None self.http_auth = None self._credentials = None self.get_credentials()
[docs] def get_credentials(self): """Get the user credentials, either via cli, or via keyring""" self._credentials = keyring.get_credential(self.service_name, None) if self._credentials is None: _logger.debug("Get credentials from cli") self.username = input("Username: ") self.password = getpass.getpass() keyring.set_password( service_name=self.service_name, username=self.username, password=self.password, ) else: _logger.debug("Get credentials from keyring") self.username = self._credentials.username self.password = self._credentials.password self.http_auth = HTTPBasicAuth(self.username, self.password)
[docs] def reset_credentials(self): """in case of login failure: reset the stored credentials""" keyring.delete_password(service_name=self.service_name, username=self.username)
[docs] def response_to_dataframe(response): """ Convert the Internet.nl response to pandas dataframe Args: response: the returned response ot the Internet.nl API Returns: Pandas dataframe """ result = response.json() all_scans = result["requests"] all_scans = [pd.DataFrame.from_dict(scan, orient="index").T for scan in all_scans] scans_df = pd.concat(all_scans).reset_index().drop("index", axis=1) return scans_df
def _flatten_dict(current_key, current_value, new_dict): """ Given the current key and value of a dict, set the value as a string or as a dict and create a new key based on the current key and dict key Args: current_key (str): the current key string current_value (str): the current key value new_dict (dict): a new dictionary with the new keys which is modified in place """ if isinstance(current_value, dict): for key, value in current_value.items(): new_key = "_".join([current_key, key]) _flatten_dict(new_key, value, new_dict) else: new_dict[current_key] = current_value # noinspection GrazieInspection
[docs] def scan_result_to_dataframes(domains): """ Convert a dict internet.nl scans to a flat dictionary with on entry per result type Args: domains: dict keys are the urls, values are the nested json results Returns: dict with four tables """ tables = dict() _logger.info("Converting the results to a dataframe") for domain, properties in tqdm(domains.items()): for table_key, table_prop in properties.items(): if table_key not in tables.keys(): tables[table_key] = dict() if isinstance(table_prop, dict): new_dict = dict() for prop_key, prop_val in table_prop.items(): _flatten_dict(prop_key, prop_val, new_dict) tables[table_key][domain] = new_dict else: tables[table_key][domain] = table_prop # convert the dictionaries to a pandas data frames for table_key, table_prop in tables.items(): tables[table_key] = pd.DataFrame.from_dict(table_prop, orient="index") return tables
[docs] def make_cache_file_name(directory, scan_id, scan_type): """build the cache file name""" cache_file_name = f"{scan_id}_{scan_type}.pkl" return directory / Path(cache_file_name)
[docs] def query_yes_no(question, default_answer="no"): """Ask a yes/no question via raw_input() and return their answer. Parameters ---------- question : str A question to ask the user default_answer : str, optional A default answer that is given when only return is hit. Default to 'no' Returns ------- str: "yes" or "no", depending on the input of the user """ valid = {"yes": "yes", "y": "yes", "ye": "yes", "no": "no", "n": "no"} if not default_answer: prompt = " [y/n] " elif default_answer == "yes": prompt = " [Y/n] " elif default_answer == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default_answer) while 1: # sys.stdout.write(question + prompt) _logger.warning(question + prompt) choice = input().lower() if default_answer is not None and choice == "": return default_answer elif choice in list(valid.keys()): return valid[choice] else: sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
[docs] def convert_url_list(urls_to_scan: list, scan_type="web"): """cleans up the urls in a list""" new_url_list = list() for url in urls_to_scan: clean_url, suffix = get_clean_url(url) if clean_url is not None and clean_url not in new_url_list: new_url_list.append(clean_url) return new_url_list
[docs] def remove_sub_domain(url: str) -> str: """remove www or any other subdomain from the url""" if requests_kerberos_proxy is None: session = requests.Session() else: session = get_session() tld = tldextract.extract(url, session=session) domain_and_suffix = ".".join([tld.domain, tld.suffix]) return domain_and_suffix
[docs] def remove_sub_domains(urls_to_scan: list) -> list: """remove www or any other subdomain from the url""" new_url_list = list() for url in urls_to_scan: domain_and_suffix = remove_sub_domain(url) new_url_list.append(domain_and_suffix) return new_url_list
[docs] def get_clean_url(url, cache_dir=None): """ Turns an url into a clean url and adds it Args: url (str): url to clean cache_dir (str): directory name in case the tld cached data needs to be read Returns: str, str: cleaned url, the suffix """ clean_url = url suffix = None if cache_dir is not None: extract = tldextract.TLDExtract(cache_dir=cache_dir) session = None else: extract = tldextract.extract if requests_kerberos_proxy is None: session = requests.Session() else: session = get_session() try: url = url.strip() except AttributeError: pass else: try: tld = extract(url, session=session) except TypeError: _logger.debug(f"Type error occurred for {url}") except ssl.SSLEOFError as ssl_err: _logger.debug(f"SSLEOF error occurred for {url}") except requests.exceptions.SSLError as req_err: _logger.debug(f"SSLError error occurred for {url}") else: if tld.subdomain == "" and tld.domain == "" and tld.suffix == "": clean_url = None elif tld.subdomain == "" and tld.suffix == "": clean_url = None elif tld.subdomain == "" and tld.domain == "": clean_url = None elif tld.domain == "" and tld.suffix == "": clean_url = None elif tld.subdomain == "": clean_url = ".".join([tld.domain, tld.suffix]) elif tld.suffix == "": clean_url = ".".join([tld.subdomain, tld.domain]) elif tld.domain == "": clean_url = ".".join([tld.subdomain, tld.suffix]) else: clean_url = ".".join([tld.subdomain, tld.domain, tld.suffix]) if clean_url is not None: if " " in clean_url: _logger.debug( f"{clean_url} cannot be real url with space. skipping" ) clean_url = None else: # We hebben een url gevonden. Maak hem met kleine letters en sla de suffix op clean_url = clean_url.lower() suffix = tld.suffix.lower() return clean_url, suffix
[docs] def validate_url(url_to_check: str) -> bool: """ Test if a string is a valid url Args: url_to_check (str): Url to check if it is a valid url Returns: bool: True if url is valid """ try: result = urlparse(url_to_check) except AttributeError: return False else: return True
[docs] def get_urls_from_domain_file( domain_file: str, url_column_key: str = None, sep: str = ",", column_number: int = 0, ) -> list: """ Get urls from a file name Args: domain_file (str): the file name to be read url_column_key (str, optional): The name of the column containing the url values. Defaults to None, meaning that the file does not have a header sep (str, optional): The separator of the file column_number (int, optional): The column number to read in case no header is given Returns: list: list of cleaned url's """ _logger.info(f"Reading urls from {domain_file}") if url_column_key is not None: # if a key name is given, use that column urls_df = pd.read_csv(domain_file, sep=sep) # remove the white spaces from the column names urls_df.columns = [col.strip() for col in urls_df.columns] dirty_urls = urls_df[url_column_key].to_list() else: # read the file including the header and pick the first column urls_df = pd.read_csv(domain_file, sep=sep, header=None) dirty_urls = urls_df[column_number].to_list() # remove leading white spaces and None line's urls = [] for url_to_clean in dirty_urls: try: clean_url = url_to_clean.strip() except AttributeError: # remove all empty and non-valid URL's _logger.debug(f"Skipping empty url {clean_url}") else: if validate_url(clean_url): urls.append(clean_url) else: _logger.debug(f"Skipping invalid url {clean_url}") return urls