import sqlite3
import traceback
from datetime import datetime
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Callable
from mcp.types import Tool
from mcp_server_webcrawl.crawlers.base.adapter import (
BaseManager,
IndexState,
IndexStatus,
SitesGroup,
SitesStat,
INDEXED_MANAGER_CACHE_MAX,
INDEXED_RESOURCE_DEFAULT_PROTOCOL,
INDEXED_IGNORE_DIRECTORIES,
)
from mcp_server_webcrawl.crawlers.base.crawler import BaseCrawler
from mcp_server_webcrawl.models.resources import (
ResourceResult,
ResourceResultType,
RESOURCES_DEFAULT_FIELD_MAPPING,
)
from mcp_server_webcrawl.models.sites import (
SiteResult,
SITES_FIELDS_REQUIRED,
SITES_FIELDS_DEFAULT,
)
from mcp_server_webcrawl.utils import to_isoformat_zulu
from mcp_server_webcrawl.utils.logger import get_logger
from mcp_server_webcrawl.utils.tools import get_crawler_tools
logger = get_logger()
class IndexedManager(BaseManager):
def __init__(self):
super().__init__()
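# _db_cache holds in-memory SQLite connections keyed by SitesGroup.cache_key,
# paired with the IndexState tracking each build; _build_locks records the start
# time and status of builds in progress so concurrent rebuilds are avoided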
self._db_cache: dict[frozenset, tuple[sqlite3.Connection, IndexState]] = {}
self._build_locks: dict[frozenset, tuple[datetime, str]] = {}
def get_connection(self, group: SitesGroup) -> tuple[sqlite3.Connection | None, IndexState]:
"""
Get database connection for sites in the group, creating if needed.
Args:
group: group of sites to connect to
Returns:
Tuple of (SQLite connection to the in-memory database with data loaded, or None
if the database is still being built; IndexState associated with this database)
"""
if group.cache_key in self._build_locks:
build_time, status = self._build_locks[group.cache_key]
logger.info(f"Database for {group} is currently {status} (started at {build_time})")
return None, IndexState()  # return an empty IndexState while the database is being built
if len(self._db_cache) >= INDEXED_MANAGER_CACHE_MAX:
logger.warning(f"Cache limit reached ({INDEXED_MANAGER_CACHE_MAX}), clearing all cached databases")
self._db_cache.clear()
is_cached: bool = group.cache_key in self._db_cache
self._stats.append(SitesStat(group, is_cached))
if not is_cached:
index_state = IndexState()
index_state.set_status(IndexStatus.INDEXING)
with self._building_lock(group):
connection: sqlite3.Connection = sqlite3.connect(":memory:", check_same_thread=False)
self._setup_database(connection)
for site_id, site_path in group.get_sites().items():
self._load_site_data(connection, Path(site_path), site_id, index_state=index_state)
if index_state.is_timeout():
index_state.set_status(IndexStatus.PARTIAL)
break
if index_state.status == IndexStatus.INDEXING:
index_state.set_status(IndexStatus.COMPLETE)
self._db_cache[group.cache_key] = (connection, index_state)
# returns cached or newly created connection with IndexState
connection, index_state = self._db_cache[group.cache_key]
return connection, index_state
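# a minimal usage sketch (hypothetical caller code), assuming a SitesGroup is
# already in hand; callers get None back while another request is still indexing:
#
#   manager = IndexedManager()
#   connection, state = manager.get_connection(group)
#   if connection is None:
#       pass  # index is being built; retry later
#   elif state.status in (IndexStatus.COMPLETE, IndexStatus.PARTIAL):
#       with closing(connection.cursor()) as cursor:
#           cursor.execute("SELECT COUNT(*) FROM Resources")
#           print(cursor.fetchone()[0])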
def get_sites_for_directories(
self,
datasrc: Path,
ids: list[int] | None = None,
fields: list[str] | None = None
) -> list[SiteResult]:
"""
List site directories in the datasrc directory as sites.
Args:
datasrc: path to the directory containing site subdirectories
ids: optional list of site IDs to filter by
fields: optional list of fields to include in the response
Returns:
List of SiteResult objects, one for each site directory
Notes:
Returns an empty list if the datasrc directory doesn't exist.
"""
assert datasrc is not None, f"datasrc not provided ({datasrc})"
if not datasrc.exists():
logger.error(f"Directory not found ({datasrc})")
return []
# determine which fields to include
select_fields: set[str] = set(SITES_FIELDS_REQUIRED)
if fields:
valid_fields: set[str] = set(SITES_FIELDS_DEFAULT)
select_fields.update(f for f in fields if f in valid_fields)
else:
select_fields.update(SITES_FIELDS_DEFAULT)
results: list[SiteResult] = []
# collect candidate site directories, skipping hidden and ignored names
site_directories = [d for d in datasrc.iterdir() if d.is_dir() and
not d.name.startswith(".") and d.name not in INDEXED_IGNORE_DIRECTORIES]
# map directory IDs to paths for filtering
site_directories_map: dict[int, Path] = {BaseManager.string_to_id(d.name): d for d in site_directories}
if ids:
site_directories_map = {id_val: path for id_val, path in site_directories_map.items() if id_val in ids}
# process each directory
for site_id, site_directory in sorted(site_directories_map.items()):
site_directory_stat = site_directory.stat()
created_time: datetime = datetime.fromtimestamp(site_directory_stat.st_ctime)
modified_time: datetime = datetime.fromtimestamp(site_directory_stat.st_mtime)
# check for robots.txt
robots_content = None
robots_files = list(site_directory.glob("*robots.txt*"))
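# assumption inferred from the parsing below (not a documented format): the
# archived robots.txt stores request/response metadata and the body as
# blank-line separated blocks; anything else falls back to the raw content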
if robots_files:
try:
with open(robots_files[0], "r", encoding="utf-8", errors="replace") as f:
# for robots.txt files in our format, extract only the content part
content = f.read()
parts = content.split("\n\n", 2)
if len(parts) >= 3:
response_parts = parts[2].split("\n\n", 1)
if len(response_parts) > 1:
robots_content = response_parts[1]
else:
robots_content = response_parts[0]
else:
robots_content = content
except Exception as e:
logger.error(f"Error reading robots.txt ({robots_files[0]}): {e}")
site = SiteResult(
path=site_directory,
id=site_id,
url=f"{INDEXED_RESOURCE_DEFAULT_PROTOCOL}{site_directory.name}/", # base URL from directory name
created=created_time if "created" in select_fields else None,
modified=modified_time if "modified" in select_fields else None,
robots=robots_content
)
results.append(site)
return results
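# a minimal sketch of listing archived sites (the datasrc path is hypothetical);
# site IDs are derived from directory names via BaseManager.string_to_id:
#
#   manager = IndexedManager()
#   sites = manager.get_sites_for_directories(Path("/data/archives"), fields=["created", "modified"])
#   for site in sites:
#       print(site.id, site.url)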
@contextmanager
def _building_lock(self, group: SitesGroup):
"""
Context manager for database building operations.
Sets a lock during database building and releases it when done.
"""
try:
self._build_locks[group.cache_key] = (datetime.now(), "building")
yield
except Exception as e:
self._build_locks[group.cache_key] = (self._build_locks[group.cache_key][0], f"failed: {str(e)}")
raise # re-raise
finally:
# clean up the lock
self._build_locks.pop(group.cache_key, None)
def _setup_database(self, connection: sqlite3.Connection) -> None:
"""
Create the database schema for storing resource data.
Args:
connection: SQLite connection to set up
"""
# the project/site id is stored in the fulltext table as well: FTS5 doesn't
# support range operators (>= <=) and pure FTS search is much faster, so
# Resources-table SQL clauses are only introduced when a numeric field
# (Status, Size, or Time) is explicitly queried
with closing(connection.cursor()) as cursor:
connection.execute("PRAGMA encoding = \"UTF-8\"")
connection.execute("PRAGMA synchronous = OFF")
connection.execute("PRAGMA journal_mode = MEMORY")
cursor.execute("""
CREATE TABLE Resources (
Id INTEGER PRIMARY KEY,
Project INTEGER NOT NULL,
Created TEXT,
Modified TEXT,
Status INTEGER NOT NULL,
Size INTEGER NOT NULL,
Time INTEGER NOT NULL
)""")
cursor.execute("""
CREATE VIRTUAL TABLE ResourcesFullText USING fts5(
Id,
Project,
Url,
Type,
Headers,
Content,
tokenize="unicode61 remove_diacritics 0 tokenchars '-_'"
)""")
def _execute_batch_insert(self, connection: sqlite3.Connection, cursor: sqlite3.Cursor,
batch_records: list[ResourceResult]) -> None:
"""
Execute batch insert of records with transaction handling.
Inserts data into both ResourcesFullText and Resources tables.
Args:
connection: SQLite connection
cursor: SQLite cursor
batch_records: list of ResourceResult objects ready for insertion
"""
if not batch_records:
return
resources_base_records = []
resources_fts_records = []
for resource in batch_records:
resources_base_records.append((
resource.id,
resource.site,
to_isoformat_zulu(resource.created) if resource.created else None,
to_isoformat_zulu(resource.modified) if resource.modified else None,
resource.status,
resource.size if resource.size is not None else 0,
resource.time if resource.time is not None else 0,
))
resources_fts_records.append((
resource.id,
resource.site,
resource.url,
resource.type.value if resource.type else ResourceResultType.UNDEFINED.value,
resource.headers,
resource.content,
))
try:
connection.execute("BEGIN TRANSACTION")
cursor.executemany("""
INSERT INTO Resources (
Id, Project, Created, Modified, Status, Size, Time
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", resources_base_records)
cursor.executemany("""
INSERT INTO ResourcesFullText (
Id, Project, Url, Type, Headers, Content
) VALUES (?, ?, ?, ?, ?, ?)
""", resources_fts_records)
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
logger.error(f"Error during batch insert: {e}\n{traceback.format_exc()}")
class IndexedCrawler(BaseCrawler):
"""
A crawler implementation for data sources that are loaded into an in-memory SQLite database.
Provides the behavior shared by the specialized crawlers.
"""
def __init__(
self,
datasrc: Path,
get_sites_func: Callable,
get_resources_func: Callable,
resource_field_mapping: dict[str, str] = RESOURCES_DEFAULT_FIELD_MAPPING
) -> None:
"""
Initialize the IndexedCrawler with a data source path and required adapter functions.
Args:
datasrc: path to the data source
get_sites_func: function to retrieve sites from the data source
get_resources_func: function to retrieve resources from the data source
resource_field_mapping: mapping of resource field names to display names
"""
assert datasrc.is_dir(), f"{self.__class__.__name__} datasrc must be a directory"
super().__init__(datasrc, get_sites_func, get_resources_func, resource_field_mapping=resource_field_mapping)
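# a minimal sketch of a specialized crawler wiring its adapter functions into
# IndexedCrawler (the class name and adapter functions are hypothetical):
#
#   class ExampleCrawler(IndexedCrawler):
#       def __init__(self, datasrc: Path) -> None:
#           super().__init__(datasrc, get_example_sites, get_example_resources)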