Source code for mcp_server_webcrawl.crawlers.interrobot.adapter

import re
import sqlite3
import traceback
from contextlib import closing
from logging import Logger
from pathlib import Path
from typing import Final

from mcp_server_webcrawl.crawlers.base.adapter import IndexState, IndexStatus, BaseManager, SitesGroup
from mcp_server_webcrawl.models.resources import (
    ResourceResult,
    RESOURCES_LIMIT_DEFAULT,
)

from mcp_server_webcrawl.models.sites import SiteResult
from mcp_server_webcrawl.utils import from_isoformat_zulu
from mcp_server_webcrawl.utils.logger import get_logger

# Maps public resource field names to qualified InterroBot column names in the
# ResourcesFullText and Resources tables.
# TODO: maybe dedupe with the near-matching RESOURCES version of this mapping.
# NOTE(review): not referenced by the functions visible in this module;
# presumably consumed by shared query-building code - confirm.
INTERROBOT_RESOURCE_FIELD_MAPPING: Final[dict[str, str]] = {
    "id": "ResourcesFullText.Id",
    "site": "ResourcesFullText.Project",
    "created": "Resources.Created",
    "modified": "Resources.Modified",
    "url": "ResourcesFullText.Url",
    "status": "ResourcesFullText.Status",
    "size": "Resources.Size",
    "type": "ResourcesFullText.Type",
    "headers": "ResourcesFullText.Headers",
    "content": "ResourcesFullText.Content",
    "time": "ResourcesFullText.Time"
}

# Site fields that are always part of a site result.
INTERROBOT_SITE_FIELD_REQUIRED: Final[set[str]] = {"id", "url"}

# Maps public site field names to qualified columns of the Projects table
# (aliased as "Project" in the query built by get_sites).
# Legitimately different from the default version: it adds the extra "robots" field.
INTERROBOT_SITE_FIELD_MAPPING: Final[dict[str, str]] = {
    "id": "Project.Id",
    "url": "Project.Url",
    "created": "Project.Created",
    "modified": "Project.Modified",
    "robots": "Project.RobotsText",
}

# Module-level logger used for database error reporting in this adapter.
logger: Logger = get_logger()

class InterroBotManager(BaseManager):
    """
    Manages SQLite connections to InterroBot databases.

    Connections are opened directly against the database file referenced by
    the sites group (``group.datasrc``); the index state handed back with each
    connection is marked as ``IndexStatus.REMOTE``.
    """

    def __init__(self) -> None:
        """Initialize the manager by delegating to the BaseManager constructor."""
        super().__init__()

    def get_connection(self, group: SitesGroup) -> tuple[sqlite3.Connection | None, IndexState]:
        """
        Open a database connection for the sites in the group.

        Args:
            group: Group of sites to connect to; its ``datasrc`` is the
                database passed to ``sqlite3.connect``

        Returns:
            Tuple of (SQLite connection, or None if SQLite could not open the
            database, IndexState associated with this database)

        Raises:
            FileNotFoundError, PermissionError: logged, then re-raised
            Exception: any other non-SQLite error is logged, then re-raised
        """
        index_state = IndexState()
        index_state.set_status(IndexStatus.REMOTE)

        try:
            # note, the caller is responsible for implementing closing() on the other side
            connection: sqlite3.Connection = sqlite3.connect(group.datasrc)
        except sqlite3.Error as ex:
            logger.error(f"SQLite error reading database: {ex}\n{traceback.format_exc()}")
            # This handler intentionally does not re-raise. Previously execution
            # fell through to the final return with `connection` unbound, which
            # surfaced as an UnboundLocalError; report "no connection" instead,
            # as the return type already permits.
            return None, index_state
        except (FileNotFoundError, PermissionError):
            logger.error(f"Database access error: {group.datasrc}\n{traceback.format_exc()}")
            raise
        except Exception as ex:
            logger.error(f"Unexpected error reading database {group.datasrc}: {ex}\n{traceback.format_exc()}")
            raise

        return connection, index_state
# Shared module-level manager instance; get_resources delegates its search to it.
manager: InterroBotManager = InterroBotManager()
def get_sites(datasrc: Path, ids=None, fields=None) -> list[SiteResult]:
    """
    Read sites (InterroBot projects) from the database.

    Args:
        datasrc: path to the database
        ids: optional list of site IDs to restrict the results to; any value
            that is not a non-empty list selects all sites
        fields: optional list of field names to select in addition to the
            always-selected "id" and "url"; names missing from
            INTERROBOT_SITE_FIELD_MAPPING are ignored. When not a non-empty
            list, "created" and "modified" are selected as well.

    Returns:
        List of SiteResult objects ordered by URL, or an empty list when the
        database could not be queried (the error is logged)
    """
    required_fields: list[str] = ["id", "url"]
    default_fields: list[str] = required_fields + ["created", "modified"]
    available_fields: list[str] = list(INTERROBOT_SITE_FIELD_MAPPING.keys())

    # site ids are bound as named parameters (:id0, :id1, ...), never interpolated
    params: dict[str, int | str] = {}
    where_clause: str = ""
    if ids and isinstance(ids, list):
        for index, site_id in enumerate(ids):
            params[f"id{index}"] = site_id
        placeholders = ",".join(f":{name}" for name in params)
        where_clause = f" WHERE Project.Id IN ({placeholders})"

    # field names are not parameterized; only columns looked up through
    # INTERROBOT_SITE_FIELD_MAPPING ever reach the SQL text
    selected_fields = set(required_fields)
    if fields and isinstance(fields, list):
        selected_fields.update(name for name in fields if name in available_fields)
    else:
        selected_fields.update(default_fields)

    columns = [INTERROBOT_SITE_FIELD_MAPPING[name] for name in selected_fields]
    assert all(re.match(r"^[A-Za-z\.]+$", column) for column in columns), "Unknown or unsafe field requested"

    statement: str = (
        f"SELECT {', '.join(columns)} FROM Projects AS Project{where_clause} ORDER BY Project.Url ASC"
    )

    rows: list[dict[str, int | str | None]] = []
    try:
        # the ValueError below is caught by the generic handler of this same
        # try block, so a rejected statement yields an empty result
        if not statement.strip().upper().startswith("SELECT"):
            logger.error("Unauthorized SQL statement")
            raise ValueError("Only SELECT queries are permitted")
        with closing(sqlite3.connect(datasrc)) as conn:
            conn.row_factory = sqlite3.Row
            with closing(conn.cursor()) as cursor:
                cursor.execute(statement, params or {})
                # lowercase the column names so rows can be read with plain keys
                rows = [
                    {key.lower(): record[key] for key in record.keys()}
                    for record in cursor.fetchall()
                ]
    except sqlite3.Error as ex:
        logger.error(f"SQLite error reading database: {ex}\n{traceback.format_exc()}")
        return []
    except Exception as ex:
        logger.error(f"Database error: {ex}")
        return []

    # columns that were not selected are simply absent from the row, so the
    # corresponding .get() calls yield None (or "" for url)
    return [
        SiteResult(
            path=datasrc,
            id=row.get("id"),
            url=row.get("url", ""),
            created=from_isoformat_zulu(row.get("created")),
            modified=from_isoformat_zulu(row.get("modified")),
            robots=row.get("robotstext"),
            metadata=None,
        )
        for row in rows
    ]
def get_resources(
    datasrc: Path,
    sites: list[int] | None = None,
    query: str = "",
    fields: list[str] | None = None,
    sort: str | None = None,
    limit: int = RESOURCES_LIMIT_DEFAULT,
    offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
    """
    Search resources in an InterroBot database.

    The sites are resolved with get_sites, wrapped in a SitesGroup, and the
    search itself is delegated to the module-level manager.

    Args:
        datasrc: path to the database
        sites: optional list of site IDs to filter by
        query: search query string
        fields: optional list of fields to include in response
        sort: sort order for results
        limit: maximum number of results to return
        offset: number of results to skip for pagination

    Returns:
        Tuple of (list of ResourceResult objects, total count, IndexState),
        as returned by the manager

    Raises:
        AssertionError: if get_sites returns no sites (it also returns an
            empty list when the database cannot be read)
    """
    matched_sites: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
    assert matched_sites, "At least one site is required to search"

    group = SitesGroup(datasrc, sites, [site.path for site in matched_sites])

    # InterroBot uses ints in place of strings
    interrobot_type_ids = {
        "": 0,          # UNDEFINED
        "html": 1,      # PAGE
        "other": 2,     # OTHER (could also be 5 or 12 depending on context)
        "rss": 3,       # FEED
        "iframe": 4,    # FRAME
        "img": 6,       # IMAGE
        "audio": 7,     # AUDIO
        "video": 8,     # VIDEO
        "font": 9,      # FONT
        "style": 10,    # CSS
        "script": 11,   # SCRIPT
        "text": 13,     # TEXT
        "pdf": 14,      # PDF
        "doc": 15,      # DOC
    }
    swap_values = {"type": interrobot_type_ids}

    return manager.get_resources_for_sites_group(group, query, fields, sort, limit, offset, swap_values)