Source code for mcp_server_webcrawl.crawlers.wget.adapter

import os
import sqlite3
import traceback
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path

from mcp_server_webcrawl.crawlers.base.adapter import (
    BaseManager,
    IndexState,
    IndexStatus,
    SitesGroup,
    INDEXED_BATCH_SIZE,
    INDEXED_RESOURCE_DEFAULT_PROTOCOL,
    INDEXED_TYPE_MAPPING,
    INDEXED_IGNORE_DIRECTORIES,
)
from mcp_server_webcrawl.crawlers.base.indexed import IndexedManager
from mcp_server_webcrawl.models.resources import (
    ResourceResult,
    ResourceResultType,
    RESOURCES_LIMIT_DEFAULT,
)
from mcp_server_webcrawl.models.sites import (
    SiteResult,
)
from mcp_server_webcrawl.utils.logger import get_logger

logger = get_logger()


class WgetManager(IndexedManager):
    """
    Manages wget directory data in in-memory SQLite databases.
    Provides connection pooling and caching for efficient access.
    """
    def __init__(self) -> None:
        """Initialize the wget manager with empty cache and statistics."""
        super().__init__()
    def _load_site_data(self, connection: sqlite3.Connection, directory: Path, site_id: int,
            index_state: IndexState = None) -> None:
        """
        Load a wget directory into the database with parallel processing and batch SQL insertions.

        Args:
            connection: SQLite connection
            directory: path to the wget directory
            site_id: id for the site
            index_state: IndexState object for tracking progress
        """
        if not directory.exists() or not directory.is_dir():
            logger.error(f"Directory not found or not a directory: {directory}")
            return

        if index_state is not None:
            index_state.set_status(IndexStatus.INDEXING)

        # collect files to process
        file_paths = []
        for root, _, files in os.walk(directory):
            for filename in files:
                if filename == "robots.txt":
                    continue
                rel_path = Path(root).relative_to(directory)
                ignore_file = False
                for ignore_dir in INDEXED_IGNORE_DIRECTORIES:
                    if ignore_dir in str(rel_path):
                        ignore_file = True
                        break
                if not ignore_file:
                    file_paths.append(Path(root) / filename)

        # each crawler is a little different
        with closing(connection.cursor()) as cursor:
            for i in range(0, len(file_paths), INDEXED_BATCH_SIZE):
                if index_state is not None and index_state.is_timeout():
                    index_state.set_status(IndexStatus.PARTIAL)
                    return

                batch_file_paths: list[Path] = file_paths[i:i+INDEXED_BATCH_SIZE]
                batch_file_contents = BaseManager.read_files(batch_file_paths)
                batch_insert_resource_results: list[ResourceResult] = []
                for file_path, content in batch_file_contents.items():
                    try:
                        result: ResourceResult = self._prepare_wget_record(file_path, site_id, directory, content)
                        if result:
                            batch_insert_resource_results.append(result)
                            if index_state is not None:
                                index_state.increment_processed()
                    except Exception as e:
                        logger.error(f"Error processing file {file_path}: {e}\n{traceback.format_exc()}")

                self._execute_batch_insert(connection, cursor, batch_insert_resource_results)

        if index_state is not None and index_state.status == IndexStatus.INDEXING:
            index_state.set_status(IndexStatus.COMPLETE)

    def _prepare_wget_record(self, file_path: Path, site_id: int, base_dir: Path,
            content: str = None) -> ResourceResult | None:
        """
        Prepare a record for batch insertion from a wget file.

        Args:
            file_path: path to the wget file
            site_id: id for the site
            base_dir: base directory for the wget capture
            content: optional pre-loaded file content

        Returns:
            ResourceResult ready for insertion, or None if processing fails
        """
        try:
            relative_path = file_path.relative_to(base_dir)
            url = f"{INDEXED_RESOURCE_DEFAULT_PROTOCOL}{base_dir.name}/{str(relative_path).replace(os.sep, '/')}"
            decruftified_path = BaseManager.decruft_path(str(file_path))
            extension = Path(decruftified_path).suffix.lower()
            resource_type = INDEXED_TYPE_MAPPING.get(extension, ResourceResultType.OTHER)
            file_stat = file_path.stat()
            file_size = file_stat.st_size
            file_created = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc)
            file_modified = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc)

            # use pre-loaded content if available, otherwise rely on read_file_contents
            file_content = content
            if file_content is None:
                file_content = BaseManager.read_file_contents(file_path, resource_type)

            return ResourceResult(
                id=BaseManager.string_to_id(url),
                site=site_id,
                created=file_created,
                modified=file_modified,
                url=url,
                type=resource_type,
                status=200,
                headers=BaseManager.get_basic_headers(file_size, resource_type),
                content=file_content,
                size=file_size,
                time=0,
            )
        except Exception as e:
            logger.error(f"Error preparing record for file {file_path}: {e}")
            return None
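# module-level singleton manager, shared by get_sites and get_resources below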
manager: WgetManager = WgetManager()
def get_sites(
    datasrc: Path,
    ids: list[int] | None = None,
    fields: list[str] | None = None
) -> list[SiteResult]:
    """
    List site directories in the datasrc directory as sites.

    Args:
        datasrc: path to the directory containing site subdirectories
        ids: optional list of site IDs to filter by
        fields: optional list of fields to include in the response

    Returns:
        List of SiteResult objects, one for each site directory

    Notes:
        Returns an empty list if the datasrc directory doesn't exist.
    """
    return manager.get_sites_for_directories(datasrc, ids, fields)
def get_resources(
    datasrc: Path,
    sites: list[int] | None = None,
    query: str = "",
    fields: list[str] | None = None,
    sort: str | None = None,
    limit: int = RESOURCES_LIMIT_DEFAULT,
    offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
    """
    Get resources from wget directories using in-memory SQLite.

    Args:
        datasrc: path to the directory containing wget captures
        sites: optional list of site IDs to filter by
        query: search query string
        fields: optional list of fields to include in response
        sort: sort order for results
        limit: maximum number of results to return
        offset: number of results to skip for pagination

    Returns:
        Tuple of (list of ResourceResult objects, total count, IndexState)
    """
    sites_results: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
    assert sites_results, "At least one site is required to search"
    site_paths = [site.path for site in sites_results]
    sites_group = SitesGroup(datasrc, sites, site_paths)
    return manager.get_resources_for_sites_group(sites_group, query, fields, sort, limit, offset)
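# Example usage (illustrative sketch, not part of the module): given a datasrc
# directory containing one or more wget capture subdirectories, the module-level
# functions can be called directly. The path below is hypothetical, and it is
# assumed here that SiteResult exposes an ``id`` attribute, consistent with the
# ``ids`` filter accepted by get_sites.
#
#     from pathlib import Path
#     from mcp_server_webcrawl.crawlers.wget.adapter import get_sites, get_resources
#
#     datasrc = Path("/path/to/wget/captures")
#     sites = get_sites(datasrc)
#     resources, total, index_state = get_resources(
#         datasrc,
#         sites=[site.id for site in sites],
#         query="",
#         limit=20,
#     )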