SimpleChatTC:SimpleProxy: Use urlvalidator

Add an --allowed.schemes config entry as a required config option.

Setup the url validator.

Use this validator for the urltext, urlraw and pdf2text handlers.

This allows the user to control whether local file access is enabled.
By default, local file access is allowed in the sample
simpleproxy.json config file.
This commit is contained in:
hanishkvc 2025-11-02 14:15:07 +05:30
parent 6cab95657f
commit c597572e10
3 changed files with 31 additions and 39 deletions

View File

@ -1,4 +1,9 @@
{ {
"allowed.schemes": [
"file",
"http",
"https"
],
"allowed.domains": [ "allowed.domains": [
".*\\.wikipedia\\.org$", ".*\\.wikipedia\\.org$",
".*\\.bing\\.com$", ".*\\.bing\\.com$",

View File

@ -26,6 +26,7 @@ from dataclasses import dataclass
import html.parser import html.parser
import re import re
import time import time
import urlvalidator as uv
gMe = { gMe = {
@ -40,11 +41,12 @@ gConfigType = {
'--port': 'int', '--port': 'int',
'--config': 'str', '--config': 'str',
'--debug': 'bool', '--debug': 'bool',
'--allowed.schemes': 'list',
'--allowed.domains': 'list', '--allowed.domains': 'list',
'--bearer.insecure': 'str' '--bearer.insecure': 'str'
} }
gConfigNeeded = [ '--allowed.domains', '--bearer.insecure' ] gConfigNeeded = [ '--allowed.schemes', '--allowed.domains', '--bearer.insecure' ]
gAllowedCalls = [ "urltext", "urlraw", "pdf2text" ] gAllowedCalls = [ "urltext", "urlraw", "pdf2text" ]
@ -195,27 +197,6 @@ def debug_dump(meta: dict, data: dict):
f.write(f"\n\n\n\n{k}:{data[k]}\n\n\n\n") f.write(f"\n\n\n\n{k}:{data[k]}\n\n\n\n")
def validate_url(url: str, tag: str):
"""
Implement a re based filter logic on the specified url.
"""
tag=f"VU:{tag}"
if (not gMe.get('--allowed.domains')):
return UrlReqResp(False, 400, f"DBUG:{tag}:MissingAllowedDomains")
urlParts = urllib.parse.urlparse(url)
print(f"DBUG:ValidateUrl:{urlParts}, {urlParts.hostname}")
urlHName = urlParts.hostname
if not urlHName:
return UrlReqResp(False, 400, f"WARN:{tag}:Missing hostname in Url")
bMatched = False
for filter in gMe['--allowed.domains']:
if re.match(filter, urlHName):
bMatched = True
if not bMatched:
return UrlReqResp(False, 400, f"WARN:{tag}:requested hostname not allowed")
return UrlReqResp(True, 200)
def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str): def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str):
""" """
Common part of the url request handling used by both urlraw and urltext. Common part of the url request handling used by both urlraw and urltext.
@ -234,11 +215,9 @@ def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str):
url = queryParams['url'] url = queryParams['url']
print(f"DBUG:{tag}:Url:{url}") print(f"DBUG:{tag}:Url:{url}")
url = url[0] url = url[0]
if (not url) or (len(url) == 0): gotVU = uv.validate_url(url, tag)
return UrlReqResp(False, 400, f"WARN:{tag}:MissingUrl")
gotVU = validate_url(url, tag)
if not gotVU.callOk: if not gotVU.callOk:
return gotVU return UrlReqResp(gotVU.callOk, gotVU.statusCode, gotVU.statusMsg)
try: try:
hUA = ph.headers.get('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0') hUA = ph.headers.get('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0')
hAL = ph.headers.get('Accept-Language', "en-US,en;q=0.9") hAL = ph.headers.get('Accept-Language', "en-US,en;q=0.9")
@ -381,10 +360,11 @@ def handle_urltext(ph: ProxyHandler, pr: urllib.parse.ParseResult):
def process_pdf2text(url: str, startPN: int, endPN: int): def process_pdf2text(url: str, startPN: int, endPN: int):
import pypdf import pypdf
import io import io
urlParts = url.split('://',1) gotVU = uv.validate_url(url, "HandlePdf2Text")
if not (urlParts[0] in gAllowedPdfUrlTypes): if not gotVU.callOk:
return { 'status': 403, 'msg': f"WARN:HandlePdf2Text:ForbiddedUrlType:{urlParts[0]}:AllowedUrlTypes:{gAllowedPdfUrlTypes}" } return { 'status': gotVU.statusCode, 'msg': gotVU.statusMsg }
fPdf = open(urlParts[1], 'rb') urlParts = urllib.parse.urlparse(url)
fPdf = open(urlParts.path, 'rb')
dPdf = fPdf.read() dPdf = fPdf.read()
tPdf = "" tPdf = ""
oPdf = pypdf.PdfReader(io.BytesIO(dPdf)) oPdf = pypdf.PdfReader(io.BytesIO(dPdf))
@ -398,20 +378,13 @@ def process_pdf2text(url: str, startPN: int, endPN: int):
return { 'status': 200, 'msg': "Pdf2Text Response follows", 'data': tPdf } return { 'status': 200, 'msg': "Pdf2Text Response follows", 'data': tPdf }
gAllowedPdfUrlTypes = [ "file", "http", "https" ]
def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult): def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult):
""" """
Handle requests to pdf2text path, which is used to extract plain text Handle requests to pdf2text path, which is used to extract plain text
from the specified pdf file. from the specified pdf file.
""" """
queryParams = urllib.parse.parse_qs(pr.query) queryParams = urllib.parse.parse_qs(pr.query)
url = queryParams['url'] url = queryParams['url'][0]
print(f"DBUG:HandlePdf2Text:Url:{url}")
url = url[0]
if (not url) or (len(url) == 0):
ph.send_error(400, f"WARN:HandlePdf2Text:MissingUrl!")
return
startP = queryParams['startPageNumber'][0] startP = queryParams['startPageNumber'][0]
if startP: if startP:
startP = int(startP) startP = int(startP)
@ -422,7 +395,7 @@ def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult):
endP = int(endP) endP = int(endP)
else: else:
endP = -1 endP = -1
print(f"INFO:HandlePdf2Text:Processing:{url}...") print(f"INFO:HandlePdf2Text:Processing:{url}:{startP}:{endP}...")
gotP2T = process_pdf2text(url, startP, endP) gotP2T = process_pdf2text(url, startP, endP)
if (gotP2T['status'] != 200): if (gotP2T['status'] != 200):
ph.send_error(gotP2T['status'], gotP2T['msg'] ) ph.send_error(gotP2T['status'], gotP2T['msg'] )
@ -509,6 +482,7 @@ def process_args(args: list[str]):
if gMe.get(k) == None: if gMe.get(k) == None:
print(f"ERRR:ProcessArgs:{k}:missing, did you forget to pass the config file...") print(f"ERRR:ProcessArgs:{k}:missing, did you forget to pass the config file...")
exit(104) exit(104)
uv.validator_setup(gMe['--allowed.schemes'], gMe['--allowed.domains'])
def run(): def run():

View File

@ -10,6 +10,12 @@ gMe = {
} }
def validator_setup(allowedSchemes: list[str], allowedDomains: list[str]):
global gMe
gMe['--allowed.schemes'] = allowedSchemes
gMe['--allowed.domains'] = allowedDomains
@dataclass(frozen=True) @dataclass(frozen=True)
class UrlVResponse: class UrlVResponse:
""" """
@ -21,6 +27,9 @@ class UrlVResponse:
def validator_ok(tag: str): def validator_ok(tag: str):
"""
Cross check validator is setup as needed
"""
if (not gMe.get('--allowed.domains')): if (not gMe.get('--allowed.domains')):
return UrlVResponse(False, 400, f"DBUG:{tag}:MissingAllowedDomains") return UrlVResponse(False, 400, f"DBUG:{tag}:MissingAllowedDomains")
if (not gMe.get('--allowed.schemes')): if (not gMe.get('--allowed.schemes')):
@ -29,6 +38,8 @@ def validator_ok(tag: str):
def validate_fileurl(urlParts: urllib.parse.ParseResult, tag: str): def validate_fileurl(urlParts: urllib.parse.ParseResult, tag: str):
if urlParts.netloc != '':
return UrlVResponse(False, 400, f"WARN:{tag}:Malformed file url")
return UrlVResponse(True, 100) return UrlVResponse(True, 100)
@ -54,6 +65,8 @@ def validate_url(url: str, tag: str):
vok = validator_ok(tag) vok = validator_ok(tag)
if (not vok.callOk): if (not vok.callOk):
return vok return vok
if (not url):
return UrlVResponse(False, 400, f"WARN:{tag}:Missing url")
urlParts = urllib.parse.urlparse(url) urlParts = urllib.parse.urlparse(url)
print(f"DBUG:{tag}:{urlParts}, {urlParts.hostname}") print(f"DBUG:{tag}:{urlParts}, {urlParts.hostname}")
# Cross check scheme # Cross check scheme