Source code for mcp_server_webcrawl.interactive.search

import hashlib
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime
from typing import Optional, TYPE_CHECKING

from mcp_server_webcrawl.crawlers.base.crawler import BaseJsonApi
from mcp_server_webcrawl.interactive.ui import UiFocusable, UiState
from mcp_server_webcrawl.models.resources import ResourceResult

if TYPE_CHECKING:
    from mcp_server_webcrawl.interactive.session import InteractiveSession

SEARCH_DEBOUNCE_DELAY_SECONDS = 0.2
SEARCH_RESULT_LIMIT: int = 10

class SearchManager:
    """
    Manages search operations including async search and debouncing.
    Works with session's controlled interface - never touches private state directly.

    Threading model (as visible here): debounced searches run on a
    ThreadPoolExecutor worker; completed results are parked in
    ``__pending_*`` fields under ``__search_lock`` and handed to the UI
    when the main loop calls :meth:`check_pending`.
    """

    def __init__(self, session: "InteractiveSession"):
        """
        Initialize the search manager.

        Args:
            session: owning interactive session, accessed only through its
                public surface (searchform, results, crawler, UI state)
        """
        self.__session: "InteractiveSession" = session
        # hash of the last search inputs; used to suppress redundant searches
        self.__search_last_state_hash: str = ""
        # debounce timer for keystroke-driven searches
        self.__search_timer: Optional[threading.Timer] = None
        self.__executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="SearchManager")
        # guards the in-progress flag, the active future, and all __pending_* fields
        self.__search_lock: threading.RLock = threading.RLock()
        self.__search_in_progress: bool = False
        self.__active_search_future: Optional[Future] = None
        # results parked by a worker thread until check_pending() consumes them
        self.__pending_results: Optional[list[ResourceResult]] = None
        self.__pending_indexer_status: str = ""
        self.__pending_indexer_processed: int = 0
        self.__pending_indexer_duration: float = 0
        self.__pending_total: int = 0

    def autosearch(self, immediate: bool = False) -> None:
        """
        Trigger search with optional immediate execution.

        Args:
            immediate: If True, execute search synchronously without debouncing.
                If False, use debounced async execution (default).
        """
        current_state_hash: str = self.__get_input_hash()
        # skip a debounced search when nothing changed; an explicit
        # (immediate) request always runs regardless of the hash
        if not immediate and current_state_hash == self.__search_last_state_hash:
            return

        self.__search_last_state_hash = current_state_hash
        self.cancel_pending()
        if immediate:
            self.__execute_search_immediate()
        else:
            self.__search_timer = threading.Timer(SEARCH_DEBOUNCE_DELAY_SECONDS,
                    self.__execute_debounced_search)
            self.__search_timer.start()

    def cancel_pending(self) -> None:
        """
        Cancel any pending search timer.
        """
        if self.__search_timer is not None:
            self.__search_timer.cancel()
            self.__search_timer = None
        with self.__search_lock:
            if self.__active_search_future is not None:
                # cancel() is best-effort; a future already running will
                # complete, but its results are simply overwritten later
                self.__active_search_future.cancel()
                self.__active_search_future = None

    def check_pending(self) -> None:
        """
        Check if there are pending search results and update the UI.

        Intended to be polled from the main/UI thread; consumes results
        parked by the background worker and resets the pending counters.
        """
        with self.__search_lock:
            if self.__pending_results is not None:
                self.__session.results.update(self.__pending_results, self.__pending_total,
                        self.__pending_indexer_status, self.__pending_indexer_processed,
                        self.__pending_indexer_duration)
                # NOTE(review): __pending_indexer_status is intentionally not
                # reset here (matches prior behavior); it is only read when
                # __pending_results is non-None, so a stale value is inert
                self.__pending_results = None
                self.__pending_total = 0
                self.__pending_indexer_processed = 0
                self.__pending_indexer_duration = 0

    def cleanup(self) -> None:
        """
        Clean up any pending operations.
        """
        self.cancel_pending()
        self.__executor.shutdown(wait=True)

    def has_pending(self) -> bool:
        """
        Check if there's a pending debounced search.
        """
        return self.__search_timer is not None

    def is_searching(self) -> bool:
        """
        Check if a search is currently in progress or on a timer.
        """
        with self.__search_lock:
            return self.__search_in_progress or self.__search_timer is not None

    def __background_search(self) -> None:
        """
        Execute search in background thread and store results.
        """
        # flag flip is locked; the query itself runs unlocked so the UI
        # thread can still poll check_pending()/is_searching() meanwhile
        with self.__search_lock:
            self.__search_in_progress = True
        self.__session.searchform.set_search_attempted()
        results, total_results, index_status, index_processed_count, \
                index_duration_value = self.__execute_search_query()
        self.__set_pending_results(results, total_results, index_status,
                index_processed_count, index_duration_value, False)

    def __build_search_query(self, base_query: str) -> str:
        """
        Build the final search query with filter applied (if present).
        """
        if self.__session.searchform.filter == "html":
            if base_query.strip():
                return f"(type: html) AND {base_query}"
            else:
                return "type: html"
        else:
            return base_query

    def __execute_debounced_search(self) -> None:
        """
        Execute search after debounce delay in separate thread.

        Runs on the Timer thread; bails out if the search inputs changed
        after this timer was armed (a newer search supersedes it).
        """
        current_state_hash: str = self.__get_input_hash()
        if current_state_hash != self.__search_last_state_hash:
            return

        # show split view on results, preserving current focus target
        if self.__session.ui_focused == UiFocusable.SEARCH_RESULTS:
            self.__session.set_ui_state(UiState.SEARCH_RESULTS, UiFocusable.SEARCH_RESULTS)
        else:
            self.__session.set_ui_state(UiState.SEARCH_RESULTS, UiFocusable.SEARCH_FORM)

        self.__search_timer = None
        with self.__search_lock:
            self.__active_search_future = self.__executor.submit(self.__background_search)

    def __execute_search_immediate(self) -> None:
        """
        Execute search immediately on main thread (for ENTER key).
        """
        self.__session.searchform.set_search_attempted()
        # clear any stale pending state before the synchronous query
        self.__set_pending_results(None, 0, "", -1, -1, False)
        self.__session.results.clear()
        results, total_results, index_status, index_processed_count, \
                index_duration_value = self.__execute_search_query()
        self.__set_pending_results(results, total_results, index_status,
                index_processed_count, index_duration_value, False)

    def __execute_search_query(self) -> tuple[list[ResourceResult], int, str, int, float]:
        """
        Centralized search execution logic shared by both sync and async paths.

        Returns:
            tuple: (results, total_results, index_status, index_processed_count,
                index_duration_value)
        """
        api: BaseJsonApi | None = self.__get_results(offset=self.__session.searchform.offset)
        if api is None:
            # fix: previously returned a 4-tuple ([], 0, 0, 0) while callers
            # unpack 5 values; use the same "unknown" sentinels as below
            return [], 0, "", -1, -1

        results: list[ResourceResult] = api.get_results()
        total_results: int = api.total
        index_status: str = ""
        index_processed_count: int = -1
        index_duration_value: float = -1
        if api.meta_index is not None:
            if "status" in api.meta_index:
                index_status = api.meta_index["status"]
            if "processed" in api.meta_index:
                index_processed_count = api.meta_index["processed"]
            if "duration" in api.meta_index:
                index_duration_string: str = api.meta_index["duration"] or ""
                if index_duration_string:
                    try:
                        # duration arrives as "H:MM:SS.ffffff"; convert to seconds
                        dt: datetime = datetime.strptime(index_duration_string, "%H:%M:%S.%f")
                        index_duration_value = (dt.hour * 3600 + dt.minute * 60
                                + dt.second + dt.microsecond / 1000000)
                    except ValueError:
                        index_duration_value = 0
        return results, total_results, index_status, index_processed_count, index_duration_value

    def __get_input_hash(self) -> str:
        """
        Generate a hash representing the complete current search state.
        """
        query: str = self.__session.searchform.query.strip()
        selected_sites = self.__session.searchform.get_selected_sites()
        selected_sites_ids: list[int] = [s.id for s in selected_sites]
        # renamed from `filter` to avoid shadowing the builtin
        filter_value: str = str(self.__session.searchform.filter)
        sort: str = str(self.__session.searchform.sort)
        offset: int = self.__session.searchform.offset
        limit: int = self.__session.searchform.limit
        search_state: str = f"{query}|{selected_sites_ids}|{filter_value}|{offset}|{limit}|{sort}"
        # md5 is fine here: change detection only, not security
        return hashlib.md5(search_state.encode()).hexdigest()

    def __get_results(self, offset: int = 0) -> BaseJsonApi | None:
        """
        Execute search with given offset and return API response object.
        Centralizes the API call logic used by both sync and async search paths.

        Args:
            offset: Starting position for search results pagination

        Returns:
            BaseJsonApi: API response object containing search results and metadata
        """
        selected_site_ids: list[int] = self.__get_selected_site_ids()
        query: str = self.__build_search_query(self.__session.searchform.query)
        sort: str = self.__session.searchform.sort
        query_api: BaseJsonApi = self.__session.crawler.get_resources_api(
            sites=selected_site_ids if selected_site_ids else None,
            query=query,
            fields=["size", "status"],
            offset=offset,
            limit=SEARCH_RESULT_LIMIT,
            extras=["snippets"],
            sort=sort
        )
        return query_api

    def __get_selected_site_ids(self) -> list[int]:
        """
        Get list of selected site IDs using property access.
        """
        selected_sites = self.__session.searchform.get_selected_sites()
        return [site.id for site in selected_sites]

    def __set_pending_results(self, results: Optional[list[ResourceResult]],
            total_results: int, index_status: str, index_processed_count: int,
            index_duration_value: float, search_in_progress: bool) -> None:
        """
        Atomically park results for check_pending() to consume.

        On any failure, clears the results view and the in-progress flag so
        the UI never spins forever on a search that cannot complete.
        """
        try:
            with self.__search_lock:
                self.__pending_results = results
                self.__pending_total = total_results
                self.__pending_indexer_status = index_status
                self.__pending_indexer_processed = index_processed_count
                self.__pending_indexer_duration = index_duration_value
                self.__search_in_progress = search_in_progress
        except Exception:
            with self.__search_lock:
                self.__session.results.clear()
                self.__search_in_progress = False