import sqlite3
import traceback
import re
import anyio
from urllib.parse import urlparse
from pathlib import Path
from typing import Any, Optional
from mcp.types import TextContent, ImageContent, EmbeddedResource, Tool
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp_server_webcrawl.models.resources import ResourceResult, ResourceResultType
from mcp_server_webcrawl.crawlers.base.api import BaseJsonApi
from mcp_server_webcrawl.models.resources import RESOURCES_TOOL_NAME
from mcp_server_webcrawl.models.sites import SITES_TOOL_NAME
from mcp_server_webcrawl.utils.logger import get_logger
from mcp_server_webcrawl.utils.blobs import ThumbnailManager
OVERRIDE_ERROR_MESSAGE: str = """BaseCrawler subclasses must implement, at minimum, \
the following methods: handle_list_tools and handle_call_tool."""
logger = get_logger()
class BaseCrawler:
"""
Base crawler class that implements MCP server functionality.
This class provides the foundation for specialized crawlers to interact with
the MCP server and handle tool operations for web resources.
"""
def __init__(self, datasrc: str):
"""
Initialize the BaseCrawler with a data source.
Args:
datasrc: String path to the database
"""
# avoid circular import
from mcp_server_webcrawl import __name__ as module_name, __version__ as module_version
self._module_name: str = module_name
self._module_version: str = module_version
self.datasrc: Path = Path(datasrc)
self.thumbnails: bool = False
self._server = Server(self._module_name)
self._server.list_tools()(self.mcp_list_tools)
self._server.call_tool()(self.mcp_call_tool)
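# note: calling the decorator factory directly, e.g. list_tools()(handler),
# is equivalent to the @server.list_tools() decorator form used in MCP SDK
# examples, just applied to bound methods after construction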
# untapped features: list_prompts/get_prompt | list_resources/get_resources
# currently focused on tools (search and retrieval)
self._server.list_prompts()(self.mcp_list_prompts)
self._server.list_resources()(self.mcp_list_resources)
async def mcp_list_prompts(self) -> list:
"""List available prompts (currently none)."""
return []
async def mcp_list_resources(self) -> list:
"""List available resources (currently none)."""
return []
async def serve(self, stdin: anyio.AsyncFile[str] | None, stdout: anyio.AsyncFile[str] | None) -> dict[str, Any]:
"""
Run the MCP server over the provided streams.
Args:
stdin: Input stream for the server
stdout: Output stream for the server
Returns:
The result of running the MCP server over stdio
"""
# the caller awaits this coroutine as well, but without the await here Python
# emits "RuntimeWarning: coroutine 'Server.run' was never awaited" (serial)
return await self._server.run(stdin, stdout, self.get_initialization_options())
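# typical launch sketch (assumes the stdio transport from the MCP Python SDK;
# SomeCrawler stands in for a concrete subclass and is not part of this module):
#
#   from mcp.server.stdio import stdio_server
#
#   async def main() -> None:
#       crawler = SomeCrawler("/path/to/datasrc")
#       async with stdio_server() as (read_stream, write_stream):
#           await crawler.serve(read_stream, write_stream)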
def get_initialization_options(self) -> InitializationOptions:
"""
Get the MCP initialization options.
Returns:
InitializationOptions containing the server name, version, and capabilities
"""
notification_events = NotificationOptions(prompts_changed=False, resources_changed=False, tools_changed=False)
capabilities = self._server.get_capabilities(notification_options=notification_events, experimental_capabilities={})
return InitializationOptions(server_name=self._module_name, server_version=self._module_version, capabilities=capabilities)
def get_sites_api_json(self, **kwargs) -> str:
"""
Get sites API result as JSON.
Returns:
JSON string of sites API results
"""
json_result = self.get_sites_api(**kwargs)
return json_result.to_json()
def get_resources_api_json(self, **kwargs) -> str:
"""
Get resources API result as JSON.
Returns:
JSON string of resources API results
"""
json_result = self.get_resources_api(**kwargs)
return json_result.to_json()
def get_sites_api(self, ids: Optional[list[int]] = None, fields: Optional[list[str]] = None) -> BaseJsonApi:
"""
Get sites API object.
Args:
ids: Optional list of site IDs
fields: Optional list of fields to include
Raises:
NotImplementedError: This method must be implemented by subclasses
"""
# each crawler subclass must provide this method
raise NotImplementedError(OVERRIDE_ERROR_MESSAGE)
def get_resources_api(self, querystring: str) -> BaseJsonApi:
"""
Get resources API object.
Args:
querystring: Query string for filtering resources
Raises:
NotImplementedError: This method must be implemented by subclasses
"""
# each crawler subclass must provide this method
raise NotImplementedError(OVERRIDE_ERROR_MESSAGE)
def get_thumbnails(self, results: list[ResourceResult]) -> list[ImageContent]:
"""
Build base64-encoded WebP thumbnails for image resources in the results.
Args:
results: Resource results to scan for image URLs
Returns:
List of ImageContent thumbnails; empty if thumbnails are disabled
"""
thumbnails_result: list[ImageContent] = []
if self.thumbnails:
image_paths = list({result.url for result in results if result.url and result.type == ResourceResultType.IMAGE})
valid_paths = []
for path in image_paths:
parsed = urlparse(path)
if parsed.scheme in ("http", "https") and parsed.netloc:
clean_path: str = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
valid_paths.append(clean_path)
elif re.search(r"\.(jpg|jpeg|png|gif|bmp|webp)$", path, re.IGNORECASE):
clean_path = path.split("?")[0]
valid_paths.append(clean_path)
if valid_paths:
try:
thumbnail_manager = ThumbnailManager()
thumbnail_data = thumbnail_manager.get_thumbnails(valid_paths)
for thumbnail_url, thumbnail_base64 in thumbnail_data.items():
if thumbnail_base64 is None:
logger.debug(f"Thumbnail encountered error during request. {thumbnail_url}")
continue
image_content = ImageContent(type="image", data=thumbnail_base64, mimeType="image/webp")
thumbnails_result.append(image_content)
logger.debug(f"Fetched {len(thumbnail_data)} thumbnails out of {len(valid_paths)} requested URLs")
except Exception as ex:
logger.error(f"Error fetching thumbnails: {ex}\n{traceback.format_exc()}")
return thumbnails_result
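# usage sketch (hypothetical; thumbnails default to off, so the flag must be
# enabled before any ImageContent is produced):
#
#   crawler.thumbnails = True
#   images = crawler.get_thumbnails(results)  # list[ImageContent], possibly empty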
def _convert_to_resource_types(self, types: Optional[list[str]]) -> Optional[list[ResourceResultType]]:
"""
Convert string type values to ResourceResultType enums. Silently ignore invalid type strings.
Args:
types: Optional list of string type values
Returns:
Optional list of ResourceResultType enums, or None if no valid types
"""
if not types:
return None
result = [rt for rt in ResourceResultType if rt.value in types]
return result if result else None
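# minimal subclass sketch (illustrative only; DemoCrawler and its stubbed
# bodies are assumptions, not part of this module -- a real crawler would
# query self.datasrc and build BaseJsonApi results):
#
#   class DemoCrawler(BaseCrawler):
#       def get_sites_api(self, ids=None, fields=None) -> BaseJsonApi:
#           ...  # query the datasrc for sites matching ids/fields
#
#       def get_resources_api(self, **kwargs) -> BaseJsonApi:
#           ...  # query the datasrc for matching resources
#
#       async def mcp_list_tools(self) -> list[Tool]:
#           ...  # advertise SITES_TOOL_NAME and RESOURCES_TOOL_NAME
#
#       async def mcp_call_tool(self, name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]:
#           ...  # dispatch to get_sites_api_json / get_resources_api_json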