Unverified Commit d447e500 authored by Ben Busby's avatar Ben Busby
Browse files

Improve naming of *_utils files, update fn/class doc

The app/utils/*_utils weren't named very well, and all have been updated
to have more accurate names.

Function and class documentation for the utils has been updated as well,
as part of the effort to improve overall documentation for the project.
parent 855b4e85
from app.request import send_tor_signal
from app.utils.session_utils import generate_user_keys
from app.utils.gen_ddg_bangs import gen_bangs_json
from app.utils.session import generate_user_keys
from app.utils.bangs import gen_bangs_json
from flask import Flask
from flask_session import Session
import json
from app.request import VALID_PARAMS
from app.utils.filter_utils import *
from app.utils.results import *
from bs4.element import ResultSet
from cryptography.fernet import Fernet
import re
......@@ -208,7 +208,7 @@ class Filter:
# Add no-js option
if self.nojs:
link['href'] = href
......@@ -23,8 +23,8 @@ class TorError(Exception):
"""Exception raised for errors in Tor requests.
message -- a message describing the error that occurred
disable -- optionally disables Tor in the user config (note:
message: a message describing the error that occurred
disable: optionally disables Tor in the user config (note:
this should only happen if the connection has been dropped
......@@ -133,9 +133,9 @@ class Request:
search suggestions, and loading of external content (images, audio, etc).
normal_ua -- the user's current user agent
root_path -- the root path of the whoogle instance
config -- the user's current whoogle configuration
normal_ua: the user's current user agent
root_path: the root path of the whoogle instance
config: the user's current whoogle configuration
def __init__(self, normal_ua, root_path, config: Config):
......@@ -16,8 +16,9 @@ from requests import exceptions
from app import app
from app.models.config import Config
from app.request import Request, TorError
from app.utils.session_utils import valid_user_session
from app.utils.routing_utils import *
from app.utils.bangs import resolve_bang
from app.utils.session import valid_user_session
from app.utils.search import *
# Load DDG bang json files only on init
bang_json = json.load(open(app.config['BANG_FILE']))
......@@ -199,13 +200,13 @@ def search():
# Update user config if specified in search args
g.user_config = g.user_config.from_params(g.request_params)
search_util = RoutingUtils(request, g.user_config, session,
search_util = Search(request, g.user_config, session,
query = search_util.new_search_query()
resolved_bangs = search_util.bang_operator(bang_json)
if resolved_bangs != '':
return redirect(resolved_bangs)
bang = resolve_bang(query=query, bangs_dict=bang_json)
if bang != '':
return redirect(bang)
# Redirect to home if invalid/blank search
if not query:
import json
import requests
DDG_BANGS = 'https://duckduckgo.com/bang.v255.js'
def gen_bangs_json(bangs_file):
# Request list
def gen_bangs_json(bangs_file: str) -> None:
"""Generates a json file from the DDG bangs list
bangs_file: The str path to the new DDG bangs json file
r = requests.get('https://duckduckgo.com/bang.v255.js')
# Request full list from DDG
r = requests.get(DDG_BANGS)
except requests.exceptions.HTTPError as err:
raise SystemExit(err)
......@@ -24,3 +35,27 @@ def gen_bangs_json(bangs_file):
json.dump(bangs_data, open(bangs_file, 'w'))
def resolve_bang(query: str, bangs_dict: dict) -> str:
    """Transforms a user's query to a bang search, if an operator is found

    Args:
        query: The search query
        bangs_dict: The dict of available bang operators, each mapping to a
                    dict whose 'url' entry is a format string search URL
                    (i.e. "!w": {"url": "https://en.wikipedia.org...?search={}"})

    Returns:
        str: A formatted redirect for a bang search, or an empty str if there
             wasn't a match or didn't contain a bang operator

    """
    split_query = query.split(' ')
    for operator in bangs_dict:
        # An operator only counts when it appears as a standalone token
        if operator not in split_query:
            continue

        # Drop only the whole-token occurrences of the operator. A plain
        # str.replace() would also strip it out of longer tokens that merely
        # start with it (e.g. "!w" inside "!world").
        remaining = ' '.join(
            token for token in split_query if token != operator)
        return bangs_dict[operator]['url'].format(remaining.strip())

    return ''
......@@ -28,12 +28,30 @@ SITE_ALTS = {
def has_ad_content(element: str) -> bool:
    """Inspects an HTML element for ad related content

    Args:
        element: The HTML element to inspect

    Returns:
        bool: True/False for the element containing an ad

    """
    # The info marker is treated as an ad indicator wherever it appears
    if 'ⓘ' in element:
        return True

    # Otherwise the element must equal a blacklisted value, ignoring case
    normalized = element.upper()
    return any(normalized == entry.upper() for entry in BLACKLIST)
def get_first_link(soup):
def get_first_link(soup: BeautifulSoup) -> str:
"""Retrieves the first result link from the query response
soup: The BeautifulSoup response body
str: A str link to the first result
# Replace hrefs with only the intended destination (no "utm" type tags)
for a in soup.find_all('a', href=True):
# Return the first search result URL
......@@ -41,7 +59,16 @@ def get_first_link(soup):
return filter_link_args(a['href'])
def get_site_alt(link: str):
def get_site_alt(link: str) -> str:
"""Returns an alternative to a particular site, if one is configured
link: A string result URL to check against the SITE_ALTS map
str: An updated (or ignored) result link
for site_key in SITE_ALTS.keys():
if site_key not in link:
......@@ -55,13 +82,22 @@ def get_site_alt(link: str):
return link
def filter_link_args(query_link):
parsed_link = urlparse.urlparse(query_link)
def filter_link_args(link: str) -> str:
"""Filters out unnecessary URL args from a result link
link: The string result link to check for extraneous URL params
str: An updated (or ignored) result link
parsed_link = urlparse.urlparse(link)
link_args = parse_qs(parsed_link.query)
safe_args = {}
if len(link_args) == 0 and len(parsed_link) > 0:
return query_link
return link
for arg in link_args.keys():
if arg in SKIP_ARGS:
......@@ -70,19 +106,28 @@ def filter_link_args(query_link):
safe_args[arg] = link_args[arg]
# Remove original link query and replace with filtered args
query_link = query_link.replace(parsed_link.query, '')
link = link.replace(parsed_link.query, '')
if len(safe_args) > 0:
query_link = query_link + urlparse.urlencode(safe_args, doseq=True)
link = link + urlparse.urlencode(safe_args, doseq=True)
query_link = query_link.replace('?', '')
link = link.replace('?', '')
return link
def append_nojs(result: BeautifulSoup) -> None:
"""Appends a no-Javascript alternative for a search result
return query_link
result: The search result to append a no-JS link to
def gen_nojs(sibling):
nojs_link = BeautifulSoup(features='html.parser').new_tag('a')
nojs_link['href'] = '/window?location=' + sibling['href']
nojs_link['href'] = '/window?location=' + result['href']
nojs_link['style'] = 'display:block;width:100%;'
nojs_link.string = 'NoJS Link: ' + nojs_link['href']
sibling.append(BeautifulSoup('<br><hr><br>', 'html.parser'))
result.append(BeautifulSoup('<br><hr><br>', 'html.parser'))
from app.filter import Filter, get_first_link
from app.utils.session_utils import generate_user_keys
from app.utils.session import generate_user_keys
from app.request import gen_query
from bs4 import BeautifulSoup as bsoup
from cryptography.fernet import Fernet, InvalidToken
......@@ -11,6 +11,18 @@ TOR_BANNER = '<hr><h1 style="text-align: center">You are using Tor</h1><hr>'
def needs_https(url: str) -> bool:
    """Checks if the current instance needs to be upgraded to HTTPS

    Note that all Heroku instances are available by default over HTTPS, but
    do not automatically set up a redirect when visited over HTTP.

    Args:
        url: The instance url

    Returns:
        bool: True/False representing the need to upgrade

    """
    # Only plain-HTTP visits can ever need an upgrade
    if not url.startswith('http://'):
        return False

    # os.getenv returns a str (or None), so coerce it to keep the return
    # value a strict bool; any non-empty HTTPS_ONLY value enables the upgrade
    https_only = bool(os.getenv('HTTPS_ONLY'))
    is_heroku = url.endswith('.herokuapp.com')

    return is_heroku or https_only
class RoutingUtils:
class Search:
"""Search query preprocessor - used before submitting the query or
redirecting to another site
request: the incoming flask request
config: the current user config settings
session: the flask user session
def __init__(self, request, config, session, cookies_disabled=False):
method = request.method
self.request_params = request.args if method == 'GET' else request.form
......@@ -31,19 +51,28 @@ class RoutingUtils:
self.search_type = self.request_params.get(
'tbm') if 'tbm' in self.request_params else ''
def __getitem__(self, name):
def __getitem__(self, name) -> Any:
return getattr(self, name)
def __setitem__(self, name, value):
def __setitem__(self, name, value) -> None:
return setattr(self, name, value)
def __delitem__(self, name):
def __delitem__(self, name) -> None:
return delattr(self, name)
def __contains__(self, name):
def __contains__(self, name) -> bool:
return hasattr(self, name)
def new_search_query(self) -> str:
"""Parses a plaintext query into a valid string for submission
Also decrypts the query string, if encrypted (in the case of
paginated results).
str: A valid query string
# Generate a new element key each time a new search is performed
self.session['fernet_keys']['element_key'] = generate_user_keys(
......@@ -70,17 +99,18 @@ class RoutingUtils:
self.query = q[2:] if self.feeling_lucky else q
return self.query
def bang_operator(self, bangs_dict: dict) -> str:
split_query = self.query.split(' ')
for operator in bangs_dict.keys():
if operator not in split_query:
def generate_response(self) -> Tuple[Any, int]:
"""Generates a response for the user's query
return bangs_dict[operator]['url'].format(
self.query.replace(operator, '').strip())
return ''
Tuple[Any, int]: A tuple in the format (response, # of elements)
For example, in the case of a "feeling lucky"
search, the response is a result URL, with no
encrypted elements to account for. Otherwise, the
response is a BeautifulSoup response body, with
N encrypted elements to track before key regen.
def generate_response(self) -> Tuple[Any, int]:
mobile = 'Android' in self.user_agent or 'iPhone' in self.user_agent
content_filter = Filter(
......@@ -102,7 +132,7 @@ class RoutingUtils:
if g.user_request.tor_valid else bsoup('', 'html.parser'))
if self.feeling_lucky:
return get_first_link(html_soup), 1
return get_first_link(html_soup), 0
formatted_results = content_filter.clean(html_soup)
......@@ -5,6 +5,17 @@ REQUIRED_SESSION_VALUES = ['uuid', 'config', 'fernet_keys']
def generate_user_keys(cookies_disabled=False) -> dict:
"""Generates a set of user keys
cookies_disabled: Flag for whether or not cookies are disabled by the
user. If so, the user can only use the default key
set generated on app init for queries.
dict: A new Fernet key set
if cookies_disabled:
return app.default_key_set
......@@ -15,7 +26,17 @@ def generate_user_keys(cookies_disabled=False) -> dict:
def valid_user_session(session):
def valid_user_session(session: dict) -> bool:
"""Validates the current user session
session: The current Flask user session
bool: True/False indicating that all required session values are
# Generate secret key for user if unavailable
if value not in session:
from app import app
from app.utils.session_utils import generate_user_keys
from app.utils.session import generate_user_keys
import pytest
import random
from app.utils.session_utils import generate_user_keys, valid_user_session
from app.utils.session import generate_user_keys, valid_user_session
def test_generate_user_keys():
from bs4 import BeautifulSoup
from app.filter import Filter
from app.utils.session_utils import generate_user_keys
from app.utils.session import generate_user_keys
from datetime import datetime
from dateutil.parser import *
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment