From c597572e109c24b1810d6a6cbedfb43546c30b33 Mon Sep 17 00:00:00 2001 From: hanishkvc Date: Sun, 2 Nov 2025 14:15:07 +0530 Subject: [PATCH] SimpleChatTC:SimpleProxy: Use urlvalidator Add --allowed.schemes config entry as a needed config. Setup the url validator. Use this wrt urltext, urlraw and pdf2text This allows user to control whether local file access is enabled or not. By default in the sample simpleproxy.json config file local file access is allowed. --- .../local.tools/simpleproxy.json | 5 ++ .../local.tools/simpleproxy.py | 52 +++++-------------- .../local.tools/urlvalidator.py | 13 +++++ 3 files changed, 31 insertions(+), 39 deletions(-) diff --git a/tools/server/public_simplechat/local.tools/simpleproxy.json b/tools/server/public_simplechat/local.tools/simpleproxy.json index 1bae207341..72f7f81cf3 100644 --- a/tools/server/public_simplechat/local.tools/simpleproxy.json +++ b/tools/server/public_simplechat/local.tools/simpleproxy.json @@ -1,4 +1,9 @@ { + "allowed.schemes": [ + "file", + "http", + "https" + ], "allowed.domains": [ ".*\\.wikipedia\\.org$", ".*\\.bing\\.com$", diff --git a/tools/server/public_simplechat/local.tools/simpleproxy.py b/tools/server/public_simplechat/local.tools/simpleproxy.py index 03b9a330eb..3b2247cbbd 100644 --- a/tools/server/public_simplechat/local.tools/simpleproxy.py +++ b/tools/server/public_simplechat/local.tools/simpleproxy.py @@ -26,6 +26,7 @@ from dataclasses import dataclass import html.parser import re import time +import urlvalidator as uv gMe = { @@ -40,11 +41,12 @@ gConfigType = { '--port': 'int', '--config': 'str', '--debug': 'bool', + '--allowed.schemes': 'list', '--allowed.domains': 'list', '--bearer.insecure': 'str' } -gConfigNeeded = [ '--allowed.domains', '--bearer.insecure' ] +gConfigNeeded = [ '--allowed.schemes', '--allowed.domains', '--bearer.insecure' ] gAllowedCalls = [ "urltext", "urlraw", "pdf2text" ] @@ -195,27 +197,6 @@ def debug_dump(meta: dict, data: dict): f.write(f"\n\n\n\n{k}:{data[k]}\n\n\n\n") -def validate_url(url: str, tag: str): - """ - Implement a re based filter logic on the specified url. - """ - tag=f"VU:{tag}" - if (not gMe.get('--allowed.domains')): - return UrlReqResp(False, 400, f"DBUG:{tag}:MissingAllowedDomains") - urlParts = urllib.parse.urlparse(url) - print(f"DBUG:ValidateUrl:{urlParts}, {urlParts.hostname}") - urlHName = urlParts.hostname - if not urlHName: - return UrlReqResp(False, 400, f"WARN:{tag}:Missing hostname in Url") - bMatched = False - for filter in gMe['--allowed.domains']: - if re.match(filter, urlHName): - bMatched = True - if not bMatched: - return UrlReqResp(False, 400, f"WARN:{tag}:requested hostname not allowed") - return UrlReqResp(True, 200) - - def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str): """ Common part of the url request handling used by both urlraw and urltext. @@ -234,11 +215,9 @@ def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str): url = queryParams['url'] print(f"DBUG:{tag}:Url:{url}") url = url[0] - if (not url) or (len(url) == 0): - return UrlReqResp(False, 400, f"WARN:{tag}:MissingUrl") - gotVU = validate_url(url, tag) + gotVU = uv.validate_url(url, tag) if not gotVU.callOk: - return gotVU + return UrlReqResp(gotVU.callOk, gotVU.statusCode, gotVU.statusMsg) try: hUA = ph.headers.get('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0') hAL = ph.headers.get('Accept-Language', "en-US,en;q=0.9") @@ -381,10 +360,11 @@ def handle_urltext(ph: ProxyHandler, pr: urllib.parse.ParseResult): def process_pdf2text(url: str, startPN: int, endPN: int): import pypdf import io - urlParts = url.split('://',1) - if not (urlParts[0] in gAllowedPdfUrlTypes): - return { 'status': 403, 'msg': f"WARN:HandlePdf2Text:ForbiddedUrlType:{urlParts[0]}:AllowedUrlTypes:{gAllowedPdfUrlTypes}" } - fPdf = open(urlParts[1], 'rb') + gotVU = uv.validate_url(url, "HandlePdf2Text") + if not gotVU.callOk: + return { 'status': gotVU.statusCode, 'msg': gotVU.statusMsg } + urlParts = urllib.parse.urlparse(url) + fPdf = open(urlParts.path, 'rb') dPdf = fPdf.read() tPdf = "" oPdf = pypdf.PdfReader(io.BytesIO(dPdf)) @@ -398,20 +378,13 @@ def process_pdf2text(url: str, startPN: int, endPN: int): return { 'status': 200, 'msg': "Pdf2Text Response follows", 'data': tPdf } -gAllowedPdfUrlTypes = [ "file", "http", "https" ] - def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult): """ Handle requests to pdf2text path, which is used to extract plain text from the specified pdf file. """ queryParams = urllib.parse.parse_qs(pr.query) - url = queryParams['url'] - print(f"DBUG:HandlePdf2Text:Url:{url}") - url = url[0] - if (not url) or (len(url) == 0): - ph.send_error(400, f"WARN:HandlePdf2Text:MissingUrl!") - return + url = queryParams['url'][0] startP = queryParams['startPageNumber'][0] if startP: startP = int(startP) @@ -422,7 +395,7 @@ def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult): endP = int(endP) else: endP = -1 - print(f"INFO:HandlePdf2Text:Processing:{url}...") + print(f"INFO:HandlePdf2Text:Processing:{url}:{startP}:{endP}...") gotP2T = process_pdf2text(url, startP, endP) if (gotP2T['status'] != 200): ph.send_error(gotP2T['status'], gotP2T['msg'] ) @@ -509,6 +482,7 @@ def process_args(args: list[str]): if gMe.get(k) == None: print(f"ERRR:ProcessArgs:{k}:missing, did you forget to pass the config file...") exit(104) + uv.validator_setup(gMe['--allowed.schemes'], gMe['--allowed.domains']) def run(): diff --git a/tools/server/public_simplechat/local.tools/urlvalidator.py b/tools/server/public_simplechat/local.tools/urlvalidator.py index 59f796d430..e3fb6b1b32 100644 --- a/tools/server/public_simplechat/local.tools/urlvalidator.py +++ b/tools/server/public_simplechat/local.tools/urlvalidator.py @@ -10,6 +10,12 @@ gMe = { } +def validator_setup(allowedSchemes: list[str], allowedDomains: list[str]): + global gMe + gMe['--allowed.schemes'] = allowedSchemes + gMe['--allowed.domains'] = allowedDomains + + @dataclass(frozen=True) class UrlVResponse: """ @@ -21,6 +27,9 @@ class UrlVResponse: def validator_ok(tag: str): + """ + Cross check validator is setup as needed + """ if (not gMe.get('--allowed.domains')): return UrlVResponse(False, 400, f"DBUG:{tag}:MissingAllowedDomains") if (not gMe.get('--allowed.schemes')): @@ -29,6 +38,8 @@ def validator_ok(tag: str): def validate_fileurl(urlParts: urllib.parse.ParseResult, tag: str): + if urlParts.netloc != '': + return UrlVResponse(False, 400, f"WARN:{tag}:Malformed file url") return UrlVResponse(True, 100) @@ -54,6 +65,8 @@ def validate_url(url: str, tag: str): vok = validator_ok(tag) if (not vok.callOk): return vok + if (not url): + return UrlVResponse(False, 400, f"WARN:{tag}:Missing url") urlParts = urllib.parse.urlparse(url) print(f"DBUG:{tag}:{urlParts}, {urlParts.hostname}") # Cross check scheme