# -*- coding: utf-8 -*-
"""
Utilities for the internetnl tool
"""
import getpass
import logging
import ssl
import sys
from pathlib import Path
from urllib.parse import urlparse
import keyring
import pandas as pd
import requests
# requests_kerberos_proxy is an optional dependency: the module name is bound to None when it is
# not installed, and the functions below test for that before calling get_session
try:
    import requests_kerberos_proxy
except ImportError:
    requests_kerberos_proxy = None
else:
    try:
        from requests_kerberos_proxy.util import get_session
    except ImportError as err:
        # the package itself is present but does not provide get_session: fail loudly and keep
        # the original import error attached as the cause
        raise ImportError(
            "Module 'requests_kerberos_proxy' was found but 'get_session' could not be imported"
        ) from err
from requests.auth import HTTPBasicAuth
from tldextract import tldextract
from tqdm import tqdm
_logger = logging.getLogger("internetnl-scan")
[docs]
class Credentials:
    """Keeps the username and password of a service and stores them in the keyring"""

    def __init__(self, service_name="Internet.nl"):
        self.service_name = service_name
        self.username = None
        self.password = None
        self.http_auth = None
        self._credentials = None

        # fills all attributes above; prompts on the command line if nothing is stored yet
        self.get_credentials()

    def get_credentials(self):
        """Load the credentials from the keyring, or ask for them on the cli and store them"""
        stored = keyring.get_credential(self.service_name, None)
        self._credentials = stored

        if stored is not None:
            _logger.debug("Get credentials from keyring")
            self.username, self.password = stored.username, stored.password
        else:
            _logger.debug("Get credentials from cli")
            self.username = input("Username: ")
            self.password = getpass.getpass()
            # remember the answer, so that the next run can read it from the keyring
            keyring.set_password(
                service_name=self.service_name,
                username=self.username,
                password=self.password,
            )

        self.http_auth = HTTPBasicAuth(self.username, self.password)

    def reset_credentials(self):
        """Delete the stored password from the keyring, to be used after a login failure"""
        keyring.delete_password(self.service_name, self.username)
[docs]
def response_to_dataframe(response):
    """
    Convert the Internet.nl response to a pandas dataframe

    Args:
        response: the response returned by the Internet.nl API. Its ``json()`` payload must have a
            "requests" entry holding one dict per scan

    Returns:
        Pandas dataframe with one row per scan and a new index running from 0. An empty dataframe is
        returned when the "requests" entry does not contain any scan
    """
    scans = response.json()["requests"]
    if not scans:
        # pd.concat raises a ValueError for an empty sequence, so return early
        return pd.DataFrame()

    # every scan dict is turned into a one-row frame: the transpose makes the keys the columns
    scan_rows = [pd.DataFrame.from_dict(scan, orient="index").T for scan in scans]

    # drop=True throws away the old per-scan index directly. Inserting it as an "index" column first
    # and dropping it afterwards fails when a scan happens to have an "index" key of its own
    return pd.concat(scan_rows).reset_index(drop=True)
def _flatten_dict(current_key, current_value, new_dict):
    """
    Store a possibly nested value under flat keys in new_dict

    A value that is not a dict is stored under current_key. A dict is descended into recursively, and the
    key of every nested entry is appended to current_key with an underscore.

    Args:
        current_key (str): the key built up so far
        current_value: the value belonging to the key, either a dict or any other object
        new_dict (dict): the flat dictionary receiving the entries; it is modified in place
    """
    if not isinstance(current_value, dict):
        new_dict[current_key] = current_value
        return

    for sub_key, sub_value in current_value.items():
        _flatten_dict("_".join((current_key, sub_key)), sub_value, new_dict)
# noinspection GrazieInspection
[docs]
def scan_result_to_dataframes(domains):
    """
    Regroup the nested internet.nl scan results per result type and convert each group to a dataframe

    Args:
        domains: dict
            keys are the urls, values are the nested json results

    Returns:
        dict with one dataframe per top-level result key; the domains form the index of each dataframe
    """
    _logger.info("Converting the results to a dataframe")

    per_table = {}
    for domain, properties in tqdm(domains.items()):
        for table_key, table_prop in properties.items():
            rows = per_table.setdefault(table_key, {})
            if not isinstance(table_prop, dict):
                # plain values are stored as they are
                rows[domain] = table_prop
                continue

            # nested results are flattened to a single level of underscore-joined keys
            flat = {}
            for prop_key, prop_val in table_prop.items():
                _flatten_dict(prop_key, prop_val, flat)
            rows[domain] = flat

    # convert the collected dictionaries to pandas data frames
    return {
        table_key: pd.DataFrame.from_dict(rows, orient="index")
        for table_key, rows in per_table.items()
    }
[docs]
def make_cache_file_name(directory, scan_id, scan_type):
    """Return the path of the cache file '<scan_id>_<scan_type>.pkl' inside the directory"""
    file_stem = f"{scan_id}_{scan_type}"
    return directory / Path(file_stem + ".pkl")
[docs]
def query_yes_no(question, default_answer="no"):
    """Ask a yes/no question on the command line and return the answer.

    The question is emitted via the module logger at warning level; the answer is read with input().

    Parameters
    ----------
    question : str
        A question to ask the user
    default_answer : str, optional
        The answer that is returned when only return is hit, either 'yes' or 'no'. A falsy value
        (None or an empty string) means that there is no default: the question is repeated until
        an explicit answer is given. Default to 'no'

    Returns
    -------
    str:
        "yes" or "no", depending on the input of the user

    Raises
    ------
    ValueError
        If `default_answer` is not falsy and not equal to 'yes' or 'no'
    """
    valid = {"yes": "yes", "y": "yes", "ye": "yes", "no": "no", "n": "no"}
    if not default_answer:
        prompt = " [y/n] "
    elif default_answer == "yes":
        prompt = " [Y/n] "
    elif default_answer == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("invalid default answer: '%s'" % default_answer)

    while True:
        _logger.warning(question + prompt)
        # surrounding white space is not part of the answer
        choice = input().strip().lower()
        # only fall back on the default if there really is one: an empty string default is
        # announced as '[y/n]' and must therefore not be returned as an answer
        if choice == "" and default_answer:
            return default_answer
        if choice in valid:
            return valid[choice]
        sys.stdout.write("Please respond with 'yes' or 'no' " "(or 'y' or 'n').\n")
[docs]
def clean_list_of_urls(urls_to_scan: list):
    """
    Clean up the urls in a list and remove the duplicates

    Args:
        urls_to_scan (list): the urls to clean

    Returns:
        list: the unique cleaned urls in order of first appearance. Urls for which get_clean_url
        yields None are left out
    """
    # the set gives a constant time duplicate check; the list keeps the order of first appearance
    seen = set()
    new_url_list = []
    for url in urls_to_scan:
        clean_url, _ = get_clean_url(url)
        if clean_url is None or clean_url in seen:
            continue
        seen.add(clean_url)
        new_url_list.append(clean_url)
    return new_url_list
[docs]
def remove_sub_domain(url: str) -> str:
    """
    Remove www or any other subdomain from the url

    Args:
        url (str): the url to reduce

    Returns:
        str: the domain and the suffix of the url joined by a dot. A part that is empty is left out,
        so the result is an empty string when neither part is found
    """
    # use the kerberos proxy session when that optional package is available
    if requests_kerberos_proxy is None:
        session = requests.Session()
    else:
        session = get_session()
    tld = tldextract.extract(url, session=session)

    # joining an empty part would leave a dangling dot, e.g. 'localhost.' for a url without suffix
    return ".".join(part for part in (tld.domain, tld.suffix) if part)
[docs]
def remove_sub_domains(urls_to_scan: list) -> list:
    """Remove www or any other subdomain from every url in the list; order and duplicates are kept"""
    return [remove_sub_domain(url) for url in urls_to_scan]
[docs]
def get_clean_url(url, cache_dir=None):
    """
    Turn an url into a clean, lower case url built from its subdomain, domain and suffix

    Args:
        url (str): url to clean
        cache_dir (str): directory name in case the tld cached data needs to be read. If given, a dedicated
            extractor for this directory is created and no session is passed to it

    Returns:
        str, str: cleaned url, the suffix. The cleaned url is None if fewer than two of the three url parts
        are found, or if the joined result contains a space. The suffix is only set together with a cleaned
        url. If the url has no strip method, or the extraction raises one of the handled errors, the url is
        returned exactly as it was passed with None as suffix
    """
    if cache_dir is None:
        extract = tldextract.extract
        # use the kerberos proxy session when that optional package is available
        session = get_session() if requests_kerberos_proxy is not None else requests.Session()
    else:
        extract = tldextract.TLDExtract(cache_dir=cache_dir)
        session = None

    try:
        stripped_url = url.strip()
    except AttributeError:
        # not a string: hand the value back untouched
        return url, None

    try:
        tld = extract(stripped_url, session=session)
    except TypeError as type_err:
        _logger.debug(f"{type_err}Type error occurred for {stripped_url}")
        return url, None
    except ssl.SSLEOFError as ssl_err:
        _logger.debug(f"{ssl_err}\nSSLEOF error occurred for {stripped_url}")
        return url, None
    except requests.exceptions.SSLError as req_err:
        _logger.debug(f"{req_err}\nSSLError error occurred for {stripped_url}")
        return url, None

    # at least two of the three parts are needed to build an url
    found_parts = [part for part in (tld.subdomain, tld.domain, tld.suffix) if part != ""]
    if len(found_parts) < 2:
        return None, None

    joined_url = ".".join(found_parts)
    if " " in joined_url:
        _logger.debug(
            f"{joined_url} cannot be real url with space. skipping"
        )
        return None, None

    # We have found an url. Make it lower case and store the suffix
    return joined_url.lower(), tld.suffix.lower()
[docs]
def validate_url(url_to_check: str) -> bool:
    """
    Test if a string is a valid url

    The check is lenient: a bare domain without a scheme such as 'example.nl' is accepted, as
    urlparse reports it as a path.

    Args:
        url_to_check (str): Url to check if it is a valid url

    Returns:
        bool: True if the url can be parsed and has a network location or a path; False for empty
        values, values that are not a string and malformed urls
    """
    try:
        result = urlparse(url_to_check)
    except (AttributeError, TypeError, ValueError):
        # AttributeError or TypeError: the value is not a string
        # ValueError: the url is malformed, e.g. it has an unbalanced IPv6 bracket
        return False

    # the parse result is a 6-field tuple, which is truthy even if all its fields are empty.
    # Therefore test the fields instead of the tuple
    return bool(result.netloc or result.path)
[docs]
def get_urls_from_domain_file(
    domain_file: str | Path,
    url_column_key: str | None = None,
    sep: str = ",",
    column_number: int = 0,
) -> list:
    """
    Get urls from a file name

    Args:
        domain_file (str): the file name to be read
        url_column_key (str, optional): The name of the column containing the url values. Defaults to None, meaning
            that the file does not have a header
        sep (str, optional): The separator of the file
        column_number (int, optional): The column number to read in case no header is given. It is not used
            when url_column_key is given

    Returns:
        list: the urls with the surrounding white space removed. Cells that are not a string, cells that are
        empty after stripping and cells rejected by validate_url are skipped
    """
    _logger.info(f"Reading urls from {domain_file}")

    if url_column_key is not None:
        # if a key name is given, the first line is the header: use the column with that name
        urls_df = pd.read_csv(domain_file, sep=sep)
        # remove the white spaces from the column names
        urls_df.columns = [col.strip() for col in urls_df.columns]
        dirty_urls = urls_df[url_column_key].to_list()
    else:
        # the file has no header: pick the column by its number
        urls_df = pd.read_csv(domain_file, sep=sep, header=None)
        dirty_urls = urls_df[column_number].to_list()

    # remove the surrounding white space and skip the empty and invalid entries
    urls = []
    for url_to_clean in dirty_urls:
        try:
            clean_url = url_to_clean.strip()
        except AttributeError:
            # the cell does not hold a string, so there is nothing to strip
            _logger.debug("Skipping empty url")
            continue

        # a cell with white space only is an empty string by now and must not end up in the list
        if clean_url and validate_url(clean_url):
            urls.append(clean_url)
        else:
            _logger.debug(f"Skipping invalid url {clean_url}")

    return urls