From 494d063657207eb09377f10cd2c9edde8fdac992 Mon Sep 17 00:00:00 2001 From: hanishkvc Date: Sun, 2 Nov 2025 20:13:10 +0530 Subject: [PATCH] SimpleChatTC:SimpleProxy: getting local / web file module ++ Added logic to help get a file from either the local file system or from the web, based on the url specified. Update pdfmagic module to use the same, so that it can support both local as well as web based pdf. Bring in the debug module, which I had forgotten to commit, after moving debug helper code from simpleproxy.py to the debug module --- .../public_simplechat/local.tools/debug.py | 24 ++++++ .../local.tools/filemagic.py | 79 +++++++++++++++++++ .../public_simplechat/local.tools/pdfmagic.py | 9 ++- .../local.tools/simpleproxy.py | 3 + 4 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 tools/server/public_simplechat/local.tools/debug.py create mode 100644 tools/server/public_simplechat/local.tools/filemagic.py diff --git a/tools/server/public_simplechat/local.tools/debug.py b/tools/server/public_simplechat/local.tools/debug.py new file mode 100644 index 0000000000..bf42be631c --- /dev/null +++ b/tools/server/public_simplechat/local.tools/debug.py @@ -0,0 +1,24 @@ +# Helpers for debugging +# by Humans for All + + +import time + +gMe = { '--debug' : False } + + +def setup(bEnable): + global gMe + gMe['--debug'] = bEnable + + +def dump(meta: dict, data: dict): + if not gMe['--debug']: + return + timeTag = f"{time.time():0.12f}" + with open(f"/tmp/simpleproxy.{timeTag}.meta", '+w') as f: + for k in meta: + f.write(f"\n\n\n\n{k}:{meta[k]}\n\n\n\n") + with open(f"/tmp/simpleproxy.{timeTag}.data", '+w') as f: + for k in data: + f.write(f"\n\n\n\n{k}:{data[k]}\n\n\n\n") diff --git a/tools/server/public_simplechat/local.tools/filemagic.py b/tools/server/public_simplechat/local.tools/filemagic.py new file mode 100644 index 0000000000..dfadb08095 --- /dev/null +++ b/tools/server/public_simplechat/local.tools/filemagic.py @@ -0,0 +1,79 @@ +# Handle file related helpers, be it a local file or one on the internet +# by Humans for All + +import urllib.request +import urllib.parse +import debug +from dataclasses import dataclass + + +@dataclass(frozen=True) +class Response: + """ + Used to return result wrt urlreq helper below. + """ + callOk: bool + statusCode: int + statusMsg: str = "" + contentType: str = "" + contentData: bytes = b"" + + + +def get_from_web(url: str, tag: str, inContentType: str, inHeaders: dict[str, str]): + """ + Get the url specified from web. + + If passed header doesnt contain certain useful http header entries, + some predefined defaults will be used in place. + """ + try: + hUA = inHeaders.get('User-Agent', None) + if not hUA: + hUA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:132.0) Gecko/20100101 Firefox/132.0' + hAL = inHeaders.get('Accept-Language', None) + if not hAL: + hAL = "en-US,en;q=0.9" + hA = inHeaders.get('Accept', None) + if not hA: + hA = "text/html,*/*" + headers = { + 'User-Agent': hUA, + 'Accept': hA, + 'Accept-Language': hAL + } + req = urllib.request.Request(url, headers=headers) + # Get requested url + print(f"DBUG:{tag}:Req:{req.full_url}:{req.headers}") + with urllib.request.urlopen(req, timeout=10) as response: + contentData = response.read() + statusCode = response.status or 200 + contentType = response.getheader('Content-Type') or inContentType + debug.dump({ 'url': req.full_url, 'headers': req.headers, 'ctype': contentType }, { 'cdata': contentData }) + return Response(True, statusCode, "", contentType, contentData) + except Exception as exc: + return Response(False, 502, f"WARN:{tag}:Failed:{exc}") + + +def get_from_local(urlParts: urllib.parse.ParseResult, tag: str, inContentType: str): + """ + Get the requested file from the local filesystem + """ + try: + fPdf = open(urlParts.path, 'rb') + dPdf = fPdf.read() + return Response(True, 200, "", inContentType, dPdf) + except Exception as exc: + return Response(False, 502, f"WARN:{tag}:Failed:{exc}") + + +def get_file(url: str, tag: str, inContentType: str, inHeaders: dict[str, str]={}): + """ + Based on the scheme specified in the passed url, + either get from local file system or from the web. + """ + urlParts = urllib.parse.urlparse(url) + if urlParts.scheme == "file": + return get_from_local(urlParts, tag, inContentType) + else: + return get_from_web(url, tag, inContentType, inHeaders) diff --git a/tools/server/public_simplechat/local.tools/pdfmagic.py b/tools/server/public_simplechat/local.tools/pdfmagic.py index d89496e366..336a61250a 100644 --- a/tools/server/public_simplechat/local.tools/pdfmagic.py +++ b/tools/server/public_simplechat/local.tools/pdfmagic.py @@ -3,6 +3,7 @@ import urllib.parse import urlvalidator as uv +import filemagic as mFile from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -25,11 +26,11 @@ def process_pdf2text(url: str, startPN: int, endPN: int): gotVU = uv.validate_url(url, "HandlePdf2Text") if not gotVU.callOk: return { 'status': gotVU.statusCode, 'msg': gotVU.statusMsg } - urlParts = urllib.parse.urlparse(url) - fPdf = open(urlParts.path, 'rb') - dPdf = fPdf.read() + gotFile = mFile.get_file(url, "ProcessPdf2Text", "application/pdf", {}) + if not gotFile.callOk: + return { 'status': gotFile.statusCode, 'msg': gotFile.statusMsg, 'data': gotFile.contentData} tPdf = "" - oPdf = pypdf.PdfReader(io.BytesIO(dPdf)) + oPdf = pypdf.PdfReader(io.BytesIO(gotFile.contentData)) if (startPN <= 0): startPN = 1 if (endPN <= 0) or (endPN > len(oPdf.pages)): diff --git a/tools/server/public_simplechat/local.tools/simpleproxy.py b/tools/server/public_simplechat/local.tools/simpleproxy.py index eda88750d0..4a74c6a254 100644 --- a/tools/server/public_simplechat/local.tools/simpleproxy.py +++ b/tools/server/public_simplechat/local.tools/simpleproxy.py @@ -26,6 +26,7 @@ import urlvalidator as uv from typing import Callable import pdfmagic as mPdf import webmagic as mWeb +import debug as mDebug gMe = { @@ -245,9 +246,11 @@ def process_args(args: list[str]): if gMe.get(k) == None: print(f"ERRR:ProcessArgs:{k}:missing, did you forget to pass the config file...") exit(104) + mDebug.setup(gMe['--debug']) uv.validator_setup(gMe['--allowed.schemes'], gMe['--allowed.domains']) + def run(): try: gMe['serverAddr'] = ('', gMe['--port'])