Source code for mcp_server_webcrawl.crawlers.warc.adapter

import email.utils
import os
import sqlite3
import warcio

from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Final
from warcio.recordloader import ArcWarcRecord

from mcp_server_webcrawl.crawlers.base.adapter import (
    IndexState,
    IndexStatus,
    SitesGroup,
    INDEXED_BATCH_SIZE,
)
from mcp_server_webcrawl.crawlers.base.indexed import IndexedManager
from mcp_server_webcrawl.models.resources import (
    ResourceResult,
    ResourceResultType,
    RESOURCES_LIMIT_DEFAULT,
)
from mcp_server_webcrawl.models.sites import (
    SiteResult,
    SITES_FIELDS_DEFAULT,
    SITES_FIELDS_REQUIRED,
)
from mcp_server_webcrawl.utils.logger import get_logger

logger = get_logger()

WARC_FILE_EXTENSIONS: Final[list[str]] = [".warc", ".warc.gz", ".txt"]


class WarcManager(IndexedManager):
    """
    Manages WARC file data in in-memory SQLite databases.
    Provides connection pooling and caching for efficient access.
    """

    def __init__(self) -> None:
        """Initialize the WARC manager with empty cache and statistics."""
        super().__init__()

    def _load_site_data(self, connection: sqlite3.Connection, warc_path: Path,
            site_id: int, index_state: IndexState = None) -> None:
        """
        Load a WARC file into the database with batch processing for better performance.

        Args:
            connection: SQLite connection
            warc_path: path to the WARC file
            site_id: ID for the site
            index_state: IndexState object for tracking progress
        """
        if not warc_path.exists() or not warc_path.is_file():
            logger.error(f"WARC file not found or not a file: {warc_path}")
            return

        with closing(connection.cursor()) as cursor:
            if index_state is not None:
                index_state.set_status(IndexStatus.INDEXING)
            try:
                batch_insert_resource_results: list[ResourceResult] = []
                batch_count: int = 0
                with open(warc_path, "rb") as stream:
                    for warc_record in warcio.ArchiveIterator(stream):
                        if index_state is not None and index_state.is_timeout():
                            index_state.set_status(IndexStatus.PARTIAL)
                            # commit current batch and shut it down
                            if batch_insert_resource_results:
                                self._execute_batch_insert(connection, cursor, batch_insert_resource_results)
                            return

                        if warc_record is not None and warc_record.rec_type == "response":
                            resource_result: ResourceResult = self._prepare_warc_record(warc_record, site_id)
                            if resource_result:
                                batch_insert_resource_results.append(resource_result)
                                if index_state is not None:
                                    index_state.increment_processed()
                                batch_count += 1
                                if batch_count >= INDEXED_BATCH_SIZE:
                                    self._execute_batch_insert(connection, cursor, batch_insert_resource_results)
                                    batch_insert_resource_results = []
                                    batch_count = 0

                # batch insert remaining
                if batch_insert_resource_results:
                    self._execute_batch_insert(connection, cursor, batch_insert_resource_results)

                if index_state is not None and index_state.status == IndexStatus.INDEXING:
                    index_state.set_status(IndexStatus.COMPLETE)

            except Exception as e:
                logger.error(f"Error processing WARC file {warc_path}: {e}")
                if index_state is not None:
                    index_state.set_status(IndexStatus.FAILED)

    def _prepare_warc_record(self, record: ArcWarcRecord, site_id: int) -> ResourceResult | None:
        """
        Prepare a WARC record for batch insertion.

        Args:
            record: a warcio record object
            site_id: ID for the site

        Returns:
            ResourceResult ready for insertion, or None if processing fails
        """
        try:
            url: str = record.rec_headers.get_header("WARC-Target-URI")
            content_type: str = record.http_headers.get_header("Content-Type", "")
            status: int = int(record.http_headers.get_statuscode()) or 200
            resource_type: ResourceResultType = self._determine_resource_type(content_type)
            content: bytes = record.content_stream().read()
            content_size: int = len(content)

            if self._is_text_content(content_type):
                try:
                    content_str: str = content.decode("utf-8")
                except UnicodeDecodeError:
                    content_str = None
            else:
                content_str = None

            warc_date = record.rec_headers.get_header("WARC-Date")
            if warc_date:
                try:
                    file_created = datetime.fromisoformat(warc_date.replace('Z', '+00:00'))
                except ValueError:
                    # fallback to email date parser
                    try:
                        time_tuple = email.utils.parsedate_tz(warc_date)
                        file_created = datetime.fromtimestamp(email.utils.mktime_tz(time_tuple), tz=timezone.utc)
                    except (ValueError, TypeError):
                        file_created = datetime.now(timezone.utc)
            else:
                file_created = None  # don't pretend it is now, ResourceResult can survive
            file_modified = file_created  # like file stat indexes, these are equivalent

            result = ResourceResult(
                id=IndexedManager.string_to_id(url),
                site=site_id,
                created=file_created,
                modified=file_modified,
                url=url,
                type=resource_type,
                status=status,
                headers=record.http_headers.to_str(),
                content=content_str,
                size=content_size,
                time=0  # time not available
            )
            return result
        except Exception as e:
            logger.error(f"Error processing WARC record for URL {url if 'url' in locals() else 'unknown'}: {e}")
            return None
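
# module-level singleton shared by get_sites() and get_resources() below; per the
# class docstring it provides connection pooling and caching of the in-memory indexes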
manager: WarcManager = WarcManager()


def get_sites(
    datasrc: Path,
    ids: list[int] | None = None,
    fields: list[str] | None = None
) -> list[SiteResult]:
    """
    List WARC files in the datasrc directory as sites.

    Args:
        datasrc: path to the directory containing WARC files
        ids: optional list of site IDs to filter by
        fields: list of fields to include in the response

    Returns:
        List of SiteResult objects, one for each WARC file
    """
    assert datasrc is not None, f"datasrc not provided ({datasrc})"

    # nothing can be done, but don't crash the server either, keep chugging along
    if not datasrc.exists():
        logger.error(f"Directory not found ({datasrc})")
        return []

    # determine which fields to include
    selected_fields: set[str] = set(SITES_FIELDS_REQUIRED)
    if fields:
        valid_fields: set[str] = set(SITES_FIELDS_DEFAULT)
        selected_fields.update(f for f in fields if f in valid_fields)
    else:
        selected_fields.update(SITES_FIELDS_DEFAULT)

    results: list[SiteResult] = []
    files_to_check: list[Path] = []
    for ext in WARC_FILE_EXTENSIONS:
        files_to_check.extend(datasrc.glob(f"*{ext}"))

    # map of file_id -> file_path for filtering
    file_id_map: dict[int, Path] = {WarcManager.string_to_id(str(os.path.basename(f))): f
            for f in files_to_check if f is not None}

    if ids:
        file_id_map = {id_val: path for id_val, path in file_id_map.items() if id_val in ids}

    for site_id, file_path in sorted(file_id_map.items()):
        file_stat = file_path.stat()
        created_time: datetime = datetime.fromtimestamp(file_stat.st_ctime)
        modified_time: datetime = datetime.fromtimestamp(file_stat.st_mtime)
        site: SiteResult = SiteResult(
            path=file_path,
            id=site_id,
            url=str(file_path.absolute()),
            created=created_time if "created" in selected_fields else None,
            modified=modified_time if "modified" in selected_fields else None,
        )
        results.append(site)

    return results


def get_resources(
    datasrc: Path,
    sites: list[int] | None = None,
    query: str = "",
    fields: list[str] | None = None,
    sort: str | None = None,
    limit: int = RESOURCES_LIMIT_DEFAULT,
    offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
    """
    Get resources from WARC files using in-memory SQLite.

    Args:
        datasrc: path to the directory containing WARC files
        sites: optional list of site IDs to filter by
        query: search query string
        fields: optional list of fields to include in response
        sort: sort order for results
        limit: maximum number of results to return
        offset: number of results to skip for pagination

    Returns:
        Tuple of (list of ResourceResult objects, total count, index state)
    """
    sites_results: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
    assert sites_results, "At least one site is required to search"
    site_paths = [site.path for site in sites_results]
    sites_group = SitesGroup(datasrc, sites, site_paths)
    return manager.get_resources_for_sites_group(sites_group, query, fields, sort, limit, offset)
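

# A minimal usage sketch of this adapter: it assumes ./archives holds one or more
# .warc/.warc.gz captures and uses "charset" as a sample query; both the path and
# the query string are placeholder assumptions, not values taken from the project.
if __name__ == "__main__":
    datasrc_example = Path("./archives")        # hypothetical directory of WARC captures
    example_sites = get_sites(datasrc_example)  # one SiteResult per WARC file found
    if example_sites:
        resources, total, index_state = get_resources(
            datasrc_example,
            sites=[example_sites[0].id],        # restrict the search to the first capture
            query="charset",                    # hypothetical search string
            limit=10,
        )
        for resource in resources:
            print(resource.url, resource.status)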