Source code for mcp_server_webcrawl.extras.snippets


import re
import lxml.html

from lxml import etree
from lxml.etree import ParserError
from logging import Logger
from typing import Final

from mcp_server_webcrawl.utils.logger import get_logger
from mcp_server_webcrawl.utils.search import SearchQueryParser

MAX_SNIPPETS_MATCHED_COUNT: Final[int] = 15
MAX_SNIPPETS_RETURNED_COUNT: Final[int] = 3
MAX_SNIPPETS_CONTEXT_SIZE: Final[int] = 48

__RE_SNIPPET_START_TRIM: Final[re.Pattern] = re.compile(r"^[^\w\[]+")
__RE_SNIPPET_END_TRIM: Final[re.Pattern] = re.compile(r"[^\w\]]+$")

logger: Logger = get_logger()

class SnippetContentExtractor:
    """
    lxml-based HTML parser for extracting different types of content from HTML.
    Content is separated into components: text, markup, attributes (values), and
    comments. These can be prioritized in search so that text is the displayed
    hit over noisier types.
    """

    PRIORITY_ORDER: list[str] = ["url", "document_text", "document_attributes",
            "document_comments", "headers", "document_markup"]

    __RE_SPLIT: re.Pattern = re.compile(r"[\s_]+|(?<!\w)-(?!\w)")
    __RE_WHITESPACE: re.Pattern = re.compile(r"\s+")
    __MAX_CONTENT_BYTES: int = 2 * 1024 * 1024  # 2MB

    def __init__(self, url: str, headers: str, content: str):
        self.__document: lxml.html.HtmlElement | None = None
        self.url: str = url
        self.content: str = ""
        # headers collapsed to one line to facilitate snippets
        self.headers: str = re.sub(r"\s+", " ", headers).strip()
        self.document_text: str = ""
        self.document_markup: str = ""
        self.document_attributes: str = ""
        self.document_comments: str = ""

        if len(content) > self.__MAX_CONTENT_BYTES:
            # ignore large files, slow
            return
        else:
            self.content = content

        load_success: bool = self.__load_content()
        if load_success:
            _ = self.__extract()
        else:
            self.document_text = self.__normalize_whitespace(self.content)

    def __load_content(self) -> bool:
        """
        Load content string into lxml doc.
        """
        if not self.content or not self.content.strip():
            return False
        try:
            self.__document = lxml.html.fromstring(self.content.encode("utf-8"))
            return True
        except (ParserError, ValueError, UnicodeDecodeError):
            try:
                wrapped_content = f"<html><body>{self.content}</body></html>"
                self.__document = lxml.html.fromstring(wrapped_content.encode("utf-8"))
                return True
            except (ParserError, ValueError, UnicodeDecodeError):
                return False

    def __extract(self) -> bool:
        """
        Extract content from lxml doc.
        """
        if self.__document is None:
            return False

        text_values = []
        markup_values = []
        attribute_values = []
        comment_values = []

        element: lxml.html.HtmlElement | None = None
        for element in self.__document.iter():

            # HTML outliers
            if element.tag is etree.Comment or element.tag is etree.ProcessingInstruction:
                if element.text is not None:
                    comment_values.append(str(element.text.strip()))
                # avoid regular element text processing
                continue

            if element.tag is etree.Entity or element.tag is etree.CDATA:
                if element.text is not None:
                    text_values.append(str(element.text.strip()))
                continue

            # HTML tags and attributes
            if element.tag:
                markup_values.append(element.tag)

            if element.tag in ("script", "style"):
                continue

            if element.text:
                text_values.append(element.text.strip())
            if element.tail:
                text_values.append(element.tail.strip())

            for attr_name, attr_value in element.attrib.items():
                markup_values.append(attr_name)
                if attr_value:
                    values = [v for v in self.__RE_SPLIT.split(attr_value) if v]
                    attribute_values.extend(values)

        self.document_text = self.__normalize_values(text_values)
        self.document_markup = self.__normalize_values(markup_values)
        self.document_attributes = self.__normalize_values(attribute_values)
        self.document_comments = self.__normalize_values(comment_values)
        return True

    def __normalize_values(self, values: list[str]) -> str:
        """
        Concatenate values and normalize whitespace for list of values.
        """
        text = " ".join([value for value in values if value])
        return self.__normalize_whitespace(text)

    def __normalize_whitespace(self, text: str) -> str:
        """
        Normalize whitespace using pre-compiled pattern.
        """
        return self.__RE_WHITESPACE.sub(" ", text).strip()
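
# Illustrative sketch only, not part of the original module: a minimal example of
# how SnippetContentExtractor separates an HTML document into its text, markup,
# attribute, and comment components. The URL, headers, and HTML are hypothetical.
def _example_extractor_usage() -> None:
    html = '<html><body><!-- promo --><a class="nav-link" href="/docs">Read the docs</a></body></html>'
    extractor = SnippetContentExtractor("https://example.com/", "Content-Type: text/html", html)
    # document_text holds the visible text ("Read the docs"), document_markup holds
    # tag and attribute names, document_attributes holds attribute values
    # ("nav-link", "/docs"), and document_comments holds "promo".
    print(extractor.document_text, extractor.document_attributes, extractor.document_comments)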

def get_snippets(url: str, headers: str, content: str, query: str) -> str | None:
    """
    Takes a query and content, reduces the HTML to text content, and extracts
    hits as excerpts of text.

    Arguments:
        url: The URL of the document
        headers: Header content to search
        content: The HTML or text content to search in
        query: The search query string

    Returns:
        A string of snippets with context around matched terms, separated by
        " ... ", or None if there are no matches
    """
    if query in (None, ""):
        return None

    url = url or ""
    content = content or ""
    headers = headers or ""

    search_terms_parser = SearchQueryParser()
    search_terms: list[str] = search_terms_parser.get_fulltext_terms(query)
    if not search_terms:
        return None

    snippets = []
    snippet_extractor = SnippetContentExtractor(url, headers, content)

    # priority order url, text, attributes, comments, headers, markup
    # most interesting to least, as search hits
    for group_name in snippet_extractor.PRIORITY_ORDER:
        search_group_text = getattr(snippet_extractor, group_name)
        if not search_group_text:
            continue
        group_snippets = find_snippets_in_text(search_group_text, search_terms,
                max_snippets=MAX_SNIPPETS_MATCHED_COUNT + 1, group_name=group_name)
        snippets.extend(group_snippets)
        if len(snippets) > MAX_SNIPPETS_MATCHED_COUNT:
            break

    if snippets:
        total_snippets = len(snippets)
        displayed_snippets = snippets[:MAX_SNIPPETS_RETURNED_COUNT]
        result = " ... ".join(displayed_snippets)
        if total_snippets > MAX_SNIPPETS_MATCHED_COUNT:
            result += f" ... + >{MAX_SNIPPETS_MATCHED_COUNT} more"
        elif total_snippets > MAX_SNIPPETS_RETURNED_COUNT:
            remaining = total_snippets - MAX_SNIPPETS_RETURNED_COUNT
            result += f" ... +{remaining} more"
        return result

    return None
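
# Illustrative sketch only, not part of the original module: get_snippets() is the
# entry point for a crawled page. The URL, headers, HTML, and query below are
# hypothetical; per the docstring, the return value is a " ... "-joined string of
# excerpts with matched terms wrapped in **bold**, or None when nothing matches.
def _example_get_snippets_usage() -> None:
    html = "<html><body><p>The crawler honors robots.txt while indexing.</p></body></html>"
    result = get_snippets("https://example.com/about", "Content-Type: text/html", html, "crawler")
    print(result)  # e.g. a snippet containing "**crawler**" in context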

def find_snippets_in_text(
        text: str,
        terms: list[str],
        max_snippets: int = MAX_SNIPPETS_MATCHED_COUNT,
        group_name: str = "") -> list[str]:
    """
    Searches for whole-word matches of the given terms in the text and extracts
    surrounding context to create highlighted snippets. Each snippet shows the
    matched term in context with markdown-style bold highlighting (**term**).

    Args:
        text: The text to search within
        terms: List of search terms to find (case-insensitive, whole words only)
        max_snippets: Maximum number of snippets to return (default: MAX_SNIPPETS_MATCHED_COUNT)
        group_name: Regex group identifier (reserved for future use)

    Returns:
        List of unique snippet strings with matched terms highlighted using
        **bold** markdown. Each snippet includes surrounding context up to
        MAX_SNIPPETS_CONTEXT_SIZE characters on each side of the match.
        Returns empty list if no matches found or invalid input.
    """
    if not text or not terms:
        return []

    snippets: list[str] = []
    seen_snippets: set[str] = set()
    text_lower: str = text.lower()

    escaped_terms = [re.escape(term) for term in terms]
    pattern: str = rf"\b({'|'.join(escaped_terms)})\b"
    highlight_patterns: list[tuple[re.Pattern, str]] = [
        (re.compile(rf"\b({re.escape(term)})\b", re.IGNORECASE), term)
        for term in terms
    ]

    matches = list(re.finditer(pattern, text_lower))
    for match in matches:
        if len(snippets) >= max_snippets:
            break

        context_start: int = max(0, match.start() - MAX_SNIPPETS_CONTEXT_SIZE)
        context_end: int = min(len(text), match.end() + MAX_SNIPPETS_CONTEXT_SIZE)

        if context_start > 0:
            while context_start > 0 and text[context_start].isalnum():
                context_start -= 1
        if context_end < len(text):
            while context_end < len(text) and text[context_end].isalnum():
                context_end += 1

        snippet: str = text[context_start:context_end].strip()
        snippet = __RE_SNIPPET_START_TRIM.sub("", snippet)
        snippet = __RE_SNIPPET_END_TRIM.sub("", snippet)

        highlighted_snippet: str = snippet
        for highlight_pattern, _ in highlight_patterns:
            highlighted_snippet = highlight_pattern.sub(r"**\1**", highlighted_snippet)

        if highlighted_snippet and highlighted_snippet not in seen_snippets:
            seen_snippets.add(highlighted_snippet)
            snippets.append(highlighted_snippet)

    return snippets
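
# Illustrative sketch only, not part of the original module: a direct call to
# find_snippets_in_text() on plain text, showing the whole-word, case-insensitive
# matching and markdown highlighting described in the docstring. Inputs are
# hypothetical.
def _example_find_snippets_usage() -> None:
    text = "Snippets are clipped to a small window of context around each hit."
    for snippet in find_snippets_in_text(text, ["context", "hit"]):
        print(snippet)  # matched terms appear as **context** and **hit**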