286 lines
9.6 KiB
Python
286 lines
9.6 KiB
Python
# A simple proxy server
|
|
# by Humans for All
|
|
#
|
|
# Listens on the specified port (defaults to squids 3128)
|
|
# * if a url query is got wrt urlraw path
|
|
# http://localhost:3128/urlraw?url=http://site.of.interest/path/of/interest
|
|
# fetches the contents of the specified url and returns the same to the requester
|
|
# * if a url query is got wrt urltext path
|
|
# http://localhost:3128/urltext?url=http://site.of.interest/path/of/interest
|
|
# fetches the contents of the specified url and returns the same to the requester
|
|
# after removing html tags in general as well as contents of tags like style
|
|
# script, header, footer, nav ...
|
|
# * any request to aum path is used to respond with a predefined text response
|
|
# which can help identify this server, in a simple way.
|
|
#
|
|
# Expects a Bearer authorization line in the http header of the requests got.
|
|
# HOWEVER DO KEEP IN MIND THAT ITS A VERY INSECURE IMPLEMENTATION, AT BEST
|
|
#
|
|
|
|
|
|
import sys
|
|
import http.server
|
|
import urllib.parse
|
|
import time
|
|
import urlvalidator as uv
|
|
from typing import Callable
|
|
import pdfmagic as mPdf
|
|
import webmagic as mWeb
|
|
import debug as mDebug
|
|
|
|
|
|
gMe = {
|
|
'--port': 3128,
|
|
'--config': '/dev/null',
|
|
'--debug': False,
|
|
'bearer.transformed.year': "",
|
|
'server': None
|
|
}
|
|
|
|
gConfigType = {
|
|
'--port': 'int',
|
|
'--config': 'str',
|
|
'--debug': 'bool',
|
|
'--allowed.schemes': 'list',
|
|
'--allowed.domains': 'list',
|
|
'--bearer.insecure': 'str'
|
|
}
|
|
|
|
gConfigNeeded = [ '--allowed.schemes', '--allowed.domains', '--bearer.insecure' ]
|
|
|
|
gAllowedCalls = {
|
|
"urltext": [],
|
|
"urlraw": [],
|
|
"pdftext": [ "pypdf" ]
|
|
}
|
|
|
|
|
|
def bearer_transform():
|
|
"""
|
|
Transform the raw bearer token to the network handshaked token,
|
|
if and when needed.
|
|
"""
|
|
global gMe
|
|
year = str(time.gmtime().tm_year)
|
|
if gMe['bearer.transformed.year'] == year:
|
|
return
|
|
import hashlib
|
|
s256 = hashlib.sha256(year.encode('utf-8'))
|
|
s256.update(gMe['--bearer.insecure'].encode('utf-8'))
|
|
gMe['--bearer.transformed'] = s256.hexdigest()
|
|
gMe['bearer.transformed.year'] = year
|
|
|
|
|
|
class ProxyHandler(http.server.BaseHTTPRequestHandler):
|
|
"""
|
|
Implements the logic for handling requests sent to this server.
|
|
"""
|
|
|
|
def send_headers_common(self):
|
|
"""
|
|
Common headers to include in responses from this server
|
|
"""
|
|
self.send_header('Access-Control-Allow-Origin', '*')
|
|
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
|
|
self.send_header('Access-Control-Allow-Headers', '*')
|
|
self.end_headers()
|
|
|
|
def send_error(self, code: int, message: str | None = None, explain: str | None = None) -> None:
|
|
"""
|
|
Overrides the SendError helper
|
|
so that the common headers mentioned above can get added to them
|
|
else CORS failure will be triggered by the browser on fetch from browser.
|
|
"""
|
|
print(f"WARN:PH:SendError:{code}:{message}")
|
|
self.send_response(code, message)
|
|
self.send_headers_common()
|
|
|
|
def auth_check(self):
|
|
"""
|
|
Simple Bearer authorization
|
|
ALERT: For multiple reasons, this is a very insecure implementation.
|
|
"""
|
|
bearer_transform()
|
|
authline = self.headers['Authorization']
|
|
if authline == None:
|
|
return { 'AllOk': False, 'Msg': "No auth line" }
|
|
authlineA = authline.strip().split(' ')
|
|
if len(authlineA) != 2:
|
|
return { 'AllOk': False, 'Msg': "Invalid auth line" }
|
|
if authlineA[0] != 'Bearer':
|
|
return { 'AllOk': False, 'Msg': "Invalid auth type" }
|
|
if authlineA[1] != gMe['--bearer.transformed']:
|
|
return { 'AllOk': False, 'Msg': "Invalid auth" }
|
|
return { 'AllOk': True, 'Msg': "Auth Ok" }
|
|
|
|
def auth_and_run(self, pr:urllib.parse.ParseResult, handler:Callable[['ProxyHandler', urllib.parse.ParseResult], None]):
|
|
"""
|
|
If authorisation is ok for the request, run the specified handler.
|
|
"""
|
|
acGot = self.auth_check()
|
|
if not acGot['AllOk']:
|
|
self.send_error(400, f"WARN:{acGot['Msg']}")
|
|
else:
|
|
try:
|
|
handler(self, pr)
|
|
except Exception as e:
|
|
self.send_error(400, f"ERRR:ProxyHandler:{e}")
|
|
|
|
def do_GET(self):
|
|
"""
|
|
Handle GET requests
|
|
"""
|
|
print(f"\n\n\nDBUG:ProxyHandler:GET:{self.address_string()}:{self.path}")
|
|
print(f"DBUG:PH:Get:Headers:{self.headers}")
|
|
pr = urllib.parse.urlparse(self.path)
|
|
print(f"DBUG:ProxyHandler:GET:{pr}")
|
|
match pr.path:
|
|
case '/urlraw':
|
|
self.auth_and_run(pr, mWeb.handle_urlraw)
|
|
case '/urltext':
|
|
self.auth_and_run(pr, mWeb.handle_urltext)
|
|
case '/pdftext':
|
|
self.auth_and_run(pr, mPdf.handle_pdftext)
|
|
case '/aum':
|
|
handle_aum(self, pr)
|
|
case _:
|
|
print(f"WARN:ProxyHandler:GET:UnknownPath{pr.path}")
|
|
self.send_error(400, f"WARN:UnknownPath:{pr.path}")
|
|
|
|
def do_OPTIONS(self):
|
|
"""
|
|
Handle OPTIONS for CORS preflights (just in case from browser)
|
|
"""
|
|
print(f"DBUG:ProxyHandler:OPTIONS:{self.path}")
|
|
self.send_response(200)
|
|
self.send_headers_common()
|
|
|
|
|
|
def handle_aum(ph: ProxyHandler, pr: urllib.parse.ParseResult):
|
|
"""
|
|
Handle requests to aum path, which is used in a simple way to
|
|
verify that one is communicating with this proxy server
|
|
"""
|
|
import importlib
|
|
queryParams = urllib.parse.parse_qs(pr.query)
|
|
url = queryParams['url']
|
|
print(f"DBUG:HandleAUM:Url:{url}")
|
|
url = url[0]
|
|
if (not url) or (len(url) == 0):
|
|
ph.send_error(400, f"WARN:HandleAUM:MissingUrl/UnknownQuery?!")
|
|
return
|
|
urlParts = url.split('.',1)
|
|
if gAllowedCalls.get(urlParts[0], None) == None:
|
|
ph.send_error(403, f"WARN:HandleAUM:Forbidden:{urlParts[0]}")
|
|
return
|
|
for dep in gAllowedCalls[urlParts[0]]:
|
|
try:
|
|
importlib.import_module(dep)
|
|
except ImportError as exc:
|
|
ph.send_error(400, f"WARN:HandleAUM:{urlParts[0]}:Support module [{dep}] missing or has issues")
|
|
return
|
|
print(f"INFO:HandleAUM:Availability ok for:{urlParts[0]}")
|
|
ph.send_response_only(200, "bharatavarshe")
|
|
ph.send_header('Access-Control-Allow-Origin', '*')
|
|
ph.end_headers()
|
|
|
|
|
|
def load_config():
|
|
"""
|
|
Allow loading of a json based config file
|
|
|
|
The config entries should be named same as their equivalent cmdline argument
|
|
entries but without the -- prefix. They will be loaded into gMe after adding
|
|
-- prefix.
|
|
|
|
As far as the program is concerned the entries could either come from cmdline
|
|
or from a json based config file.
|
|
"""
|
|
global gMe
|
|
import json
|
|
with open(gMe['--config']) as f:
|
|
cfg = json.load(f)
|
|
for k in cfg:
|
|
print(f"DBUG:LoadConfig:{k}")
|
|
try:
|
|
cArg = f"--{k}"
|
|
aTypeCheck = gConfigType[cArg]
|
|
aValue = cfg[k]
|
|
aType = type(aValue).__name__
|
|
if aType != aTypeCheck:
|
|
print(f"ERRR:LoadConfig:{k}:expected type [{aTypeCheck}] got type [{aType}]")
|
|
exit(112)
|
|
gMe[cArg] = aValue
|
|
except KeyError:
|
|
print(f"ERRR:LoadConfig:{k}:UnknownCommand")
|
|
exit(113)
|
|
|
|
|
|
def process_args(args: list[str]):
|
|
"""
|
|
Helper to process command line arguments.
|
|
|
|
Flow setup below such that
|
|
* location of --config in commandline will decide whether command line or config file will get
|
|
priority wrt setting program parameters.
|
|
* str type values in cmdline are picked up directly, without running them through ast.literal_eval,
|
|
bcas otherwise one will have to ensure throught the cmdline arg mechanism that string quote is
|
|
retained for literal_eval
|
|
"""
|
|
import ast
|
|
global gMe
|
|
iArg = 1
|
|
while iArg < len(args):
|
|
cArg = args[iArg]
|
|
if (not cArg.startswith("--")):
|
|
print(f"ERRR:ProcessArgs:{iArg}:{cArg}:MalformedCommandOr???")
|
|
exit(101)
|
|
print(f"DBUG:ProcessArgs:{iArg}:{cArg}")
|
|
try:
|
|
aTypeCheck = gConfigType[cArg]
|
|
aValue = args[iArg+1]
|
|
if aTypeCheck != 'str':
|
|
aValue = ast.literal_eval(aValue)
|
|
aType = type(aValue).__name__
|
|
if aType != aTypeCheck:
|
|
print(f"ERRR:ProcessArgs:{iArg}:{cArg}:expected type [{aTypeCheck}] got type [{aType}]")
|
|
exit(102)
|
|
gMe[cArg] = aValue
|
|
iArg += 2
|
|
if cArg == '--config':
|
|
load_config()
|
|
except KeyError:
|
|
print(f"ERRR:ProcessArgs:{iArg}:{cArg}:UnknownCommand")
|
|
exit(103)
|
|
print(gMe)
|
|
for k in gConfigNeeded:
|
|
if gMe.get(k) == None:
|
|
print(f"ERRR:ProcessArgs:{k}:missing, did you forget to pass the config file...")
|
|
exit(104)
|
|
mDebug.setup(gMe['--debug'])
|
|
uv.validator_setup(gMe['--allowed.schemes'], gMe['--allowed.domains'])
|
|
|
|
|
|
|
|
def run():
|
|
try:
|
|
gMe['serverAddr'] = ('', gMe['--port'])
|
|
gMe['server'] = http.server.HTTPServer(gMe['serverAddr'], ProxyHandler)
|
|
print(f"INFO:Run:Starting on {gMe['serverAddr']}")
|
|
gMe['server'].serve_forever()
|
|
except KeyboardInterrupt:
|
|
print("INFO:Run:Shuting down...")
|
|
if (gMe['server']):
|
|
gMe['server'].server_close()
|
|
sys.exit(0)
|
|
except Exception as exc:
|
|
print(f"ERRR:Run:Exiting:Exception:{exc}")
|
|
if (gMe['server']):
|
|
gMe['server'].server_close()
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
process_args(sys.argv)
|
|
run()
|