Source code for mcp_server_webcrawl.crawlers.siteone.adapter

import os
import re
import sqlite3

from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path

from mcp_server_webcrawl.crawlers.base.adapter import (
    BaseManager,
    IndexState,
    IndexStatus,
    SitesGroup,
    INDEXED_BATCH_SIZE,
    INDEXED_RESOURCE_DEFAULT_PROTOCOL,
    INDEXED_TYPE_MAPPING
)
from mcp_server_webcrawl.crawlers.base.indexed import IndexedManager
from mcp_server_webcrawl.utils.logger import get_logger
from mcp_server_webcrawl.models.resources import (
    ResourceResult,
    ResourceResultType,
    RESOURCES_LIMIT_DEFAULT,
)
from mcp_server_webcrawl.models.sites import (
    SiteResult,
)

# Heads up: SiteOne reuses the wget adapter conventions. This is unintuitive but
# reasonable, since SiteOne uses wget for archiving. Lean into maximal recycling
# of the wget adapter; if it stops making sense, switch to a homegrown one.
# from mcp_server_webcrawl.crawlers.wget.adapter import (
#     get_sites,  # hands off, used
# )

logger = get_logger()

class SiteOneManager(IndexedManager):
    """
    Manages SiteOne directory data in in-memory SQLite databases.
    Wraps the wget archive format (shared by SiteOne and wget).
    Provides connection pooling and caching for efficient access.
    """
    def __init__(self) -> None:
        """Initialize the SiteOne manager with empty cache and statistics."""
        super().__init__()
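    # The parser below assumes the SiteOne text report ("output.<directory>.*.txt")
    # contains one pipe-delimited row per crawled URL with ten "|"-separated columns.
    # Only the columns read by _extract_log_metadata matter (0-indexed): 3 = URL path,
    # 4 = HTTP status, 5 = content type, 6 = elapsed time, 7 = size. A hypothetical
    # row, for illustration only (not copied from a real SiteOne report):
    #
    #   2024-01-01 12:00:00 | GET | 1 | /appstat.html | 200 | HTML | 0.123 s | 4 kB | OK | -
    #
    # If the actual column order differs, the int()/float() conversions fail and the
    # row is skipped by the except clause.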
    def _extract_log_metadata(self, directory: Path) -> tuple[dict, dict]:
        """
        Extract metadata from SiteOne log files.

        Args:
            directory: path to the site directory

        Returns:
            Tuple of (success log data, error log data) dictionaries
        """
        directory_name: str = directory.name
        log_data = {}
        log_http_error_data = {}
        log_pattern: str = f"output.{directory_name}.*.txt"
        log_files = list(Path(directory.parent).glob(log_pattern))
        if not log_files:
            return log_data, log_http_error_data

        log_latest = max(log_files, key=lambda p: p.stat().st_mtime)
        try:
            with open(log_latest, "r", encoding="utf-8") as log_file:
                for line in log_file:
                    parts = [part.strip() for part in line.split("|")]
                    if len(parts) == 10:
                        parts_path = parts[3].split("?")[0]
                        try:
                            status = int(parts[4])
                            url = f"{INDEXED_RESOURCE_DEFAULT_PROTOCOL}{directory_name}{parts_path}"
                            time_str = parts[6].split()[0]
                            # convert seconds to milliseconds; "ms" values are kept as-is
                            time_multiplier = 1 if "ms" in parts[6] else (1000 if "s" in parts[6] else 1)
                            time = int(float(time_str) * time_multiplier)
                            # size collected for errors, os stat preferred
                            size_str = parts[7].strip()
                            size = 0
                            if size_str:
                                size_value = float(size_str.split()[0])
                                size_unit = size_str.split()[1].lower() if len(size_str.split()) > 1 else "b"
                                # size_unit is lowercased above, so lowercase keys suffice
                                multiplier = {
                                    "b": 1,
                                    "kb": 1024,
                                    "mb": 1024 * 1024,
                                    "gb": 1024 * 1024 * 1024,
                                }.get(size_unit, 1)
                                size = int(size_value * multiplier)

                            if 400 <= status < 600:
                                log_http_error_data[url] = {
                                    "status": status,
                                    "type": parts[5].lower(),
                                    "time": time,
                                    "size": size,
                                }
                            else:
                                log_data[url] = {
                                    "status": status,
                                    "type": parts[5].lower(),
                                    "time": time,
                                    "size": size,
                                }
                        except (ValueError, IndexError, UnicodeDecodeError, KeyError):
                            continue
                    elif line.strip() == "Redirected URLs":
                        # stop processing; the HTTP request rows are done
                        break
        except Exception as e:
            logger.error(f"Error processing log file {log_latest}: {e}")

        return log_data, log_http_error_data

    def _load_site_data(self, connection: sqlite3.Connection, directory: Path,
            site_id: int, index_state: IndexState | None = None) -> None:
        """
        Load a SiteOne directory into the database with parallel processing and batch insertions.
        Args:
            connection: SQLite connection
            directory: path to the SiteOne directory
            site_id: ID for the site
            index_state: IndexState object for tracking progress
        """
        if not directory.exists() or not directory.is_dir():
            logger.error(f"Directory not found or not a directory: {directory}")
            return

        if index_state is not None:
            index_state.set_status(IndexStatus.INDEXING)

        log_data, log_http_error_data = self._extract_log_metadata(directory)

        file_paths = []
        for root, _, files in os.walk(directory):
            for filename in files:
                # skip robots.txt and the SiteOne text reports themselves
                if filename == "robots.txt" or (filename.startswith("output.") and filename.endswith(".txt")):
                    continue
                file_paths.append(Path(root) / filename)

        processed_urls = set()
        with closing(connection.cursor()) as cursor:
            for i in range(0, len(file_paths), INDEXED_BATCH_SIZE):
                if index_state is not None and index_state.is_timeout():
                    index_state.set_status(IndexStatus.PARTIAL)
                    return

                batch_paths = file_paths[i:i + INDEXED_BATCH_SIZE]
                batch_insert_crawled: list[ResourceResult] = []
                file_contents = BaseManager.read_files(batch_paths)

                for file_path in batch_paths:
                    try:
                        result: ResourceResult | None = self._prepare_siteone_record(
                            file_path, site_id, directory, log_data, file_contents.get(file_path))
                        if result and result.url not in processed_urls:
                            batch_insert_crawled.append(result)
                            processed_urls.add(result.url)
                            if index_state is not None:
                                index_state.increment_processed()
                    except Exception as e:
                        logger.error(f"Error processing file {file_path}: {e}")

                self._execute_batch_insert(connection, cursor, batch_insert_crawled)

            # HTTP errors not already processed
            batch_insert_errors: list[ResourceResult] = []
            for url, meta in log_http_error_data.items():
                if url not in processed_urls:
                    size = meta.get("size", 0)
                    result = ResourceResult(
                        id=BaseManager.string_to_id(url),
                        site=site_id,
                        url=url,
                        type=ResourceResultType.OTHER,
                        status=meta["status"],
                        headers=BaseManager.get_basic_headers(size, ResourceResultType.OTHER),
                        content="",  # no content
                        size=size,   # size from log
                        time=meta["time"]
                    )
                    batch_insert_errors.append(result)
                    if index_state is not None:
                        index_state.increment_processed()

                    # insert errors in batches too
                    if len(batch_insert_errors) >= INDEXED_BATCH_SIZE:
                        self._execute_batch_insert(connection, cursor, batch_insert_errors)
                        batch_insert_errors = []

            # insert any remaining error records
            if batch_insert_errors:
                self._execute_batch_insert(connection, cursor, batch_insert_errors)

        if index_state is not None and index_state.status == IndexStatus.INDEXING:
            index_state.set_status(IndexStatus.COMPLETE)

    def _prepare_siteone_record(self, file_path: Path, site_id: int, base_dir: Path,
            log_data: dict, content: str | None = None) -> ResourceResult | None:
        """
        Prepare a record for batch insertion from a SiteOne file.
        Args:
            file_path: path to the file
            site_id: id for the site
            base_dir: base directory for the capture
            log_data: dictionary of metadata from logs keyed by URL
            content: optional pre-loaded file content

        Returns:
            ResourceResult record, or None if processing fails or the file is skipped
        """
        try:
            # generate relative url path from file path (similar to wget)
            relative_path = file_path.relative_to(base_dir)
            url = f"{INDEXED_RESOURCE_DEFAULT_PROTOCOL}{base_dir.name}/{str(relative_path).replace(os.sep, '/')}"

            if file_path.is_file():
                file_stat = file_path.stat()
                file_size = file_stat.st_size
                file_created = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc)
                file_modified = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc)
            else:
                file_created = None
                file_modified = None
                file_size = 0

            decruftified_path = BaseManager.decruft_path(str(file_path))
            extension = Path(decruftified_path).suffix.lower()
            wget_static_pattern = re.compile(r"\.[0-9a-f]{8,}\.")

            # look up metadata from log if available, otherwise use defaults
            metadata = None
            wget_aliases = list(set([
                url,                                    # exact match first
                re.sub(wget_static_pattern, ".", url),  # static cache-busting pattern removed
                url.replace(".html", ""),               # file without extension (redirects)
                url.replace(".html", "/"),              # directory style (targets)
                url.replace("index.html", ""),          # index removal
            ]))
            for wget_alias in wget_aliases:
                metadata = log_data.get(wget_alias, None)
                if metadata is not None:
                    break

            if metadata is not None:
                # prevent duplicate html pages, e.g. ./appstat.html and ./appstat/index.html,
                # by preferring index.html (actual content) over redirect stubs
                sorted_aliases = sorted(
                    [alias for alias in wget_aliases if log_data.get(alias) == metadata],
                    key=lambda x: (not x.endswith("index.html"), x))
                if sorted_aliases:
                    url = sorted_aliases[0]  # take the preferred alias as canonical
            else:
                metadata = {}

            status_code = metadata.get("status", 200)
            response_time = metadata.get("time", 0)
            log_type = metadata.get("type", "").lower()

            if log_type:
                # redirects carry no type in the log, but more often than not
                # redirect to another page
                type_mapping = {
                    "html": ResourceResultType.PAGE,
                    "redirect": ResourceResultType.PAGE,
                    "image": ResourceResultType.IMAGE,
                    "js": ResourceResultType.SCRIPT,
                    "css": ResourceResultType.CSS,
                    "video": ResourceResultType.VIDEO,
                    "audio": ResourceResultType.AUDIO,
                    "pdf": ResourceResultType.PDF,
                    "other": ResourceResultType.OTHER,
                    "font": ResourceResultType.OTHER,
                }
                resource_type = type_mapping.get(log_type, INDEXED_TYPE_MAPPING.get(extension, ResourceResultType.OTHER))
            else:
                # fallback to extension-based mapping
                resource_type = INDEXED_TYPE_MAPPING.get(extension, ResourceResultType.OTHER)

            file_content = content
            if file_content is None:
                file_content = BaseManager.read_file_contents(file_path, resource_type)

            # skip redirect stub files left in the SiteOne archive (duplicates; the real
            # content lives under the target path), e.g. <meta http-equiv="refresh" content="0; url=...">
            if status_code == 200 and file_content and '<meta http-equiv="refresh" content="0' in file_content:
                return None

            record = ResourceResult(
                id=BaseManager.string_to_id(url),
                site=site_id,
                created=file_created,
                modified=file_modified,
                url=url,
                type=resource_type,
                status=status_code,
                headers=BaseManager.get_basic_headers(file_size, resource_type),
                content=file_content,
                size=file_size,
                time=response_time  # possibly from log
            )
            return record

        except Exception as e:
            logger.error(f"Error preparing record for file {file_path}: {e}")
            return None
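# module-level manager instance shared by get_sites() and get_resources() below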
manager: SiteOneManager = SiteOneManager()
def get_sites(
    datasrc: Path,
    ids: list[int] | None = None,
    fields: list[str] | None = None
) -> list[SiteResult]:
    """
    List site directories in the datasrc directory as sites.

    Args:
        datasrc: path to the directory containing site subdirectories
        ids: optional list of site IDs to filter by
        fields: optional list of fields to include in the response

    Returns:
        List of SiteResult objects, one for each site directory

    Notes:
        Returns an empty list if the datasrc directory doesn't exist.
    """
    return manager.get_sites_for_directories(datasrc, ids, fields)
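# A minimal usage sketch. The datasrc path below is an assumption for illustration;
# point it at the directory that contains the SiteOne site subdirectories.
#
#     from pathlib import Path
#     sites = get_sites(Path("/archives/siteone"))
#     for site in sites:
#         print(site.id, site.path)
#
# The returned ids are the values accepted by get_resources(sites=[...]).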
def get_resources(
    datasrc: Path,
    sites: list[int] | None = None,
    query: str = "",
    fields: list[str] | None = None,
    sort: str | None = None,
    limit: int = RESOURCES_LIMIT_DEFAULT,
    offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
    """
    Get resources from SiteOne directories (wget archive format) using in-memory SQLite.

    Args:
        datasrc: path to the directory containing SiteOne captures
        sites: optional list of site IDs to filter by
        query: search query string
        fields: optional list of fields to include in response
        sort: sort order for results
        limit: maximum number of results to return
        offset: number of results to skip for pagination

    Returns:
        Tuple of (list of ResourceResult objects, total count, index state)
    """
    sites_results: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
    assert sites_results, "At least one site is required to search"
    site_paths = [site.path for site in sites_results]
    sites_group = SitesGroup(datasrc, sites, site_paths)
    return manager.get_resources_for_sites_group(sites_group, query, fields, sort, limit, offset)
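# A minimal usage sketch, assuming the same datasrc as above; the query value is
# an arbitrary keyword chosen for illustration.
#
#     from pathlib import Path
#     datasrc = Path("/archives/siteone")
#     site_ids = [site.id for site in get_sites(datasrc)]
#     resources, total, index_state = get_resources(datasrc, sites=site_ids, query="contact", limit=20)
#     print(total, index_state.status)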