Source code for mcp_server_webcrawl.crawlers.katana.adapter

import re
import sqlite3

from itertools import chain
from contextlib import closing
from pathlib import Path

from datetime import datetime, timezone

from mcp_server_webcrawl.crawlers.base.adapter import (
    IndexState,
    IndexStatus,
    BaseManager,
    SitesGroup,
    INDEXED_BATCH_SIZE,
)
from mcp_server_webcrawl.crawlers.base.indexed import IndexedManager
from mcp_server_webcrawl.models.resources import (
    ResourceResult,
    ResourceResultType,
    RESOURCES_LIMIT_DEFAULT,
)
from mcp_server_webcrawl.models.sites import (
    SiteResult,
)
from mcp_server_webcrawl.utils.logger import get_logger

logger = get_logger()


class KatanaManager(IndexedManager):
    """
    Manages HTTP text files in in-memory SQLite databases.
    Provides connection pooling and caching for efficient access.
    """
    def __init__(self) -> None:
        """Initialize the HTTP text manager with empty cache and statistics."""
        super().__init__()
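    # Indexing lifecycle used by _load_site_data below (as implemented in this
    # file): INDEXING is set on entry, PARTIAL if index_state.is_timeout() fires
    # between batches, and COMPLETE once all files have been processed.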
    def _load_site_data(self, connection: sqlite3.Connection, directory: Path,
            site_id: int, index_state: IndexState | None = None) -> None:
        """
        Load a site directory of HTTP text files into the database with
        parallel reading and batch SQL insertions.

        Args:
            connection: SQLite connection
            directory: path to the site directory
            site_id: ID for the site
            index_state: tracker for FTS indexing status
        """
        if not directory.exists() or not directory.is_dir():
            logger.error(f"Directory not found or not a directory: {directory}")
            return

        if index_state is not None:
            index_state.set_status(IndexStatus.INDEXING)

        file_paths = list(chain(
            directory.glob("*.txt"),
            directory.glob("*/*.txt")  # katana stores offsite assets under hostname
        ))

        with closing(connection.cursor()) as cursor:
            for i in range(0, len(file_paths), INDEXED_BATCH_SIZE):
                if index_state is not None and index_state.is_timeout():
                    index_state.set_status(IndexStatus.PARTIAL)
                    return

                batch_file_paths: list[Path] = file_paths[i:i + INDEXED_BATCH_SIZE]
                batch_file_contents = BaseManager.read_files(batch_file_paths)
                batch_insert_resource_results: list[ResourceResult] = []

                for file_path, content in batch_file_contents.items():
                    # avoid readme in repo, katana crawl files should be named 9080ef8...
                    if file_path.name.lower().endswith("readme.txt"):
                        continue
                    try:
                        record = self._prepare_katana_record(file_path, site_id, content)
                        if record:
                            batch_insert_resource_results.append(record)
                        if index_state is not None:
                            index_state.increment_processed()
                    except Exception as e:
                        logger.error(f"Error processing file {file_path}: {e}")

                self._execute_batch_insert(connection, cursor, batch_insert_resource_results)

        if index_state is not None and index_state.status == IndexStatus.INDEXING:
            index_state.set_status(IndexStatus.COMPLETE)

    def _prepare_katana_record(self, file_path: Path, site_id: int, content: str) -> ResourceResult | None:
        """
        Prepare a record for batch insertion.
        Args:
            file_path: path to the Katana crawl file record
            site_id: ID for the site
            content: loaded file content

        Returns:
            ResourceResult object ready for insertion, or None if processing fails
        """
        if file_path.is_file():
            file_stat = file_path.stat()
            # HTTP header modified mostly useless, change my mind
            file_created = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc)
            file_modified = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc)
        else:
            file_created = None
            file_modified = None

        # crawl format: <url>\n\n<request>\n\n<headers>...<response>
        parts: list[str] = content.split("\n\n", 2)
        if len(parts) < 3:
            logger.warning(f"Invalid HTTP text format in file {file_path}")
            return None

        url: str = parts[0].strip()
        response_data: str = parts[2].strip()

        try:
            response_parts: list[str] = response_data.split("\n\n", 1)
            headers: str = response_parts[0].strip()
            body: str = response_parts[1].strip() if len(response_parts) > 1 else ""

            if "Transfer-Encoding: chunked" in headers:
                body = body.split("\n", 1)[1].strip()    # remove hex prefix
                body = body.rsplit("\n0", 1)[0].strip()  # remove trailing "0" terminator

            # status from the first line of headers
            status_match: re.Match | None = re.search(r"HTTP/\d\.\d\s+(\d+)", headers.split("\n")[0])
            status_code: int = int(status_match.group(1)) if status_match else 0

            content_type_match = re.search(r"Content-Type:\s*([^\r\n;]+)", headers, re.IGNORECASE)
            content_type = content_type_match.group(1).strip() if content_type_match else ""
            resource_type = self._determine_resource_type(content_type)
            content_size = len(body)
            resource_id = BaseManager.string_to_id(url)

            return ResourceResult(
                id=resource_id,
                site=site_id,
                created=file_created,
                modified=file_modified,
                url=url,
                type=resource_type,
                headers=headers,
                content=body if self._is_text_content(content_type) else None,
                status=status_code,
                size=content_size,
                time=0  # time not available in file or Katana index
            )
        except Exception as e:
            logger.error(f"Error processing HTTP response in file {file_path}: {e}")
            return None
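# Illustrative sketch of the crawl file layout _prepare_katana_record expects,
# inferred from the parsing above (URL, request, and response blocks separated
# by blank lines); the URL and header values here are hypothetical:
#
#   https://example.com/index.html
#
#   GET /index.html HTTP/1.1
#   Host: example.com
#
#   HTTP/1.1 200 OK
#   Content-Type: text/html; charset=utf-8
#
#   <html>...</html>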
manager: KatanaManager = KatanaManager()
def get_sites(
    datasrc: Path,
    ids: list[int] | None = None,
    fields: list[str] | None = None
) -> list[SiteResult]:
    """
    List site directories in the datasrc directory as sites.

    Args:
        datasrc: path to the directory containing site subdirectories
        ids: optional list of site IDs to filter by
        fields: optional list of fields to include in the response

    Returns:
        List of SiteResult objects, one for each site directory

    Notes:
        Returns an empty list if the datasrc directory doesn't exist.
    """
    return manager.get_sites_for_directories(datasrc, ids, fields)
def get_resources(
    datasrc: Path,
    ids: list[int] | None = None,
    sites: list[int] | None = None,
    query: str = "",
    types: list[ResourceResultType] | None = None,
    fields: list[str] | None = None,
    statuses: list[int] | None = None,
    sort: str | None = None,
    limit: int = RESOURCES_LIMIT_DEFAULT,
    offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
    """
    Get resources from Katana crawl directories using in-memory SQLite.

    Args:
        datasrc: path to the directory containing Katana captures
        ids: optional list of resource IDs to filter by
        sites: optional list of site IDs to filter by
        query: search query string
        types: optional list of resource types to filter by
        fields: optional list of fields to include in response
        statuses: optional list of HTTP status codes to filter by
        sort: sort order for results
        limit: maximum number of results to return
        offset: number of results to skip for pagination

    Returns:
        Tuple of (list of ResourceResult objects, total count, index state)
    """
    sites_results: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
    assert sites_results, "At least one site is required to search"
    site_paths = [site.path for site in sites_results]
    sites_group = SitesGroup(datasrc, sites, site_paths)
    return manager.get_resources_for_sites_group(sites_group, query, fields, sort, limit, offset)
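# Minimal usage sketch of the module-level API above (hedged: the ./crawls
# path and the "login" query are illustrative assumptions, not part of the
# module):
#
#   from pathlib import Path
#   from mcp_server_webcrawl.crawlers.katana.adapter import get_sites, get_resources
#
#   datasrc = Path("./crawls")  # one subdirectory per crawled site
#   sites = get_sites(datasrc)
#   resources, total, index_state = get_resources(
#       datasrc,
#       sites=[site.id for site in sites],
#       query="login",
#       limit=10,
#   )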