import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from mcp_server_webcrawl.crawlers.base.adapter import (
BaseManager,
IndexState,
IndexStatus,
SitesGroup,
INDEXED_BATCH_SIZE,
INDEXED_TYPE_MAPPING,
INDEXED_IGNORE_DIRECTORIES,
)
from mcp_server_webcrawl.crawlers.base.indexed import IndexedManager
from mcp_server_webcrawl.models.resources import (
ResourceResult,
ResourceResultType,
RESOURCES_LIMIT_DEFAULT,
)
from mcp_server_webcrawl.models.sites import (
SiteResult,
SITES_FIELDS_BASE,
SITES_FIELDS_DEFAULT,
)
from mcp_server_webcrawl.utils.logger import get_logger
logger = get_logger()
class ArchiveBoxManager(IndexedManager):
"""
Manages ArchiveBox in-memory SQLite databases for session-level reuse.
"""
def __init__(self) -> None:
"""
Initialize the ArchiveBox manager with empty cache and statistics.
"""
super().__init__()
def _load_site_data(self, connection: sqlite3.Connection, site_directory: Path,
site_id: int, index_state: IndexState = None) -> None:
"""
Load ArchiveBox site data into the database.
Args:
connection: SQLite connection
site_directory: path to the ArchiveBox site directory (e.g., "example" or "pragmar")
site_id: ID for the site
index_state: IndexState object for tracking progress
"""
# The site_directory should be something like "example" or "pragmar"
# We need to look for the "archive" subdirectory within it
archive_directory: Path = site_directory / "archive"
if not archive_directory.exists() or not archive_directory.is_dir():
logger.error(f"Archive directory not found in site: {archive_directory}")
return
if index_state is not None:
index_state.set_status(IndexStatus.INDEXING)
        # page directories are timestamped (e.g. example/archive/1756357684.13023)
        # and contain the page data/media
page_directories = self._get_page_directories(archive_directory)
if not page_directories:
logger.warning(f"No timestamped entries found in archive: {archive_directory}")
return
all_resources: list[ResourceResult] = []
# process each timestamped entry
for page_directory in page_directories:
if index_state is not None and index_state.is_timeout():
index_state.set_status(IndexStatus.PARTIAL)
break
try:
metadata = self._get_page_metadata(page_directory)
                main_url: str = metadata.get("url", f"archivebox://unknown/{page_directory.name}")
# primary resource
main_resource = self._create_page_resource(page_directory, site_id, main_url, metadata)
if main_resource:
all_resources.append(main_resource)
if index_state is not None:
index_state.increment_processed()
                # collect assets (external js/css/fonts, etc.)
domain_assets = self._get_page_domain_assets(page_directory, main_url)
for file_path, asset_url in domain_assets:
asset_resource = self._create_asset_resource(file_path, site_id, asset_url, page_directory)
if asset_resource:
all_resources.append(asset_resource)
if index_state is not None:
index_state.increment_processed()
except Exception as ex:
logger.error(f"Error processing entry {page_directory}: {ex}")
deduplicated_resources = self._dedupe_resources(all_resources)
with closing(connection.cursor()) as cursor:
for i in range(0, len(deduplicated_resources), INDEXED_BATCH_SIZE):
batch = deduplicated_resources[i:i+INDEXED_BATCH_SIZE]
self._execute_batch_insert(connection, cursor, batch)
if index_state is not None and index_state.status == IndexStatus.INDEXING:
index_state.set_status(IndexStatus.COMPLETE)
def _create_page_resource(self, resource_directory: Path, site_id: int,
url: str, metadata: dict) -> ResourceResult | None:
"""
Create ResourceResult for the main captured page.
"""
try:
            # created/modified come from the directory stat
resource_stat: os.stat_result = resource_directory.stat()
created: datetime = datetime.fromtimestamp(resource_stat.st_ctime, tz=timezone.utc)
modified: datetime = datetime.fromtimestamp(resource_stat.st_mtime, tz=timezone.utc)
# select best content, with appropriate fallbacks
            html_file: Path | None = None
if "canonical" in metadata:
                # dom first, wget second; ignore singlefile (data URIs consume too much storage)
canonical: dict[str, str] = metadata["canonical"]
prioritized_paths = ["dom_path", "wget_path"]
for path_key in prioritized_paths:
if path_key in canonical and canonical[path_key] is not None:
candidate_file = resource_directory / canonical[path_key]
if candidate_file.resolve().is_relative_to(resource_directory.resolve()) and candidate_file.exists():
html_file = candidate_file
break
            # fall back to the ArchiveBox index file (metadata only: barely useful, but dependable)
if html_file is None:
html_file = resource_directory / "index.html"
# read content
            content: str | None = None
file_size: int = 0
if html_file.exists():
try:
with open(html_file, "r", encoding="utf-8", errors="replace") as f:
content = f.read()
file_size: int = html_file.stat().st_size
except Exception as ex:
logger.warning(f"Could not read HTML from {html_file}: {ex}")
# assemble metadata
status_code: int = 200
headers_reconstructed: str = ""
if "http_headers" in metadata:
http_headers = metadata["http_headers"]
if "status" in http_headers:
try:
status_code = int(str(http_headers["status"]).split()[0])
except (ValueError, IndexError):
pass
headers_reconstructed = self._get_http_headers_string(http_headers)
if not headers_reconstructed:
headers_reconstructed = BaseManager.get_basic_headers(
file_size, ResourceResultType.PAGE)
return ResourceResult(
id=BaseManager.string_to_id(url),
site=site_id,
created=created,
modified=modified,
url=url,
type=ResourceResultType.PAGE,
status=status_code,
headers=headers_reconstructed,
content=content,
size=file_size,
time=0
)
except Exception as ex:
logger.error(f"Error creating main resource for {resource_directory}: {ex}")
return None
def _create_asset_resource(self, file_path: Path, site_id: int, url: str, entry_dir: Path) -> ResourceResult | None:
"""
Create ResourceResult for a domain asset file.
"""
try:
# get file info
if not file_path.exists():
return None
file_stat = file_path.stat()
created: datetime = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc)
modified: datetime = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc)
file_size: int = file_stat.st_size
extension: str = file_path.suffix.lower()
            # ArchiveBox appends URL arguments after an "@" in the saved filename;
            # sometimes it lands in the filename, sometimes in the extension,
            # so both need cleaning
clean_url: str = url.split("@")[0]
clean_extension: str = extension.split("@")[0]
            resource_type: ResourceResultType = INDEXED_TYPE_MAPPING.get(clean_extension, ResourceResultType.OTHER)
# read content for text files
content: str | None = BaseManager.read_file_contents(file_path, resource_type)
return ResourceResult(
id=BaseManager.string_to_id(clean_url),
site=site_id,
created=created,
modified=modified,
url=clean_url,
type=resource_type,
status=200, # assume assets successful
headers=BaseManager.get_basic_headers(file_size, resource_type, file_path),
content=content,
size=file_size,
time=0
)
except Exception as ex:
logger.error(f"Error creating asset resource for {file_path}: {ex}")
return None
def _get_page_directories(self, archive_directory: Path) -> list[Path]:
"""
Get webpage directories within ArchiveBox archive.
Args:
archive_directory: path to the ArchiveBox archive directory
Returns:
List of timestamped entry directory paths
"""
# page_directories are the timestamped directories,
# e.g. archive/1756342555.086082
page_directories = []
if not archive_directory.is_dir():
return page_directories
for item in archive_directory.iterdir():
            # timestamped names like "1756342555.086082" are numeric once the dot is removed
if (item.is_dir() and item.name.replace(".", "").isdigit()):
data_files: list[Path] = [
(item / "index.json"),
(item / "headers.json"),
(item / "index.html"),
]
for data_file in data_files:
if data_file.exists():
page_directories.append(item)
break
return sorted(page_directories)
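    # For example (hypothetical listing), "1756342555.086082" qualifies as a page
    # directory if it also contains index.json, headers.json, or index.html;
    # a directory named "www.example.com" would be skipped by the digit check.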
def _get_page_metadata(self, entry_directory: Path) -> dict:
"""
Extract metadata from ArchiveBox entry files.
Args:
entry_directory: path to the timestamped entry directory
Returns:
Dictionary containing extracted metadata
"""
page_metadata: dict[str, str] = {}
# read index.json for primary URL and metadata
index_json_path: Path = entry_directory / "index.json"
if index_json_path.exists():
try:
with open(index_json_path, "r", encoding="utf-8", errors="replace") as f:
index_data = json.load(f)
page_metadata.update(index_data)
except (json.JSONDecodeError, UnicodeDecodeError) as ex:
logger.warning(f"Could not parse index.json from {entry_directory}: {ex}")
except Exception as ex:
logger.error(f"Error reading index.json from {entry_directory}: {ex}")
# read headers.json for HTTP headers
headers_json_path = entry_directory / "headers.json"
if headers_json_path.exists():
try:
with open(headers_json_path, "r", encoding="utf-8", errors="replace") as f:
http_headers = json.load(f)
page_metadata["http_headers"] = http_headers
except (json.JSONDecodeError, UnicodeDecodeError) as ex:
logger.warning(f"Could not parse headers.json from {entry_directory}: {ex}")
except Exception as ex:
logger.error(f"Error reading headers.json from {entry_directory}: {ex}")
return page_metadata
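    # Sketch of the merged dictionary returned by _get_page_metadata above, limited
    # to the keys this module actually reads; the values shown are hypothetical:
    #   {
    #       "url": "https://example.com/",
    #       "canonical": {"dom_path": "output.html", "wget_path": "example.com/index.html"},
    #       "http_headers": {"Status-Code": 200, "Content-Type": "text/html", ...},
    #   }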
def _get_page_domain_assets(self, entry_dir: Path, main_url: str) -> list[tuple[Path, str]]:
"""
Collect all domain asset files within an entry.
Args:
entry_dir: path to the timestamped entry
main_url: the main captured URL
Returns:
List of (file_path, reconstructed_url) tuples
"""
        assets: list[tuple[Path, str]] = []
# skip metadata directories
skip_directories: set[str] = {"media", "mercury"}
collapse_filenames: list[str] = ["/index.html", "/index.htm"]
for item in entry_dir.iterdir():
if item.is_dir() and item.name not in skip_directories:
                # this is an ArchiveBox domain directory
domain_name: str = item.name
# walk domain directories for assets
# (e.g. example/archive/1756357684.13023/example.com)
for root, _, files in os.walk(item):
for filename in files:
                        # filenames ending in "orig" are duplicates, and not reliably in file-extension form
if filename.endswith("orig"):
continue
file_path = Path(root) / filename
# clean up ArchiveBox's @timestamp suffixes for URL construction
clean_filename: str = filename.split("@")[0]
clean_file_path: Path = Path(root) / clean_filename
relative_path = clean_file_path.relative_to(item)
url = f"https://{domain_name}/{str(relative_path).replace(os.sep, '/')}"
for collapse_filename in collapse_filenames:
# turn ./index.html and variants into ./ (dir index) to help the indexer
if url.endswith(collapse_filename):
url = url[:-(len(collapse_filename))] + "/"
break
# Use original file_path for reading, clean url for storage
assets.append((file_path, url))
return assets
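    # Example of the URL reconstruction performed above, using a hypothetical entry
    # layout; the "@..." suffix is ArchiveBox's encoding of URL arguments:
    #   <entry>/example.com/assets/app.js@v=2   ->  https://example.com/assets/app.js
    #   <entry>/example.com/blog/index.html     ->  https://example.com/blog/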
def _dedupe_resources(self, resources: list[ResourceResult]) -> list[ResourceResult]:
"""
        Deduplicate resources by URL, preferring the more recently modified entry.
Args:
resources: list of ResourceResult objects
Returns:
Deduplicated list of ResourceResult objects
"""
seen_urls: dict[str, ResourceResult] = {}
deduplicated: list[ResourceResult] = []
resource: ResourceResult
for resource in resources:
if resource.url in seen_urls:
                # url collision: prefer the more recently modified resource
existing = seen_urls[resource.url]
if resource.modified and existing.modified:
if resource.modified > existing.modified:
deduplicated = [r for r in deduplicated if r.url != resource.url]
deduplicated.append(resource)
seen_urls[resource.url] = resource
else:
                # first occurrence of this url
seen_urls[resource.url] = resource
deduplicated.append(resource)
return deduplicated
def _get_http_headers_string(self, http_headers: dict) -> str:
"""
Format headers dictionary as HTTP headers string.
"""
if not http_headers:
return ""
headers_lines: list[str] = []
status: int = http_headers.get("Status-Code", 200)
headers_lines.append(f"HTTP/1.0 {status}")
for key, value in http_headers.items():
if key.lower() not in ["status-code"]:
headers_lines.append(f"{key}: {value}")
return "\r\n".join(headers_lines) + "\r\n\r\n"
# module-level singleton, reused across tool calls for session-level caching
manager: ArchiveBoxManager = ArchiveBoxManager()
def get_sites(
datasrc: Path,
ids: list[int] | None = None,
fields: list[str] | None = None
) -> list[SiteResult]:
"""
List ArchiveBox instances as separate sites.
Each subdirectory of datasrc that contains an "archive" folder is treated as a separate ArchiveBox instance.
Args:
datasrc: path to the directory containing ArchiveBox instance directories
ids: optional list of site IDs to filter by
fields: optional list of fields to include in the response
Returns:
List of SiteResult objects, one for each ArchiveBox instance
"""
assert datasrc is not None, f"datasrc not provided ({datasrc})"
if not datasrc.exists():
logger.error(f"Directory not found ({datasrc})")
return []
# determine which fields to include
selected_fields: set[str] = set(SITES_FIELDS_BASE)
if fields:
valid_fields: set[str] = set(SITES_FIELDS_DEFAULT)
selected_fields.update(f for f in fields if f in valid_fields)
else:
selected_fields.update(SITES_FIELDS_DEFAULT)
results: list[SiteResult] = []
# get all directories that contain an "archive" subdirectory
site_directories: list[Path] = []
for datasrc_item in datasrc.iterdir():
if (
datasrc_item.is_dir() and
not datasrc_item.name.startswith(".") and
datasrc_item.name not in INDEXED_IGNORE_DIRECTORIES and
(datasrc_item / "archive").is_dir()
):
site_directories.append(datasrc_item)
# map directory IDs to paths for filtering
site_directories_map: dict[int, Path] = {BaseManager.string_to_id(d.name): d for d in site_directories}
if ids:
site_directories_map = {id_val: path for id_val, path in site_directories_map.items() if id_val in ids}
# process each ArchiveBox instance directory
for site_id, site_directory in sorted(site_directories_map.items()):
site_directory_stat = site_directory.stat()
        created_time: datetime = datetime.fromtimestamp(site_directory_stat.st_ctime, tz=timezone.utc)
        modified_time: datetime = datetime.fromtimestamp(site_directory_stat.st_mtime, tz=timezone.utc)
site = SiteResult(
path=site_directory,
id=site_id,
url=f"archivebox://{site_directory.name}/",
created=created_time if "created" in selected_fields else None,
modified=modified_time if "modified" in selected_fields else None,
)
results.append(site)
return results
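# On-disk layout assumed by get_sites() above and get_resources() below; the
# instance and timestamp names are illustrative:
#
#   datasrc/
#       example/                    <- one ArchiveBox instance == one site
#           archive/
#               1756357684.13023/   <- one timestamped capture
#                   index.json
#                   headers.json
#                   example.com/    <- domain assets
#       pragmar/
#           archive/
#               ...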
def get_resources(
datasrc: Path,
sites: list[int] | None = None,
query: str = "",
fields: list[str] | None = None,
sort: str | None = None,
limit: int = RESOURCES_LIMIT_DEFAULT,
offset: int = 0,
) -> tuple[list[ResourceResult], int, IndexState]:
"""
Get resources from ArchiveBox instances using in-memory SQLite.
Args:
datasrc: path to the directory containing ArchiveBox instance directories
sites: optional list of site IDs to filter by
query: search query string
fields: optional list of fields to include in response
sort: sort order for results
limit: maximum number of results to return
offset: number of results to skip for pagination
Returns:
Tuple of (list of ResourceResult objects, total count, IndexState)
"""
sites_results: list[SiteResult] = get_sites(datasrc=datasrc, ids=sites)
assert sites_results, "At least one site is required to search"
# use the actual site directories as paths (e.g., "example", "pragmar")
site_paths = [site.path for site in sites_results]
sites_group = SitesGroup(datasrc, sites or [site.id for site in sites_results], site_paths)
return manager.get_resources_for_sites_group(sites_group, query, fields, sort, limit, offset)
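
if __name__ == "__main__":
    # Minimal usage sketch, not part of the adapter API: point datasrc at a
    # directory containing ArchiveBox instances (the path below is hypothetical),
    # list the detected sites, then run an unfiltered search against the first.
    example_datasrc = Path("/path/to/archivebox/instances")
    if example_datasrc.is_dir():
        found_sites = get_sites(datasrc=example_datasrc)
        for found_site in found_sites:
            print(f"{found_site.id}: {found_site.url}")
        if found_sites:
            resources, total, index_state = get_resources(
                datasrc=example_datasrc,
                sites=[found_sites[0].id],
                query="",
                limit=10,
            )
            print(f"returned {len(resources)} of {total} indexed resources ({index_state.status})")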