import os
import hashlib
import mimetypes
import re
import sqlite3
import traceback
from contextlib import closing
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from dataclasses import dataclass
from datetime import timezone
from typing import Final
from mcp_server_webcrawl.models.resources import (
ResourceResult,
ResourceResultType,
RESOURCES_DEFAULT_FIELD_MAPPING,
RESOURCES_DEFAULT_SORT_MAPPING,
RESOURCES_FIELDS_REQUIRED,
RESOURCES_ENUMERATED_TYPE_MAPPING,
RESOURCES_LIMIT_MAX,
)
from mcp_server_webcrawl.utils import to_isoformat_zulu, from_isoformat_zulu
from mcp_server_webcrawl.utils.search import SearchQueryParser, SearchSubquery
from mcp_server_webcrawl.utils.logger import get_logger
logger = get_logger()
# in the interest of sane imports (avoiding circulars), indexed constants live here,
# happily, as denizens of adapterville
INDEXED_BINARY_EXTENSIONS: Final[tuple[str, ...]] = (
".woff",".woff2",".ttf",".otf",".eot",
".jpeg",".jpg",".png",".webp",".gif",".bmp",".tiff",".tif",".svg",".ico",".heic",".heif",
".mp3",".wav",".ogg",".flac",".aac",".m4a",".wma",
".mp4",".webm",".avi",".mov",".wmv",".mkv",".flv",".m4v",".mpg",".mpeg",
".pdf",".doc",".docx",".xls",".xlsx",".ppt",".pptx",
".zip",".rar",".7z",".tar",".gz",".bz2",".xz",
".exe",".dll",".so",".dylib",".bin",".apk",".app",
".swf",".svgz",".dat",".db",".sqlite",".class",".pyc",".o"
)
# files on disk need a default protocol for URL reassembly ({proto}{dir});
# these URLs are already approximations (perhaps), having passed through wget
# filtering (--adjust-extension) to be representative of the file on disk;
# also, https is what the LLM is going to guess in all cases
INDEXED_RESOURCE_DEFAULT_PROTOCOL: Final[str] = "https://"
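# A minimal reassembly sketch (illustrative only; the on-disk layout shown here,
# a mirror rooted at the hostname, is an assumption):
#   relative = Path("example.com/about/index.html")
#   url = f"{INDEXED_RESOURCE_DEFAULT_PROTOCOL}{relative.as_posix()}"
#   # -> "https://example.com/about/index.html"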
INDEXED_BATCH_SIZE: Final[int] = 256
INDEXED_MAX_WORKERS: Final[int] = min(8, os.cpu_count() or 4)
INDEXED_MAX_FILE_SIZE: Final[int] = 2000000 # 2MB
# max indexing time may need a cli arg to override at some point,
# but for now, this is a fan spinner--just make sure it doesn't run away
INDEXED_MAX_PROCESS_TIME: Final[timedelta] = timedelta(minutes=10)
# maximum indexes held in cache, an index is a unique list[site-ids] argument
INDEXED_MANAGER_CACHE_MAX: Final[int] = 20
INDEXED_IGNORE_DIRECTORIES: Final[list[str]] = ["http-client-cache", "result-storage"]
INDEXED_TYPE_MAPPING: Final[dict[str, ResourceResultType]] = {
"": ResourceResultType.PAGE,
".html": ResourceResultType.PAGE,
".htm": ResourceResultType.PAGE,
".php": ResourceResultType.PAGE,
".asp": ResourceResultType.PAGE,
".aspx": ResourceResultType.PAGE,
".js": ResourceResultType.SCRIPT,
".css": ResourceResultType.CSS,
".jpg": ResourceResultType.IMAGE,
".jpeg": ResourceResultType.IMAGE,
".png": ResourceResultType.IMAGE,
".gif": ResourceResultType.IMAGE,
".svg": ResourceResultType.IMAGE,
".tif": ResourceResultType.IMAGE,
".tiff": ResourceResultType.IMAGE,
".webp": ResourceResultType.IMAGE,
".bmp": ResourceResultType.IMAGE,
".pdf": ResourceResultType.PDF,
".txt": ResourceResultType.TEXT,
".xml": ResourceResultType.TEXT,
".json": ResourceResultType.TEXT,
".doc": ResourceResultType.DOC,
".docx": ResourceResultType.DOC,
".mov": ResourceResultType.VIDEO,
".mp4": ResourceResultType.VIDEO,
".mp3": ResourceResultType.AUDIO,
".ogg": ResourceResultType.AUDIO,
}
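# A lookup sketch (illustrative only; the UNDEFINED fallback is an assumption,
# not necessarily what every indexer uses):
#   suffix = Path("blog/post.html").suffix.lower()  # ".html"
#   rtype = INDEXED_TYPE_MAPPING.get(suffix, ResourceResultType.UNDEFINED)
#   # -> ResourceResultType.PAGE; extensionless paths ("") also map to PAGE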
class IndexStatus(Enum):
UNDEFINED = ""
IDLE = "idle"
INDEXING = "indexing"
PARTIAL = "partial" # incomplete, but stable and searchable (timeout)
COMPLETE = "complete"
REMOTE = "remote"
FAILED = "failed"
@dataclass
class IndexState:
"""Shared state between crawler and manager for indexing progress"""
status: IndexStatus = IndexStatus.UNDEFINED
processed: int = 0
time_start: datetime | None = None
time_end: datetime | None = None
def set_status(self, status: IndexStatus):
if self.status == IndexStatus.UNDEFINED:
self.time_start = datetime.now(timezone.utc)
self.processed = 0
self.time_end = None
elif status in (IndexStatus.COMPLETE, IndexStatus.PARTIAL):
if self.time_end is None:
self.time_end = datetime.now(timezone.utc)
if status == IndexStatus.PARTIAL:
logger.info(f"Indexing timeout ({INDEXED_MAX_PROCESS_TIME} minutes) reached. \
Index status has been set to PARTIAL, and further indexing halted.")
self.status = status
def increment_processed(self):
self.processed += 1
@property
def duration(self) -> str:
if not self.time_start:
return "00:00:00.000"
end = self.time_end or datetime.now(timezone.utc)
total_seconds = (end - self.time_start).total_seconds()
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = int(total_seconds % 60)
milliseconds = int((total_seconds % 1) * 1000)
# HH:MM:SS.mmm
return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
def is_timeout(self) -> bool:
"""Check if the indexing operation has exceeded the timeout threshold"""
if not self.time_start:
return False
return (datetime.now(timezone.utc) - self.time_start) > INDEXED_MAX_PROCESS_TIME
def to_dict(self) -> dict:
"""Convert the IndexState to a dictionary representation"""
status = self.status.value if hasattr(self.status, 'value') else self.status
result = { "status": status }
if self.status not in (IndexStatus.REMOTE, IndexStatus.UNDEFINED):
result["processed"] = self.processed
result["time_start"] = to_isoformat_zulu(self.time_start) if self.time_start else None
result["time_end"] = to_isoformat_zulu(self.time_end) if self.time_end else None
result["duration"] = self.duration
return result
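# A lifecycle sketch for IndexState (illustrative only; values are hypothetical):
#   state = IndexState()
#   state.set_status(IndexStatus.INDEXING)   # first transition away from UNDEFINED stamps time_start
#   state.increment_processed()              # once per resource indexed
#   state.set_status(IndexStatus.COMPLETE)   # stamps time_end
#   state.to_dict()
#   # -> {"status": "complete", "processed": 1, "time_start": "...", "time_end": "...", "duration": "00:00:00.___"}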
class SitesGroup:
def __init__(self, datasrc: Path, site_ids: list[int], site_paths: list[Path]) -> None:
"""
Container class that supports searching one or more sites at once.
Args:
datasrc: path to the crawler data source
site_ids: IDs of the sites to search
site_paths: paths to site contents (directories)
"""
self.datasrc: Path = datasrc
self.ids: list[int] = site_ids
self.paths: list[Path] = site_paths
self.cache_key = frozenset(map(str, site_ids))
def __str__(self) -> str:
return f"[SitesGroup {self.cache_key}]"
def get_sites(self) -> dict[int, str]:
# unwrap { id1: path1, id2: path2 }
return {site_id: str(path) for site_id, path in zip(self.ids, self.paths)}
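# Usage sketch (illustrative only; IDs and paths are hypothetical):
#   group = SitesGroup(Path("/data"), [101, 202], [Path("/data/a"), Path("/data/b")])
#   group.cache_key          # frozenset({"101", "202"}) -- order-insensitive
#   group.get_sites()        # {101: "/data/a", 202: "/data/b"}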
class SitesStat:
def __init__(self, group: SitesGroup, cached: bool) -> None:
"""
Some basic bookkeeping, for troubleshooting
"""
self.group: Final[SitesGroup] = group
self.timestamp: Final[datetime] = datetime.now()
self.cached: Final[bool] = cached
class BaseManager:
"""
Base class for managing web crawler data in in-memory SQLite databases.
Provides connection pooling and caching for efficient access.
"""
def __init__(self) -> None:
"""Initialize the manager with statistics."""
self._stats: list[SitesStat] = []
@staticmethod
def string_to_id(value: str) -> int:
"""
Convert a string, such as a directory name, to a numeric ID
suitable for a database primary key.
Hash space and collision probability notes:
- [:8] = 32 bits (4.29 billion values) - ~1% collision chance with 10,000 items
- [:12] = 48 bits (280 trillion values) - ~0.0000001% collision chance with 10,000 items
- [:16] = 64 bits (max safe SQLite INTEGER) - near-zero collision, 9.22 quintillion values
- SQLite INTEGER type is 64-bit signed, with max value of 9,223,372,036,854,775,807.
- The big problem with larger hashspaces is the length of the ids they generate for presentation.
Args:
value: Input string to convert to an ID
Returns:
Integer ID derived from the input string
"""
hash_obj = hashlib.sha1(value.encode())
return int(hash_obj.hexdigest()[:12], 16)
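# Usage sketch (illustrative only; the directory name is hypothetical):
#   site_id = BaseManager.string_to_id("example.com")
#   assert 0 <= site_id < 2 ** 48   # 12 hex chars of sha1 == 48 bits, well inside SQLite INTEGER
#   # the same input always yields the same id, so directory names map stably to primary keys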
@staticmethod
def read_files(paths: list[Path]) -> dict[Path, str | None]:
file_contents: dict[Path, str | None] = {}
with ThreadPoolExecutor(max_workers=INDEXED_MAX_WORKERS) as executor:
for file_path, content in executor.map(BaseManager.__read_files_contents, paths):
if content is not None:
file_contents[file_path] = content
return file_contents
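# Usage sketch (illustrative only; paths are hypothetical):
#   contents = BaseManager.read_files([Path("site/index.html"), Path("site/logo.png")])
#   # binary and oversized files are skipped, so only index.html would appear:
#   # {Path("site/index.html"): "<!doctype html>..."}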
@staticmethod
def __read_files_contents(file_path: Path) -> tuple[Path, str | None]:
"""
Read content from a single file, skipping binary, oversized, and non-text MIME types.
"""
# a null result just means we're not dealing with the content
# which has been determined to be binary or of unknown format
# we can still maintain a record the URL/headers/whatever as Resource
null_result: tuple[Path, str | None] = file_path, None
extension = os.path.splitext(file_path)[1].lower()
if (extension in INDEXED_BINARY_EXTENSIONS or
os.path.getsize(file_path) > INDEXED_MAX_FILE_SIZE):
return null_result
mime_type, _ = mimetypes.guess_type(file_path)
mime_text_exceptions = ["application/json", "application/xml", "application/javascript"]
if mime_type and not mime_type.startswith("text/") and mime_type not in mime_text_exceptions:
return null_result
content = None
try:
# errors="ignore" or "replace" required to read Katana txt files with
# data payloads and still capture url, headers, etc. replace supposedly
# softer touch generally, but not any better for Katana specifically
# as payload will not be stored
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
content = f.read()
except UnicodeDecodeError:
logger.debug(f"Could not decode file as UTF-8: {file_path}")
return null_result
except Exception as e:
logger.error(f"Error reading file {file_path}")
return null_result
return file_path, content
@staticmethod
def read_file_contents(file_path: Path, resource_type: ResourceResultType) -> str | None:
"""Read content from a text file, skipping binary, oversized, and non-text resource types."""
if resource_type not in [ResourceResultType.PAGE, ResourceResultType.TEXT,
ResourceResultType.CSS, ResourceResultType.SCRIPT, ResourceResultType.OTHER]:
return None
if os.path.getsize(file_path) > INDEXED_MAX_FILE_SIZE:
return None
extension = os.path.splitext(file_path)[1].lower()
if extension in INDEXED_BINARY_EXTENSIONS:
return None
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type and not mime_type.startswith("text/"):
if not any(mime_type.startswith(prefix) for prefix in ["application/json",
"application/xml", "application/javascript"]):
return None
content = None
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
except UnicodeDecodeError:
logger.warning(f"Could not decode file as UTF-8: {file_path}")
return content
@staticmethod
def decruft_path(path: str) -> str:
"""
Very light-touch cleanup of file naming; these tmp suffixes create noise,
and extensions are useful in classifying resources.
"""
# clean path/file from wget modifications we don't want
decruftified = str(path)
decruftified = decruftified.lower()
decruftified = re.sub(r"[\u00b7·]?\d+\.tmp|\d{12}|\.tmp", "", decruftified)
# clean extension from non alpha
# S1/wget can generate some weird extensions with URL args
# filenames such as main.min.js202505251919
decruftified = re.sub(r'\.(\w+)[^\w]*$', r'.\1', decruftified)
return decruftified
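# Usage sketch, grounded in the timestamped-filename example noted above:
#   BaseManager.decruft_path("Main.Min.JS202505251919")
#   # -> "main.min.js" (lowercased, 12-digit timestamp stripped by the first regex)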
def get_stats(self) -> list[SitesStat]:
return self._stats.copy()
def get_resources_for_sites_group(
self,
sites_group: SitesGroup,
query: str,
fields: list[str] | None,
sort: str | None,
limit: int,
offset: int,
swap_values: dict = {}
) -> tuple[list[ResourceResult], int, IndexState]:
"""
Get resources from directories using structured query parsing with SearchQueryParser.
This method extracts types, fields, and statuses from the querystring instead of
accepting them as separate arguments, using the new SearchSubquery functionality.
Args:
sites_group: Group of sites to search in
query: Search query string that can include field:value syntax for filtering
fields: resource fields to be returned by the API (Content, Headers, etc.)
sort: Sort order for results
limit: Maximum number of results to return
offset: Number of results to skip for pagination
swap_values: per-field parameterized values to check for (and replace)
Returns:
Tuple of (list of ResourceResult objects, total count, connection_index_state)
Notes:
Returns empty results if sites is empty or not provided.
If the database is being built, it will log a message and return empty results.
This method extracts field-specific filters from the query string using SearchQueryParser:
- type:html (to filter by resource type)
- status:200 (to filter by HTTP status)
Any fields present in the SearchSubquery will be included in the response.
"""
# get_connection must be defined in subclass
assert hasattr(self, "get_connection"), "get_connection not found"
null_result: tuple[list[ResourceResult], int, IndexState | None] = [], 0, None
# get sites arg from group
sites: list[int] = sites_group.ids if sites_group else []
if not sites:
return null_result
connection: sqlite3.Connection
connection_index_state: IndexState
connection, connection_index_state = self.get_connection(sites_group)
if connection is None:
# database is currently being built
logger.info(f"Database for sites {sites} is currently being built, try again later")
return null_result
parser = SearchQueryParser()
parsed_query = []
if query.strip():
try:
parsed_query = parser.parse(query.strip())
except Exception as e:
logger.error(f"Error parsing query: {e}")
# fall back to simple text search
parsed_query = []
# if status not explicitly in query, add status >=100
status_applied = any(squery.field == "status" for squery in parsed_query)
if not status_applied:
http_status_received = SearchSubquery("status", 100, "term", [], "AND", comparator=">=")
parsed_query.append(http_status_received)
# determine fields to be retrieved
selected_fields: set[str] = set(RESOURCES_FIELDS_REQUIRED)
if fields:
selected_fields.update(f for f in fields if f in RESOURCES_DEFAULT_FIELD_MAPPING)
safe_sql_fields = [RESOURCES_DEFAULT_FIELD_MAPPING[f] for f in selected_fields]
assert all(re.match(r'^[A-Za-z\.]+$', field) for field in safe_sql_fields), "Unknown or unsafe field requested"
safe_sql_fields_joined: str = ", ".join(safe_sql_fields)
from_clause = "ResourcesFullText LEFT JOIN Resources ON ResourcesFullText.Id = Resources.Id"
where_clauses: list[str] = []
params: dict[str, int | str] = {}
if sites:
placeholders: list[str] = [f":sites{i}" for i in range(len(sites))]
where_clauses.append(f"ResourcesFullText.Project IN ({','.join(placeholders)})")
params.update({f"sites{i}": id_val for i, id_val in enumerate(sites)})
if parsed_query:
fts_parts, fts_params = parser.to_sqlite_fts(parsed_query, swap_values)
if fts_parts:
fts_where = ""
for part in fts_parts:
if part in ["AND", "OR", "NOT"]: # operator
fts_where += f" {part} "
else: # condition
fts_where += part
# fts subquery as a single condition in parentheses
if fts_where:
where_clauses.append(f"({fts_where})")
for param_name, param_value in fts_params.items():
params[param_name] = param_value
where_clause: str = f" WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
if sort in RESOURCES_DEFAULT_SORT_MAPPING:
field, direction = RESOURCES_DEFAULT_SORT_MAPPING[sort]
if direction == "RANDOM":
order_clause: str = " ORDER BY RANDOM()"
else:
order_clause = f" ORDER BY {field} {direction}"
else:
order_clause = " ORDER BY Resources.Id ASC"
limit = min(max(1, limit), RESOURCES_LIMIT_MAX)
limit_clause: str = f" LIMIT {limit} OFFSET {offset}"
statement: str = f"SELECT {safe_sql_fields_joined} FROM {from_clause}{where_clause}{order_clause}{limit_clause}"
results: list[ResourceResult] = []
total_count: int = 0
try:
with closing(connection.cursor()) as cursor:
cursor.execute(statement, params)
rows = cursor.fetchall()
if rows:
column_names = [description[0].lower() for description in cursor.description]
for row in rows:
row_dict = {column_names[i]: row[i] for i in range(len(column_names))}
type_value = row_dict.get("type", "")
resource_type = ResourceResultType.UNDEFINED
# map the type string back to enum
for rt in ResourceResultType:
if rt.value == type_value:
resource_type = rt
break
if resource_type == ResourceResultType.UNDEFINED and isinstance(type_value, int):
if type_value in RESOURCES_ENUMERATED_TYPE_MAPPING:
resource_type = RESOURCES_ENUMERATED_TYPE_MAPPING[type_value]
result = ResourceResult(
id=row_dict.get("id"),
site=row_dict.get("project"),
created=from_isoformat_zulu(row_dict.get("created")),
modified=from_isoformat_zulu(row_dict.get("modified")),
url=row_dict.get("url", ""),
type=resource_type,
name=row_dict.get("name"),
headers=row_dict.get("headers"),
content=row_dict.get("content") if "content" in selected_fields else None,
status=row_dict.get("status"),
size=row_dict.get("size"),
time=row_dict.get("time"),
metadata=None,
)
results.append(result)
# get total count
if len(results) < limit:
total_count = offset + len(results)
else:
count_statement = f"SELECT COUNT(*) as total FROM {from_clause}{where_clause}"
cursor.execute(count_statement, params)
count_row = cursor.fetchone()
total_count = count_row[0] if count_row else 0
except sqlite3.Error as ex:
logger.error(f"SQLite error in structured query: {ex}\n{statement}\n{traceback.format_exc()}")
return null_result
return results, total_count, connection_index_state
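# A call sketch (illustrative only; "manager" stands for a concrete subclass
# instance that implements get_connection, and the query grammar beyond the
# documented type:/status: filters is an assumption):
#   results, total, state = manager.get_resources_for_sites_group(
#       group, "type:html AND status:200", fields=["content"], sort=None, limit=20, offset=0)
#   # results: list[ResourceResult], total: int, state: IndexState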
def _load_site_data(self, connection: sqlite3.Connection, site_path: Path,
site_id: int, index_state: IndexState | None = None) -> None:
"""
Load site data into the database. To be implemented by subclasses.
Args:
connection: SQLite connection
site_path: Path to the site data
site_id: ID for the site
index_state: optional shared state for tracking indexing progress
"""
raise NotImplementedError("Subclasses must implement _load_site_data")
def _determine_resource_type(self, content_type: str) -> ResourceResultType:
content_type_mapping = {
"html": ResourceResultType.PAGE,
"javascript": ResourceResultType.SCRIPT,
"css": ResourceResultType.CSS,
"image/": ResourceResultType.IMAGE,
"pdf": ResourceResultType.PDF,
"text/": ResourceResultType.TEXT,
"audio/": ResourceResultType.AUDIO,
"video/": ResourceResultType.VIDEO,
"application/json": ResourceResultType.TEXT,
"application/xml": ResourceResultType.TEXT
}
content_type = content_type.lower()
for pattern, res_type in content_type_mapping.items():
if pattern in content_type:
return res_type
return ResourceResultType.OTHER
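# note: dict insertion order matters above -- e.g. "text/html; charset=utf-8"
# matches the "html" pattern (PAGE) before the broader "text/" pattern (TEXT):
#   self._determine_resource_type("text/html; charset=utf-8")  # -> ResourceResultType.PAGE
#   self._determine_resource_type("application/pdf")           # -> ResourceResultType.PDF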
def _is_text_content(self, content_type: str) -> bool:
"""
Check if content should be stored as text; filters deadweight content out of the FTS index.
"""
content_type_lower = content_type.lower()
if content_type_lower.startswith("text/"):
return True
elif content_type_lower.startswith(("font/", "image/", "audio/", "video/", "application/octet-stream")):
return False
elif content_type_lower.startswith("application/"):
return content_type_lower in {
"application/atom+xml", "application/ld+json", "application/rss+xml",
"application/x-www-form-urlencoded", "application/xml", "application/json",
"application/javascript"
}
else:
return True
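# illustrative outcomes for _is_text_content (derived from the branches above):
#   "text/html; charset=utf-8"  -> True   (text/ prefix)
#   "application/json"          -> True   (allow-listed application/ type)
#   "application/pdf"           -> False  (application/ but not allow-listed)
#   "font/woff2"                -> False  (font/ is filtered out)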