Source code for mcp_server_webcrawl.utils.blobs

import os
import aiohttp
import asyncio
import base64
import concurrent.futures
import hashlib
import io
import re
import threading
import traceback

from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import ParseResult, urlparse
from PIL import Image

from mcp_server_webcrawl.settings import DATA_DIRECTORY
from mcp_server_webcrawl.utils.logger import get_logger

HTTP_THREADS: int = 8
ALLOWED_THUMBNAIL_TYPES = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp"}
MAX_THUMBNAIL_BYTES = 2 * 1024 * 1024  # 2MB cap

logger = get_logger()

class ThumbnailManager:
    """
    Manages thumbnail generation and caching for image files and URLs.
    """

    def __init__(self):
        DATA_DIRECTORY.mkdir(parents=True, exist_ok=True)
        assert DATA_DIRECTORY.is_dir(), f"DATA_DIRECTORY {DATA_DIRECTORY} is not a directory"
        self.__temp_directory: Path = DATA_DIRECTORY / "thumb"
        if not self.__temp_directory.is_dir():
            self.__temp_directory.mkdir(parents=True, exist_ok=True)
            os.chmod(self.__temp_directory, 0o700)
    def __md5(self, path: str) -> str:
        return hashlib.md5(path.encode()).hexdigest()

    def __is_valid_url(self, path: str) -> tuple[bool, ParseResult | None]:
        try:
            result = urlparse(path)
            return all([result.scheme, result.netloc]), result
        except Exception:
            return False, None

    def __is_valid_file(self, path: str) -> bool:
        return Path(path).is_file()

    def __get_temp_file(self, key: str) -> Path:
        return self.__temp_directory / f"{key}.webp"

    def __get_extension(self, path: str) -> str | None:
        ext = Path(path).suffix.lower()
        if ext:
            return ext
        # try to parse extension from the path
        is_valid, parsed = self.__is_valid_url(path)
        if is_valid:
            path_parts = parsed.path.split("/")
            if path_parts:
                last_part = path_parts[-1]
                if "." in last_part:
                    return "." + last_part.split(".")[-1].lower()
        return None

    def __is_allowed_type(self, path: str) -> bool:
        ext = self.__get_extension(path)
        return ext in ALLOWED_THUMBNAIL_TYPES if ext else False

    def __clean_thumbs_directory(self):
        try:
            # cached thumbnails are named "<md5>.webp"; match on the stem
            md5_pattern: re.Pattern = re.compile(r"^[0-9a-f]{32}$")
            cutoff_time: datetime = datetime.now() - timedelta(hours=4)
            deleted_count: int = 0
            for file_path in self.__temp_directory.glob("*"):
                if not file_path.is_file():
                    continue
                if not md5_pattern.match(file_path.stem):
                    continue
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                if file_mtime < cutoff_time:
                    file_path.unlink()
                    deleted_count += 1
            logger.info(f"Temporary file cleanup complete: {deleted_count} files deleted")
        except Exception as ex:
            logger.error(
                f"Error during temporary file cleanup: {str(ex)}\n{traceback.format_exc()}"
            )

    def __check_content_length(self, headers) -> bool:
        """Helper to check if content length is acceptable"""
        if "Content-Length" in headers:
            content_length = int(headers["Content-Length"])
            if content_length > MAX_THUMBNAIL_BYTES:
                logger.info(
                    f"Skipping large file ({content_length} bytes > "
                    f"{MAX_THUMBNAIL_BYTES} bytes)"
                )
                return False
        return True

    async def __fetch_url(
        self,
        session: aiohttp.ClientSession,
        url: str,
        key: str
    ) -> str | None:
        temp_file = self.__get_temp_file(key)
        try:
            # check HEAD to get Content-Length without downloading
            async with session.head(url, timeout=1, allow_redirects=True) as head_response:
                if head_response.status == 200 and not self.__check_content_length(head_response.headers):
                    return None

            async with session.get(url, timeout=2) as response:
                if response.status != 200:
                    return None
                if not self.__check_content_length(response.headers):
                    return None

                # stream the content with a size limit
                content = bytearray()
                chunk_size = 8192  # 8KB chunks
                total_size = 0
                async for chunk in response.content.iter_chunked(chunk_size):
                    total_size += len(chunk)
                    if total_size > MAX_THUMBNAIL_BYTES:
                        logger.info(
                            f"Download exceeded size limit of {MAX_THUMBNAIL_BYTES} bytes "
                            f"while streaming"
                        )
                        return None
                    content.extend(chunk)

                return self.__process_image_data(bytes(content), temp_file)
        except (aiohttp.ClientError, asyncio.TimeoutError) as ex:
            # http is the wild west, keep chugging
            logger.debug(f"HTTP error: {str(ex)}")
            return None

    def __process_image_data(self, data: bytes, temp_file: Path) -> str | None:
        """Process image data, save to temp file, and return base64 encoding"""
        thumbnail = self.__create_webp_thumbnail(data)
        if thumbnail is not None:
            temp_file.write_bytes(thumbnail)
            return base64.b64encode(thumbnail).decode("utf-8")
        return None

    async def __get_file(self, path: str, key: str) -> str | None:
        try:
            file_path = Path(path)
            content = file_path.read_bytes()
            temp_file = self.__get_temp_file(key)
            return self.__process_image_data(content, temp_file)
        except Exception as ex:
            logger.debug(f"File error: {str(ex)}")
            return None

    async def __process_path(
        self,
        session: aiohttp.ClientSession,
        path: str,
        results: dict[str, str | None],
        metrics: dict[str, int]
    ) -> None:
        key: str = self.__md5(path)
        temp_file: Path = self.__get_temp_file(key)
        is_valid_url, _ = self.__is_valid_url(path)
        valid_file: bool = self.__is_valid_file(path)
        if not (is_valid_url or valid_file) or not self.__is_allowed_type(path):
            return

        # cache hit
        if temp_file.exists():
            content: bytes = temp_file.read_bytes()
            results[path] = base64.b64encode(content).decode("utf-8")
            metrics["total_cached"] += 1
            return

        result: str | None = (
            await self.__fetch_url(session, path, key)
            if is_valid_url
            else await self.__get_file(path, key)
        )
        results[path] = result
        if result is None:
            metrics["total_errors"] += 1
        else:
            metrics["total_returned"] += 1

    async def __get_blobs_async(self, paths: list[str]) -> dict[str, str | None]:
        results = {path: None for path in paths}
        metrics = {
            "total_requested": len(paths),
            "total_returned": 0,
            "total_errors": 0,
            "total_cached": 0
        }
        async with aiohttp.ClientSession() as session:
            # Process tasks in batches of HTTP_THREADS
            for i in range(0, len(paths), HTTP_THREADS):
                batch_paths = paths[i:i + HTTP_THREADS]
                batch_tasks = [
                    self.__process_path(session, path, results, metrics)
                    for path in batch_paths
                ]
                await asyncio.gather(*batch_tasks)

        logger.info(
            f"Found {metrics['total_requested']}, fetched {metrics['total_returned']} "
            f"({metrics['total_errors']} errors, {metrics['total_cached']} cached)"
        )
        return results

    def __create_webp_thumbnail(self, image_data: bytes, size: int = 512) -> bytes | None:
        img = None
        try:
            img = Image.open(io.BytesIO(image_data))
            width, height = img.size
            max_dimension = max(width, height)
            if max_dimension > size:
                if width > height:
                    new_width = size
                    new_height = int(height * (new_width / width))
                else:
                    new_height = size
                    new_width = int(width * (new_height / height))
                img = img.resize((new_width, new_height), Image.LANCZOS)
            output = io.BytesIO()
            img.save(
                output,
                format="WEBP",
                quality=20,
                optimize=True,
                method=6  # highest compression
            )
            return output.getvalue()
        except Exception as ex:
            logger.error(f"Error creating WebP thumbnail: {str(ex)}\n{traceback.format_exc()}")
            return None
        finally:
            if img is not None:
                img.close()
    def get_thumbnails(self, paths: list[str]) -> dict[str, str | None]:
        """
        Convert URLs or file paths to base64 encoded strings.

        Args:
            paths: List of URLs or file paths to convert

        Returns:
            Dictionary mapping paths to their base64 representation or None if failed
        """
        assert paths is not None, "paths must be a list[str]"

        def run_in_thread():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(self.__get_blobs_async(paths))
            finally:
                loop.close()

        try:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(run_in_thread)
                results = future.result(timeout=5)

            # start cleanup in a background thread
            cleanup_thread = threading.Thread(target=self.__clean_thumbs_directory)
            cleanup_thread.daemon = True
            cleanup_thread.start()

            return results
        except Exception as ex:
            logger.error(f"Error fetching thumbnails: {ex}\n{traceback.format_exc()}")
            return {path: None for path in paths}
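
# --- Illustrative usage sketch (not part of the module above) ---
# A minimal example of how ThumbnailManager.get_thumbnails might be called.
# The image paths below are hypothetical placeholders; each entry in the
# returned dict maps the original path to a base64-encoded WebP thumbnail,
# or None if fetching or decoding failed.
if __name__ == "__main__":
    manager = ThumbnailManager()
    thumbnails = manager.get_thumbnails([
        "https://example.com/images/logo.png",  # fetched over HTTP, size-capped
        "/tmp/example/photo.jpg",               # read from the local filesystem
    ])
    for path, encoded in thumbnails.items():
        status = f"{len(encoded)} base64 chars" if encoded else "failed"
        print(f"{path}: {status}")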