diff --git a/tools/server/public_simplechat/local.tools/simpleproxy.json b/tools/server/public_simplechat/local.tools/simpleproxy.json index 1bae207341..72f7f81cf3 100644 --- a/tools/server/public_simplechat/local.tools/simpleproxy.json +++ b/tools/server/public_simplechat/local.tools/simpleproxy.json @@ -1,4 +1,9 @@ { + "allowed.schemes": [ + "file", + "http", + "https" + ], "allowed.domains": [ ".*\\.wikipedia\\.org$", ".*\\.bing\\.com$", diff --git a/tools/server/public_simplechat/local.tools/simpleproxy.py b/tools/server/public_simplechat/local.tools/simpleproxy.py index 03b9a330eb..3b2247cbbd 100644 --- a/tools/server/public_simplechat/local.tools/simpleproxy.py +++ b/tools/server/public_simplechat/local.tools/simpleproxy.py @@ -26,6 +26,7 @@ from dataclasses import dataclass import html.parser import re import time +import urlvalidator as uv gMe = { @@ -40,11 +41,12 @@ gConfigType = { '--port': 'int', '--config': 'str', '--debug': 'bool', + '--allowed.schemes': 'list', '--allowed.domains': 'list', '--bearer.insecure': 'str' } -gConfigNeeded = [ '--allowed.domains', '--bearer.insecure' ] +gConfigNeeded = [ '--allowed.schemes', '--allowed.domains', '--bearer.insecure' ] gAllowedCalls = [ "urltext", "urlraw", "pdf2text" ] @@ -195,27 +197,6 @@ def debug_dump(meta: dict, data: dict): f.write(f"\n\n\n\n{k}:{data[k]}\n\n\n\n") -def validate_url(url: str, tag: str): - """ - Implement a re based filter logic on the specified url. - """ - tag=f"VU:{tag}" - if (not gMe.get('--allowed.domains')): - return UrlReqResp(False, 400, f"DBUG:{tag}:MissingAllowedDomains") - urlParts = urllib.parse.urlparse(url) - print(f"DBUG:ValidateUrl:{urlParts}, {urlParts.hostname}") - urlHName = urlParts.hostname - if not urlHName: - return UrlReqResp(False, 400, f"WARN:{tag}:Missing hostname in Url") - bMatched = False - for filter in gMe['--allowed.domains']: - if re.match(filter, urlHName): - bMatched = True - if not bMatched: - return UrlReqResp(False, 400, f"WARN:{tag}:requested hostname not allowed") - return UrlReqResp(True, 200) - - def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str): """ Common part of the url request handling used by both urlraw and urltext. @@ -234,11 +215,9 @@ def handle_urlreq(ph: ProxyHandler, pr: urllib.parse.ParseResult, tag: str): url = queryParams['url'] print(f"DBUG:{tag}:Url:{url}") url = url[0] - if (not url) or (len(url) == 0): - return UrlReqResp(False, 400, f"WARN:{tag}:MissingUrl") - gotVU = validate_url(url, tag) + gotVU = uv.validate_url(url, tag) if not gotVU.callOk: - return gotVU + return UrlReqResp(gotVU.callOk, gotVU.statusCode, gotVU.statusMsg) try: hUA = ph.headers.get('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0') hAL = ph.headers.get('Accept-Language', "en-US,en;q=0.9") @@ -381,10 +360,11 @@ def handle_urltext(ph: ProxyHandler, pr: urllib.parse.ParseResult): def process_pdf2text(url: str, startPN: int, endPN: int): import pypdf import io - urlParts = url.split('://',1) - if not (urlParts[0] in gAllowedPdfUrlTypes): - return { 'status': 403, 'msg': f"WARN:HandlePdf2Text:ForbiddedUrlType:{urlParts[0]}:AllowedUrlTypes:{gAllowedPdfUrlTypes}" } - fPdf = open(urlParts[1], 'rb') + gotVU = uv.validate_url(url, "HandlePdf2Text") + if not gotVU.callOk: + return { 'status': gotVU.statusCode, 'msg': gotVU.statusMsg } + urlParts = urllib.parse.urlparse(url) + fPdf = open(urlParts.path, 'rb') dPdf = fPdf.read() tPdf = "" oPdf = pypdf.PdfReader(io.BytesIO(dPdf)) @@ -398,20 +378,13 @@ def process_pdf2text(url: str, startPN: int, endPN: int): return { 'status': 200, 'msg': "Pdf2Text Response follows", 'data': tPdf } -gAllowedPdfUrlTypes = [ "file", "http", "https" ] - def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult): """ Handle requests to pdf2text path, which is used to extract plain text from the specified pdf file. """ queryParams = urllib.parse.parse_qs(pr.query) - url = queryParams['url'] - print(f"DBUG:HandlePdf2Text:Url:{url}") - url = url[0] - if (not url) or (len(url) == 0): - ph.send_error(400, f"WARN:HandlePdf2Text:MissingUrl!") - return + url = queryParams['url'][0] startP = queryParams['startPageNumber'][0] if startP: startP = int(startP) @@ -422,7 +395,7 @@ def handle_pdf2text(ph: ProxyHandler, pr: urllib.parse.ParseResult): endP = int(endP) else: endP = -1 - print(f"INFO:HandlePdf2Text:Processing:{url}...") + print(f"INFO:HandlePdf2Text:Processing:{url}:{startP}:{endP}...") gotP2T = process_pdf2text(url, startP, endP) if (gotP2T['status'] != 200): ph.send_error(gotP2T['status'], gotP2T['msg'] ) @@ -509,6 +482,7 @@ def process_args(args: list[str]): if gMe.get(k) == None: print(f"ERRR:ProcessArgs:{k}:missing, did you forget to pass the config file...") exit(104) + uv.validator_setup(gMe['--allowed.schemes'], gMe['--allowed.domains']) def run(): diff --git a/tools/server/public_simplechat/local.tools/urlvalidator.py b/tools/server/public_simplechat/local.tools/urlvalidator.py index 59f796d430..e3fb6b1b32 100644 --- a/tools/server/public_simplechat/local.tools/urlvalidator.py +++ b/tools/server/public_simplechat/local.tools/urlvalidator.py @@ -10,6 +10,12 @@ gMe = { } +def validator_setup(allowedSchemes: list[str], allowedDomains: list[str]): + global gMe + gMe['--allowed.schemes'] = allowedSchemes + gMe['--allowed.domains'] = allowedDomains + + @dataclass(frozen=True) class UrlVResponse: """ @@ -21,6 +27,9 @@ class UrlVResponse: def validator_ok(tag: str): + """ + Cross check validator is setup as needed + """ if (not gMe.get('--allowed.domains')): return UrlVResponse(False, 400, f"DBUG:{tag}:MissingAllowedDomains") if (not gMe.get('--allowed.schemes')): @@ -29,6 +38,8 @@ def validator_ok(tag: str): def validate_fileurl(urlParts: urllib.parse.ParseResult, tag: str): + if urlParts.netloc != '': + return UrlVResponse(False, 400, f"WARN:{tag}:Malformed file url") return UrlVResponse(True, 100) @@ -54,6 +65,8 @@ def validate_url(url: str, tag: str): vok = validator_ok(tag) if (not vok.callOk): return vok + if (not url): + return UrlVResponse(False, 400, f"WARN:{tag}:Missing url") urlParts = urllib.parse.urlparse(url) print(f"DBUG:{tag}:{urlParts}, {urlParts.hostname}") # Cross check scheme