import anyio
import re
import sqlite3
import traceback
from pathlib import Path
from typing import Any, Callable, Final
from urllib.parse import urlparse
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool
from mcp_server_webcrawl.crawlers.base.api import BaseJsonApi
from mcp_server_webcrawl.crawlers.base.adapter import IndexState
from mcp_server_webcrawl.models import METADATA_VALUE_TYPE
from mcp_server_webcrawl.models.resources import (
ResourceResult,
ResourceResultType,
RESOURCES_DEFAULT_FIELD_MAPPING,
RESOURCE_EXTRAS_ALLOWED,
RESOURCES_TOOL_NAME,
)
from mcp_server_webcrawl.utils.blobs import ThumbnailManager
from mcp_server_webcrawl.utils.extras import get_markdown, get_snippets
from mcp_server_webcrawl.utils.logger import get_logger
from mcp_server_webcrawl.models.sites import SITES_TOOL_NAME
OVERRIDE_ERROR_MESSAGE: Final[str] = """BaseCrawler subclasses must implement, at a minimum, \
the following methods: handle_list_tools and handle_call_tool."""
logger = get_logger()
class BaseCrawler:
"""
Base crawler class that implements MCP server functionality.
This class provides the foundation for specialized crawlers to interact with
the MCP server and handle tool operations for web resources.
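    Example (a minimal sketch; ``MyCrawler`` and the adapter functions shown
    here are hypothetical)::

        class MyCrawler(BaseCrawler):
            def __init__(self, datasrc: Path) -> None:
                super().__init__(datasrc, my_get_sites, my_get_resources)

            async def mcp_list_tools(self) -> list[Tool]:
                ...  # return the Tool definitions this crawler exposes

            async def mcp_call_tool(self, name: str, arguments: dict) -> list[TextContent]:
                ...  # dispatch the tool call and return its content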
"""
def __init__(
self,
datasrc: Path,
get_sites_func: Callable,
get_resources_func: Callable,
resource_field_mapping: dict[str, str] = RESOURCES_DEFAULT_FIELD_MAPPING,
) -> None:
"""
Initialize the BaseCrawler with a data source path and required adapter functions.
Args:
datasrc: path to the data source
get_sites_func: function to retrieve sites from the data source
get_resources_func: function to retrieve resources from the data source
resource_field_mapping: mapping of resource field names to display names
"""
from mcp_server_webcrawl import __name__ as module_name, __version__ as module_version
assert datasrc is not None, f"{self.__class__.__name__} needs a datasrc, regardless of action"
assert callable(get_sites_func), f"{self.__class__.__name__} requires a callable get_sites_func"
assert callable(get_resources_func), f"{self.__class__.__name__} requires a callable get_resources_func"
assert isinstance(resource_field_mapping, dict), f"{self.__class__.__name__} resource_field_mapping must be a dict"
self._datasrc: Path = Path(datasrc)
self._module_name: str = module_name
self._module_version: str = module_version
self._server = Server(self._module_name)
self._server.list_tools()(self.mcp_list_tools)
self._server.call_tool()(self.mcp_call_tool)
self._server.list_prompts()(self.mcp_list_prompts)
self._server.list_resources()(self.mcp_list_resources)
self._resource_field_mapping = resource_field_mapping
self._adapter_get_sites = get_sites_func
self._adapter_get_resources = get_resources_func
@property
def datasrc(self) -> Path:
return self._datasrc
async def mcp_list_prompts(self) -> list:
"""List available prompts (currently none)."""
return []
async def mcp_list_resources(self) -> list:
"""List available resources (currently none)."""
return []
async def serve(self, stdin: anyio.AsyncFile[str] | None, stdout: anyio.AsyncFile[str] | None) -> dict[str, Any]:
"""
Launch the awaitable server.
Args:
stdin: input stream for the server
stdout: output stream for the server
Returns:
            The result of the MCP server run over the provided streams
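
        Example (a sketch assuming the stdio transport from the ``mcp`` SDK)::

            from mcp.server.stdio import stdio_server

            async def main(crawler: "BaseCrawler") -> None:
                async with stdio_server() as (read_stream, write_stream):
                    await crawler.serve(read_stream, write_stream)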
"""
return await self._server.run(stdin, stdout, self.get_initialization_options())
def get_initialization_options(self) -> InitializationOptions:
"""
Get the MCP initialization object.
Returns:
            InitializationOptions carrying the server name, version, and capabilities
"""
notification_events = NotificationOptions(prompts_changed=False, resources_changed=False, tools_changed=False)
capabilities = self._server.get_capabilities(notification_options=notification_events, experimental_capabilities={})
return InitializationOptions(server_name=self._module_name, server_version=self._module_version, capabilities=capabilities)
def get_sites_api_json(self, **kwargs) -> str:
"""
Get sites API result as JSON.
Returns:
JSON string of sites API results
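
        Example (hypothetical ``crawler`` instance)::

            sites_json = crawler.get_sites_api_json(ids=[1])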
"""
json_result = self.get_sites_api(**kwargs)
return json_result.to_json()
def get_resources_api_json(self, **kwargs) -> str:
"""
Get resources API result as JSON.
Returns:
JSON string of resources API results
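
        Example (hypothetical ``crawler`` instance; the query value is illustrative)::

            results_json = crawler.get_resources_api_json(query="contact", limit=5)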
"""
json_result = self.get_resources_api(**kwargs)
return json_result.to_json()
def get_sites_api(
self,
ids: list[int] | None = None,
fields: list[str] | None = None,
) -> BaseJsonApi:
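        """
        Get sites API result.

        Args:
            ids: optional list of site ids to filter by
            fields: optional list of fields to include in the results

        Returns:
            BaseJsonApi object wrapping the site results
        """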
sites = self._adapter_get_sites(self._datasrc, ids=ids, fields=fields)
sites_kwargs = {
"ids": ids,
"fields": fields,
}
json_result = BaseJsonApi("GetProjects", sites_kwargs)
json_result.set_results(sites, len(sites), 0, len(sites))
return json_result
def get_resources_api(
self,
sites: list[int] | None = None,
query: str = "",
fields: list[str] | None = None,
sort: str | None = None,
limit: int = 20,
offset: int = 0,
extras: list[str] | None = None,
) -> BaseJsonApi:
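        """
        Get resources API result across one or more sites.

        Args:
            sites: optional list of site ids (defaults to all sites)
            query: search query string (empty returns everything)
            fields: optional list of fields to include in the results
            sort: optional sort directive
            limit: maximum number of results to return
            offset: number of results to skip, for pagination
            extras: optional extras to compute (e.g. "markdown", "snippets")

        Returns:
            BaseJsonApi object wrapping the resource results
        """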
resources_kwargs: dict[str, METADATA_VALUE_TYPE] = {
"sites": sites,
"query": query,
"fields": fields,
"sort": sort,
"limit": limit,
"offset": offset,
}
def no_results() -> BaseJsonApi:
api_result = BaseJsonApi("GetResources", resources_kwargs, index_state=IndexState())
api_result.set_results([], 0, 0, limit)
return api_result
if not sites:
all_sites = self._adapter_get_sites(self._datasrc)
if not all_sites:
return no_results()
            # default to all sites if not specified
            sites = [site.id for site in all_sites]
        # sometimes the AI gets it in its head that an empty quoted query or a
        # bare wildcard is a good idea, but it means no query; normalize it here
        if query.strip() in ('""', "''", "``", "*"):
            query = ""
site_matches = self._adapter_get_sites(self._datasrc, ids=sites)
if not site_matches:
return no_results()
        # handle "stealth" fields needed by extras: markdown requires content,
        # and snippets require content and headers, even when the caller did
        # not request those fields; fetch them here and null them out below
        extras = extras or []
        fields = fields or []
        fields_extras_override: list[str] = fields.copy()
        if ("markdown" in extras or "snippets" in extras) and "content" not in fields:
            fields_extras_override.append("content")
        if "snippets" in extras and "headers" not in fields:
            fields_extras_override.append("headers")
results, total, index_state = self._adapter_get_resources(
self._datasrc,
sites=sites,
query=query,
fields=fields_extras_override,
sort=sort,
limit=limit,
offset=offset,
)
if "markdown" in extras:
result: ResourceResult
for result in results:
markdown_result: str | None = get_markdown(result.content)
result.set_extra("markdown", markdown_result)
if "snippets" in extras and query.strip():
result: ResourceResult
for result in results:
snippets: str | None = get_snippets(result.headers, result.content, query)
result.set_extra("snippets", snippets)
        # strip fields that were fetched only to support extras
        extras_only_fields = set(fields_extras_override) - set(fields)
if extras_only_fields:
for result in results:
for field in extras_only_fields:
if hasattr(result, field):
setattr(result, field, None)
        # note: the thumbnails extra is a special case, handled in mcp_call_tool
api_result = BaseJsonApi("GetResources", resources_kwargs, index_state=index_state)
api_result.set_results(results, total, offset, limit)
return api_result
def get_thumbnails(self, results: list[ResourceResult]) -> list[ImageContent]:
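        """
        Fetch thumbnails for image resources and return them as base64 webp images.

        Args:
            results: resource results to scan for image URLs

        Returns:
            List of ImageContent thumbnails; URLs that fail to fetch are skipped
        """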
thumbnails_result: list[ImageContent] = []
        image_paths = list({result.url for result in results
                if result.url and result.type == ResourceResultType.IMAGE})
        valid_paths = []
        for path in image_paths:
            parsed = urlparse(path)
            if parsed.scheme in ("http", "https") and parsed.netloc:
                # web URL: drop the query string and fragment
                clean_path: str = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
                valid_paths.append(clean_path)
            else:
                # local/relative path: strip any query string before testing
                # for a known image extension
                clean_path = path.split("?")[0]
                if re.search(r"\.(jpg|jpeg|png|gif|bmp|webp)$", clean_path, re.IGNORECASE):
                    valid_paths.append(clean_path)
if valid_paths:
try:
thumbnail_manager = ThumbnailManager()
thumbnail_data = thumbnail_manager.get_thumbnails(valid_paths)
                for thumbnail_url, thumbnail_base64 in thumbnail_data.items():
                    if thumbnail_base64 is None:
                        logger.debug(f"Thumbnail request failed for {thumbnail_url}")
                        continue
                    image_content = ImageContent(type="image", data=thumbnail_base64, mimeType="image/webp")
                    thumbnails_result.append(image_content)
                logger.debug(f"Fetched {len(thumbnails_result)} of {len(valid_paths)} requested thumbnails")
except Exception as ex:
logger.error(f"Error fetching thumbnails: {ex}\n{traceback.format_exc()}")
return thumbnails_result
def _convert_to_resource_types(self, types: list[str] | None) -> list[ResourceResultType] | None:
"""
Convert string type values to ResourceResultType enums. Silently ignore invalid type strings.
Args:
types: optional list of string type values
Returns:
Optional list of ResourceResultType enums, or None if no valid types
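
        Example (assuming "html" is a valid ``ResourceResultType`` value;
        unknown strings are silently dropped)::

            types = crawler._convert_to_resource_types(["html", "bogus"])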
"""
if not types:
return None
result = [rt for rt in ResourceResultType if rt.value in types]
return result if result else None