diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index a47d8205c77..e04497a1e1c 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -106,6 +106,7 @@ class DocumentSource(str, Enum): R2 = "r2" GOOGLE_CLOUD_STORAGE = "google_cloud_storage" OCI_STORAGE = "oci_storage" + HIGHSPOT = "highspot" class BlobType(str, Enum): diff --git a/backend/danswer/connectors/factory.py b/backend/danswer/connectors/factory.py index 3c205314503..5bedb9b4245 100644 --- a/backend/danswer/connectors/factory.py +++ b/backend/danswer/connectors/factory.py @@ -22,6 +22,7 @@ from danswer.connectors.google_drive.connector import GoogleDriveConnector from danswer.connectors.google_site.connector import GoogleSitesConnector from danswer.connectors.guru.connector import GuruConnector +from danswer.connectors.highspot.connector import HighspotConnector from danswer.connectors.hubspot.connector import HubSpotConnector from danswer.connectors.interfaces import BaseConnector from danswer.connectors.interfaces import EventConnector @@ -99,6 +100,7 @@ def identify_connector_class( DocumentSource.R2: BlobStorageConnector, DocumentSource.GOOGLE_CLOUD_STORAGE: BlobStorageConnector, DocumentSource.OCI_STORAGE: BlobStorageConnector, + DocumentSource.HIGHSPOT: HighspotConnector, } connector_by_source = connector_map.get(source, {}) diff --git a/backend/danswer/connectors/highspot/__init__.py b/backend/danswer/connectors/highspot/__init__.py new file mode 100644 index 00000000000..fcf56aab50c --- /dev/null +++ b/backend/danswer/connectors/highspot/__init__.py @@ -0,0 +1,6 @@ +"""Highspot connector — indexes Spots and Items from a Highspot tenant. + +Authenticates via HTTP Basic with an API key/secret pair generated from +Highspot's admin console. Supports an optional `spot_names` allowlist; +when empty, all Spots accessible to the credential are indexed. +""" diff --git a/backend/danswer/connectors/highspot/client.py b/backend/danswer/connectors/highspot/client.py new file mode 100644 index 00000000000..ca91eac856a --- /dev/null +++ b/backend/danswer/connectors/highspot/client.py @@ -0,0 +1,216 @@ +"""Thin HTTP client for the Highspot REST API. + +Ported from upstream Onyx (`onyx-dot-app/onyx`) with only the logger +import path changed to point at this fork's `danswer.utils.logger`. +The HTTP / auth / retry shape is fork-agnostic and intentionally kept +verbatim so future upstream tweaks can be cherry-picked cleanly. +""" +import base64 +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from urllib.parse import urljoin + +import requests +from requests.adapters import HTTPAdapter +from requests.exceptions import HTTPError +from requests.exceptions import RequestException +from requests.exceptions import Timeout +from urllib3.util.retry import Retry + +from danswer.utils.logger import setup_logger + +logger = setup_logger() +PAGE_SIZE = 100 + + +class HighspotClientError(Exception): + """Base exception for Highspot API client errors.""" + + def __init__(self, message: str, status_code: Optional[int] = None): + self.message = message + self.status_code = status_code + super().__init__(self.message) + + +class HighspotAuthenticationError(HighspotClientError): + """Exception raised for authentication errors.""" + + +class HighspotRateLimitError(HighspotClientError): + """Exception raised when rate limit is exceeded.""" + + def __init__(self, message: str, retry_after: Optional[str] = None): + self.retry_after = retry_after + super().__init__(message) + + +class HighspotClient: + """ + Client for interacting with the Highspot API. + + Uses basic authentication with provided key (username) and secret + (password). Implements retry logic, error handling, and connection + pooling via a `requests.Session`. + """ + + BASE_URL = "https://api-su2.highspot.com/v1.0/" + + def __init__( + self, + key: str, + secret: str, + base_url: str = BASE_URL, + timeout: int = 30, + max_retries: int = 3, + backoff_factor: float = 0.5, + status_forcelist: Optional[List[int]] = None, + ): + if not key or not secret: + raise ValueError("API key and secret are required") + + self.key = key + self.secret = secret + self.base_url = base_url.rstrip("/") + "/" + self.timeout = timeout + + self.session = requests.Session() + retry_strategy = Retry( + total=max_retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist or [429, 500, 502, 503, 504], + allowed_methods=["GET", "POST", "PUT", "DELETE"], + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + self._setup_auth() + + def _setup_auth(self) -> None: + auth = f"{self.key}:{self.secret}" + encoded_auth = base64.b64encode(auth.encode()).decode() + self.session.headers.update( + { + "Authorization": f"Basic {encoded_auth}", + "Content-Type": "application/json", + "Accept": "application/json", + } + ) + + def _make_request( + self, + method: str, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + json_data: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + url = urljoin(self.base_url, endpoint) + request_headers = {} + if headers: + request_headers.update(headers) + + try: + logger.debug("Making %s request to %s", method, url) + response = self.session.request( + method=method, + url=url, + params=params, + data=data, + json=json_data, + headers=request_headers, + timeout=self.timeout, + ) + response.raise_for_status() + + if response.content and response.content.strip(): + return response.json() + return {} + + except HTTPError as e: + status_code = e.response.status_code + error_msg = str(e) + + try: + error_data = e.response.json() + if isinstance(error_data, dict): + error_msg = error_data.get("message", str(e)) + except (ValueError, KeyError): + pass + + if status_code == 401: + raise HighspotAuthenticationError(f"Authentication failed: {error_msg}") + elif status_code == 429: + retry_after = e.response.headers.get("Retry-After") + raise HighspotRateLimitError( + f"Rate limit exceeded: {error_msg}", retry_after=retry_after + ) + else: + raise HighspotClientError( + f"API error {status_code}: {error_msg}", status_code=status_code + ) + + except Timeout: + raise HighspotClientError("Request timed out") + except RequestException as e: + raise HighspotClientError(f"Request failed: {str(e)}") + + def get_spots(self) -> List[Dict[str, Any]]: + """Get all available spots, paginated. Returns flat list.""" + all_spots: list[dict[str, Any]] = [] + has_more = True + current_offset = 0 + + while has_more: + params = {"right": "view", "start": current_offset, "limit": PAGE_SIZE} + response = self._make_request("GET", "spots", params=params) + found_spots = response.get("collection", []) + logger.info( + "Received %s spots at offset %s", len(found_spots), current_offset + ) + all_spots.extend(found_spots) + if len(found_spots) < PAGE_SIZE: + has_more = False + else: + current_offset += PAGE_SIZE + logger.info("Total spots retrieved: %s", len(all_spots)) + return all_spots + + def get_spot(self, spot_id: str) -> Dict[str, Any]: + if not spot_id: + raise ValueError("spot_id is required") + return self._make_request("GET", f"spots/{spot_id}") + + def get_spot_items( + self, spot_id: str, offset: int = 0, page_size: int = PAGE_SIZE + ) -> Dict[str, Any]: + if not spot_id: + raise ValueError("spot_id is required") + params = {"spot": spot_id, "start": offset, "limit": page_size} + return self._make_request("GET", "items", params=params) + + def get_item(self, item_id: str) -> Dict[str, Any]: + if not item_id: + raise ValueError("item_id is required") + return self._make_request("GET", f"items/{item_id}") + + def get_item_content(self, item_id: str) -> bytes: + """Fetch raw item content as bytes (for file extraction).""" + if not item_id: + raise ValueError("item_id is required") + + url = urljoin(self.base_url, f"items/{item_id}/content") + response = self.session.get(url, timeout=self.timeout) + response.raise_for_status() + return response.content + + def health_check(self) -> bool: + """Validate creds + reachability via a 1-row spots fetch.""" + try: + self._make_request("GET", "spots", params={"limit": 1}) + return True + except (HighspotClientError, HighspotAuthenticationError): + return False diff --git a/backend/danswer/connectors/highspot/connector.py b/backend/danswer/connectors/highspot/connector.py new file mode 100644 index 00000000000..7f29c12e96e --- /dev/null +++ b/backend/danswer/connectors/highspot/connector.py @@ -0,0 +1,491 @@ +"""Highspot connector for Darwin. + +Indexes Spots and the Items inside them via Highspot's REST API +(https://api-su2.highspot.com/v1.0/). For each Item we build a +`Document` whose section text is one of: + + 1. WebLink items → headless-Chromium scrape of the linked URL + (with a fallback to title+description if the scrape fails). + 2. File items with a downloadable, supported extension + (.pdf, .docx, .pptx, .xlsx, .eml, .epub, .html, .txt) → + `extract_file_text` over the bytes returned by the + `items/{id}/content` endpoint. + 3. Anything else → `title + "\\n" + description` as a default. + +Ported from upstream Onyx with these structural adjustments for this +fork: + +- Drops the Slim/perm-sync interface (this fork has no + `SlimConnectorWithPermSync`, `SlimDocument`, or + `IndexingHeartbeatInterface`). +- Uses `Section` instead of upstream's `TextSection`. +- Replaces `OnyxFileExtensions` with an inline allowlist matching this + fork's `extract_file_text` capabilities. +- `extract_file_text` argument order is `(file_name, file, ...)` here + vs upstream's `(file, file_name, ...)`. +- `doc_updated_at` is parsed to `datetime` before assignment because + this fork types it as `datetime | None`. +""" +import os +from collections.abc import Callable +from datetime import datetime +from io import BytesIO +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +from playwright.sync_api import BrowserContext +from playwright.sync_api import sync_playwright +from pydantic import BaseModel + +from danswer.configs.app_configs import INDEX_BATCH_SIZE +from danswer.configs.constants import DocumentSource +from danswer.connectors.highspot.client import HighspotAuthenticationError +from danswer.connectors.highspot.client import HighspotClient +from danswer.connectors.highspot.client import HighspotClientError +from danswer.connectors.highspot.utils import scrape_url_content +from danswer.connectors.interfaces import GenerateDocumentsOutput +from danswer.connectors.interfaces import LoadConnector +from danswer.connectors.interfaces import PollConnector +from danswer.connectors.interfaces import SecondsSinceUnixEpoch +from danswer.connectors.models import ConnectorMissingCredentialError +from danswer.connectors.models import Document +from danswer.connectors.models import Section +from danswer.file_processing.extract_file_text import extract_file_text +from danswer.utils.logger import setup_logger + + +logger = setup_logger() + +# Inline allowlist mirroring this fork's extract_file_text dispatch +# (see backend/danswer/file_processing/extract_file_text.py:271). We +# only attempt binary download + extraction for items whose +# `content_name` ends in one of these — anything else falls through to +# title+description. +_SUPPORTED_FILE_EXTENSIONS = frozenset( + {".pdf", ".docx", ".pptx", ".xlsx", ".eml", ".epub", ".html", ".txt"} +) + +# How many docs to buffer before yielding to the downstream indexer. +# Decoupled from `self.batch_size` (which controls Highspot API +# pagination): the server-side page can still be 16 items, but we +# yield in smaller chunks so the indexer's `docs_indexed` counter +# updates more often. Per-item processing in this connector can be +# slow (Playwright scrape, big-PDF extraction), so yielding every +# `INDEX_BATCH_SIZE` items can mean minutes between UI counter +# updates. 4 keeps the per-yield Vespa write overhead small while +# giving admins meaningful progress feedback. +_YIELD_BATCH_SIZE = 4 + + +class HighspotSpot(BaseModel): + id: str + name: str + + +def _parse_doc_updated_at(value: Any) -> datetime | None: + """Parse Highspot's `date_updated` field. + + Highspot returns ISO-8601 strings ending in 'Z'. This fork's + `Document.doc_updated_at` is `datetime | None`, so we have to + parse before assignment. Returns None on any failure rather than + raising — a missing timestamp is preferable to dropping the doc. + """ + if not value: + return None + if isinstance(value, datetime): + return value + if not isinstance(value, str): + return None + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, TypeError): + return None + + +class HighspotConnector(LoadConnector, PollConnector): + """ + Connector for loading data from Highspot. + + Retrieves content from specified Spots using the Highspot API. + If `spot_names` is empty/None, retrieves content from every Spot + accessible to the credential. + """ + + def __init__( + self, + spot_names: list[str] | None = None, + batch_size: int = INDEX_BATCH_SIZE, + ): + self.spot_names = spot_names or [] + self.batch_size = batch_size + + self._client: Optional[HighspotClient] = None + self.highspot_url: Optional[str] = None + self.key: Optional[str] = None + self.secret: Optional[str] = None + + @property + def client(self) -> HighspotClient: + if self._client is None: + if not self.key or not self.secret: + raise ConnectorMissingCredentialError("Highspot") + base_url = ( + self.highspot_url + if self.highspot_url is not None + else HighspotClient.BASE_URL + ) + self._client = HighspotClient(self.key, self.secret, base_url=base_url) + return self._client + + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: + logger.info("Loading Highspot credentials") + self.highspot_url = credentials.get("highspot_url") + self.key = credentials.get("highspot_key") + self.secret = credentials.get("highspot_secret") + return None + + # ------------------------------------------------------------------ + # Spot discovery + # ------------------------------------------------------------------ + def _fetch_spots(self) -> list[HighspotSpot]: + return [ + HighspotSpot(id=spot["id"], name=spot["title"]) + for spot in self.client.get_spots() + ] + + def _fetch_spots_to_process(self) -> list[HighspotSpot]: + spots = self._fetch_spots() + if not spots: + raise ValueError("No spots found in Highspot.") + + if self.spot_names: + lower_spot_names = [name.lower() for name in self.spot_names] + spots_to_process = [ + spot for spot in spots if spot.name.lower() in lower_spot_names + ] + if not spots_to_process: + raise ValueError( + f"No valid spots found in Highspot. " + f"Found {[s.name for s in spots]} but {self.spot_names} " + f"were requested." + ) + return spots_to_process + + return spots + + # ------------------------------------------------------------------ + # LoadConnector / PollConnector entry points + # ------------------------------------------------------------------ + def load_from_state(self) -> GenerateDocumentsOutput: + return self.poll_source(None, None) + + def poll_source( + self, + start: SecondsSinceUnixEpoch | None, + end: SecondsSinceUnixEpoch | None, + ) -> GenerateDocumentsOutput: + spots_to_process = self._fetch_spots_to_process() + + # Shared Playwright browser for all WebLink scrapes in this + # poll. Mirrors the lifecycle in + # `connectors/web/connector.py`: spawn ONCE, reuse across + # items via `context.new_page()` / `page.close()`, tear down + # at end. The spawn-per-item pattern (upstream Onyx) starves + # the worker's FDs/RAM and causes co-running connectors + # (e.g. Slack) to fail with `IncompleteRead` mid-response. + # Lazy-init: only spawned if we hit a WebLink item. + playwright_inst = None + browser = None + scrape_context: BrowserContext | None = None + + def _ensure_browser() -> BrowserContext: + nonlocal playwright_inst, browser, scrape_context + if scrape_context is not None: + return scrape_context + playwright_inst = sync_playwright().start() + browser = playwright_inst.chromium.launch(headless=True) + scrape_context = browser.new_context() + return scrape_context + + doc_batch: list[Document] = [] + try: + for spot in spots_to_process: + try: + offset = 0 + has_more = True + + while has_more: + logger.info( + "Retrieving items from spot %s, offset %s", + spot.name, + offset, + ) + response = self.client.get_spot_items( + spot_id=spot.id, + offset=offset, + page_size=self.batch_size, + ) + items = response.get("collection", []) + logger.info( + "Received %s items from spot %s", len(items), spot.name + ) + if not items: + has_more = False + continue + + for item in items: + item_id = None + try: + item_id = item.get("id") + if not item_id: + logger.warning("Item without ID found, skipping") + continue + + item_details = self.client.get_item(item_id) + if not item_details: + logger.warning( + "Item %s details not found, skipping", + item_id, + ) + continue + + # Time-window filter (poll mode). + if start or end: + parsed = _parse_doc_updated_at( + item_details.get("date_updated") + ) + if parsed is None: + # No usable timestamp — skip in poll + # mode rather than reindex unconditionally. + continue + ts = parsed.timestamp() + if (start and ts < start) or (end and ts > end): + continue + + content = self._get_item_content( + item_details, + scrape_context_factory=_ensure_browser, + ) + title = item_details.get("title", "") + doc_updated_at = _parse_doc_updated_at( + item_details.get("date_updated") + ) + + doc_batch.append( + Document( + id=f"HIGHSPOT_{item_id}", + sections=[ + Section( + link=item_details.get( + "url", + f"https://www.highspot.com/items/{item_id}", + ), + text=content, + ) + ], + source=DocumentSource.HIGHSPOT, + semantic_identifier=title, + metadata={ + "spot_name": spot.name, + "type": item_details.get( + "content_type", "" + ), + "created_at": item_details.get( + "date_added", "" + ), + "author": item_details.get("author", ""), + "language": item_details.get( + "language", "" + ), + "can_download": str( + item_details.get("can_download", False) + ), + }, + doc_updated_at=doc_updated_at, + ) + ) + + if len(doc_batch) >= _YIELD_BATCH_SIZE: + yield doc_batch + doc_batch = [] + + except HighspotClientError as e: + logger.error( + "Error retrieving item %s: %s", + item_id or "(unknown)", + str(e), + ) + except Exception as e: + logger.error( + "Unexpected error for item %s: %s", + item_id or "(unknown)", + str(e), + ) + + has_more = len(items) >= self.batch_size + offset += self.batch_size + + except (HighspotClientError, ValueError) as e: + logger.error("Error processing spot %s: %s", spot.name, str(e)) + raise + except Exception as e: + logger.error( + "Unexpected error processing spot %s: %s", + spot.name, + str(e), + ) + raise + + except Exception as e: + logger.error("Error in Highspot connector: %s", str(e)) + raise + + finally: + # Tear down the shared Playwright browser if we ever + # opened one. Wrapped in try/except so a teardown failure + # never masks the real exception or interrupts the + # generator's yield-on-success path. + if browser is not None: + try: + browser.close() + except Exception as e: + logger.debug("Error closing shared browser: %s", str(e)) + if playwright_inst is not None: + try: + playwright_inst.stop() + except Exception as e: + logger.debug("Error stopping shared playwright: %s", str(e)) + + if doc_batch: + yield doc_batch + + # ------------------------------------------------------------------ + # Item content extraction + # ------------------------------------------------------------------ + def _get_item_content( + self, + item_details: Dict[str, Any], + scrape_context_factory: Optional[Callable[[], BrowserContext]] = None, + ) -> str: + item_id = item_details.get("id", "") + content_name = item_details.get("content_name", "") or "" + is_valid_format = bool(content_name) and "." in content_name + file_extension = ( + "." + content_name.rsplit(".", 1)[-1].lower() if is_valid_format else "" + ) + can_download = bool(item_details.get("can_download", False)) + content_type = item_details.get("content_type", "") + + title, description = self._extract_title_and_description(item_details) + default_content = f"{title}\n{description}" + logger.info( + "Processing item %s (content_type=%s, ext=%s, content_name=%s)", + item_id, + content_type, + file_extension, + content_name, + ) + + try: + if content_type == "WebLink": + url = item_details.get("url") + if not url: + return default_content + # Use the shared browser/context owned by the + # caller; only fall back to a fresh spawn if no + # factory was provided (e.g. the __main__ smoke + # test). + shared_context = ( + scrape_context_factory() if scrape_context_factory else None + ) + content = scrape_url_content( + url, + scroll_before_scraping=True, + context=shared_context, + ) + return content if content else default_content + + elif ( + is_valid_format + and file_extension in _SUPPORTED_FILE_EXTENSIONS + and can_download + ): + content_response = self.client.get_item_content(item_id) + if not content_response: + return default_content + # Note: this fork's extract_file_text has the + # arguments in a different order than upstream. + text_content = extract_file_text( + file_name=content_name, + file=BytesIO(content_response), + break_on_unprocessable=False, + ) + return text_content if text_content else default_content + + logger.info( + "Item %s has no extractable body (ext=%s, can_download=%s); " + "using title+description.", + item_id, + file_extension, + can_download, + ) + return default_content + + except HighspotClientError as e: + logger.warning( + "Could not retrieve content for item %s: %s", + item_id or "(unknown)", + str(e), + ) + return default_content + except ValueError as e: + logger.error("Value error for item %s: %s", item_id or "(unknown)", str(e)) + return default_content + except Exception as e: + logger.error( + "Unexpected error retrieving content for item %s: %s", + item_id or "(unknown)", + str(e), + ) + return default_content + + def _extract_title_and_description( + self, item_details: Dict[str, Any] + ) -> tuple[str, str]: + title = item_details.get("title", "") or "" + description = item_details.get("description", "") or "" + return title, description + + # ------------------------------------------------------------------ + # Credential validation (called by admin UI) + # ------------------------------------------------------------------ + def validate_credentials(self) -> bool: + try: + return self.client.health_check() + except HighspotAuthenticationError: + return False + except Exception as e: + logger.error("Failed to validate Highspot credentials: %s", str(e)) + return False + + +if __name__ == "__main__": + spot_names: List[str] = [] + if os.environ.get("HIGHSPOT_SPOT_NAMES"): + spot_names = [ + s.strip() for s in os.environ["HIGHSPOT_SPOT_NAMES"].split(",") if s.strip() + ] + connector = HighspotConnector(spot_names=spot_names) + connector.load_credentials( + { + "highspot_key": os.environ.get("HIGHSPOT_KEY"), + "highspot_secret": os.environ.get("HIGHSPOT_SECRET"), + "highspot_url": os.environ.get("HIGHSPOT_URL"), + } + ) + for batch in connector.load_from_state(): + for doc in batch: + print(doc) + break diff --git a/backend/danswer/connectors/highspot/utils.py b/backend/danswer/connectors/highspot/utils.py new file mode 100644 index 00000000000..96a50e6a8c7 --- /dev/null +++ b/backend/danswer/connectors/highspot/utils.py @@ -0,0 +1,195 @@ +"""WebLink scraping helper for the Highspot connector. + +Highspot Spot items can be of type WebLink — i.e. just a URL the +Spot owner curated. These have no API-side body content; we have to +go fetch the page and extract its readable text. + +The Playwright lifecycle here mirrors `connectors/web/connector.py`: +the caller (HighspotConnector.poll_source) opens **one** browser / +context for the entire run and passes it in via `context=`. Each +WebLink scrape just opens a new page (lightweight tab) inside the +shared browser and closes it after use. This is critical: spawning a +fresh Chromium process per WebLink — as the upstream Onyx port does +— starves the worker's file descriptors / RAM and causes co-running +connectors (e.g. Slack) to fail with `IncompleteRead` mid-response. + +A `context=None` fallback path is kept so the standalone smoke test +in connector.py's `__main__` still works and so isolated callers +(future code, retries) don't have to plumb a context through. +""" +from typing import Optional +from urllib.parse import urlparse + +from bs4 import BeautifulSoup +from playwright.sync_api import BrowserContext +from playwright.sync_api import sync_playwright + +from danswer.file_processing.html_utils import web_html_cleanup +from danswer.utils.logger import setup_logger + +logger = setup_logger() + +WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS = 10 +JAVASCRIPT_DISABLED_MESSAGE = "You have JavaScript disabled in your browser" +DEFAULT_TIMEOUT = 60000 # 60 seconds — used for initial page.goto only. + +# Tight per-scroll networkidle budget. Some pages (analytics +# beacons, polling websockets, autoplay video) never reach +# networkidle, so a 60s wait × 20 iterations stalls a single +# WebLink for up to 20 minutes. Cap this at 5s so a stuck page +# costs at most ~25s total before we move on with whatever HTML +# we've already loaded. +SCROLL_NETWORKIDLE_TIMEOUT_MS = 5000 + + +def _scrape_using_page( + page: "object", # playwright.sync_api.Page + url: str, + scroll_before_scraping: bool, + timeout_ms: int, +) -> Optional[str]: + """Inner scrape: navigate, optionally scroll, run html cleanup, + iframe-fallback. Caller manages the page lifecycle.""" + logger.info("Navigating to URL: %s", url) + try: + page.goto(url, timeout=timeout_ms) + except Exception as e: + logger.error("Failed to navigate to %s: %s", url, str(e)) + return None + + if scroll_before_scraping: + logger.debug("Scrolling page to load lazy content") + scroll_attempts = 0 + previous_height = page.evaluate("document.body.scrollHeight") + while scroll_attempts < WEB_CONNECTOR_MAX_SCROLL_ATTEMPTS: + page.evaluate("window.scrollTo(0, document.body.scrollHeight)") + try: + # Tight per-scroll budget — see comment on + # SCROLL_NETWORKIDLE_TIMEOUT_MS. Some pages never + # reach networkidle; we'd rather scrape what + # loaded than hang the whole indexing run. + page.wait_for_load_state( + "networkidle", timeout=SCROLL_NETWORKIDLE_TIMEOUT_MS + ) + except Exception as e: + logger.debug( + "Network idle wait timed out (continuing with current HTML): %s", + str(e), + ) + break + + new_height = page.evaluate("document.body.scrollHeight") + if new_height == previous_height: + break + previous_height = new_height + scroll_attempts += 1 + + content = page.content() + soup = BeautifulSoup(content, "html.parser") + + parsed_html = web_html_cleanup(soup) + + if JAVASCRIPT_DISABLED_MESSAGE in parsed_html.cleaned_text: + logger.debug("JavaScript disabled message detected, checking iframes") + try: + iframe_count = page.frame_locator("iframe").locator("html").count() + if iframe_count > 0: + iframe_texts = ( + page.frame_locator("iframe").locator("html").all_inner_texts() + ) + iframe_content = "\n".join(iframe_texts) + + if len(parsed_html.cleaned_text) < 700: + parsed_html.cleaned_text = iframe_content + else: + parsed_html.cleaned_text += "\n" + iframe_content + except Exception as e: + logger.warning("Error processing iframes: %s", str(e)) + + return parsed_html.cleaned_text + + +def scrape_url_content( + url: str, + scroll_before_scraping: bool = False, + timeout_ms: int = DEFAULT_TIMEOUT, + context: Optional[BrowserContext] = None, +) -> Optional[str]: + """Scrape `url` via headless Chromium and return cleaned page + text, or None on any failure. + + If `context` is provided (the production path from + HighspotConnector.poll_source), a new page/tab is opened in that + shared context and closed after use — no Chromium spawn cost. + + If `context` is None (the smoke-test path), this falls back to + the spawn-per-call behavior so isolated callers still work. Do + NOT use this fallback path inside the indexing loop: it costs a + Chromium process per call and starves co-running connectors. + """ + try: + validate_url(url) + except ValueError as e: + logger.error("Invalid URL %s: %s", url, str(e)) + return None + + if context is not None: + page = None + try: + page = context.new_page() + return _scrape_using_page( + page=page, + url=url, + scroll_before_scraping=scroll_before_scraping, + timeout_ms=timeout_ms, + ) + except Exception as e: + logger.error("Error scraping URL %s: %s", url, str(e)) + return None + finally: + if page is not None: + try: + page.close() + except Exception as e: + logger.debug("Error closing page: %s", str(e)) + # unreachable + return None + + # Standalone path: spawn-then-teardown. + playwright = None + browser = None + try: + playwright = sync_playwright().start() + browser = playwright.chromium.launch(headless=True) + context_local = browser.new_context() + page = context_local.new_page() + return _scrape_using_page( + page=page, + url=url, + scroll_before_scraping=scroll_before_scraping, + timeout_ms=timeout_ms, + ) + except Exception as e: + logger.error("Error scraping URL %s: %s", url, str(e)) + return None + finally: + if browser is not None: + try: + browser.close() + except Exception as e: + logger.debug("Error closing browser: %s", str(e)) + if playwright is not None: + try: + playwright.stop() + except Exception as e: + logger.debug("Error stopping playwright: %s", str(e)) + + +def validate_url(url: str) -> None: + """Validate that `url` has http(s) scheme + a hostname.""" + parse = urlparse(url) + if parse.scheme != "http" and parse.scheme != "https": + raise ValueError("URL must be of scheme https?://") + + if not parse.hostname: + raise ValueError("URL must include a hostname") diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index 78e4df06a45..799083eaaa9 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -342,6 +342,51 @@ def admin_google_drive_auth( return AuthUrl(auth_url=get_auth_url(credential_id=int(credential_id))) +class HighspotSpotResponse(BaseModel): + id: str + name: str + + +@router.get("/admin/connector/highspot/spots/{credential_id}") +def list_highspot_spots( + credential_id: int, + user: User = Depends(current_admin_user), + db_session: Session = Depends(get_session), +) -> list[HighspotSpotResponse]: + """Fetch the list of Spots visible to the given Highspot credential + so the admin UI can render a multi-select instead of a free-text + array input. Failures bubble up as 4xx/5xx with the upstream error + so the form can show a useful message.""" + from danswer.connectors.highspot.client import HighspotAuthenticationError + from danswer.connectors.highspot.client import HighspotClient + from danswer.connectors.highspot.client import HighspotClientError + + cred = fetch_credential_by_id(credential_id, user, db_session) + if cred is None: + raise HTTPException(status_code=404, detail="Credential not found") + cj = cred.credential_json or {} + if not cj.get("highspot_key") or not cj.get("highspot_secret"): + raise HTTPException( + status_code=400, + detail="Credential does not look like a Highspot credential.", + ) + base_url = cj.get("highspot_url") or HighspotClient.BASE_URL + try: + client = HighspotClient( + key=cj["highspot_key"], secret=cj["highspot_secret"], base_url=base_url + ) + return [ + HighspotSpotResponse(id=s["id"], name=s.get("title", "")) + for s in client.get_spots() + ] + except HighspotAuthenticationError as e: + raise HTTPException( + status_code=401, detail=f"Highspot authentication failed: {e}" + ) + except HighspotClientError as e: + raise HTTPException(status_code=502, detail=f"Highspot API error: {e}") + + @router.post("/admin/connector/file/upload") def upload_files( files: list[UploadFile], diff --git a/web/public/Highspot.png b/web/public/Highspot.png new file mode 100644 index 00000000000..e4112aecd0d Binary files /dev/null and b/web/public/Highspot.png differ diff --git a/web/src/app/admin/bot/SlackBotConfigCreationForm.tsx b/web/src/app/admin/bot/SlackBotConfigCreationForm.tsx index 55497149ea0..032e47aad00 100644 --- a/web/src/app/admin/bot/SlackBotConfigCreationForm.tsx +++ b/web/src/app/admin/bot/SlackBotConfigCreationForm.tsx @@ -167,8 +167,21 @@ export const SlackBotCreationForm = ({ }), curated_response_config: Yup.object().shape({ enable_curated_response_integration: Yup.boolean().required(), - response_message: Yup.string().required( - "Response message is required when curated response integration is enabled" + // Mirror jira_config: only require this when the integration + // is enabled. Without the .when() guard the field is required + // unconditionally, but the UI hides the input when the toggle + // is off — Formik silently rejects submit and no error is + // visible since the (errored) field isn't on screen. + response_message: Yup.string().when( + "enable_curated_response_integration", + { + is: true, + then: (schema) => + schema.required( + "Response message is required when curated response integration is enabled" + ), + otherwise: (schema) => schema.notRequired(), + } ), }), jira_title_filter: Yup.array() diff --git a/web/src/app/admin/connectors/highspot/page.tsx b/web/src/app/admin/connectors/highspot/page.tsx new file mode 100644 index 00000000000..6d292b8c5ba --- /dev/null +++ b/web/src/app/admin/connectors/highspot/page.tsx @@ -0,0 +1,443 @@ +"use client"; + +import { useState } from "react"; +import * as Yup from "yup"; +import { useFormikContext } from "formik"; +import { FiPlus, FiX } from "react-icons/fi"; +import { EditIcon, HighspotIcon, TrashIcon } from "@/components/icons/icons"; +import { errorHandlingFetcher as fetcher } from "@/lib/fetcher"; +import useSWR, { useSWRConfig } from "swr"; +import { LoadingAnimation } from "@/components/Loading"; +import { HealthCheckBanner } from "@/components/health/healthcheck"; +import { Button, Card, Divider, Text, Title } from "@tremor/react"; +import { + HighspotConfig, + HighspotCredentialJson, + ConnectorIndexingStatus, + Credential, +} from "@/lib/types"; +import { adminDeleteCredential, linkCredential } from "@/lib/credential"; +import { CredentialForm } from "@/components/admin/connectors/CredentialForm"; +import { TextFormField } from "@/components/admin/connectors/Field"; +import { ConnectorsTable } from "@/components/admin/connectors/table/ConnectorsTable"; +import { ConnectorForm } from "@/components/admin/connectors/ConnectorForm"; +import { usePublicCredentials } from "@/lib/hooks"; +import { AdminPageTitle } from "@/components/admin/Title"; +import { SearchMultiSelectDropdown } from "@/components/Dropdown"; + +interface HighspotSpotResponse { + id: string; + name: string; +} + +/** Multi-select for Highspot Spot names. Reads/writes + * `spot_names: string[]` on the surrounding Formik form via + * `useFormikContext`. Fetches the live list of Spots from the + * Highspot API (server-side route), and renders selected chips + + * a searchable dropdown of unselected ones. Mandatory: form-level + * yup validation enforces `min(1)`. */ +const HighspotSpotsMultiSelect = ({ + credentialId, +}: { + credentialId: number; +}) => { + const { values, setFieldValue, errors, touched } = + useFormikContext(); + const selected = values.spot_names ?? []; + + const { + data: spots, + isLoading, + error, + } = useSWR( + `/api/manage/admin/connector/highspot/spots/${credentialId}`, + fetcher + ); + + const showError = touched.spot_names && errors.spot_names; + + return ( +
+ +

+ Select one or more Spots to index. The list comes live from Highspot + using the credential above. Re-open the page if you add new Spots in + Highspot and want them to appear here. +

+ + {/* Selected chips */} + {selected.length > 0 && ( +
+ {selected.map((name) => ( +
+ setFieldValue( + "spot_names", + selected.filter((n) => n !== name) + ) + } + > + {name} + +
+ ))} +
+ )} + + {/* Loading / error / dropdown */} + {isLoading ? ( + Loading Spots from Highspot… + ) : error ? ( + + Failed to load Spots from Highspot:{" "} + {error?.info?.detail || error?.message || "unknown error"}. Verify the + credentials in Step 1. + + ) : ( + (() => { + const available = (spots ?? []) + .filter((s) => !selected.includes(s.name)) + .map((s) => ({ + name: s.name, + value: s.name, + metadata: { spotId: s.id }, + })); + if ( + available.length === 0 && + selected.length === (spots?.length ?? 0) + ) { + return ( + + All available Spots are selected. + + ); + } + return ( + { + const name = String(option.value); + if (!selected.includes(name)) { + setFieldValue("spot_names", [...selected, name]); + } + }} + itemComponent={({ option }) => ( +
+
{option.name}
+
+ +
+
+ )} + /> + ); + })() + )} + + {showError && ( + + {String(errors.spot_names)} + + )} +
+ ); +}; + +const MainSection = () => { + const { mutate } = useSWRConfig(); + const [isEditingCredential, setIsEditingCredential] = useState(false); + + const { + data: connectorIndexingStatuses, + isLoading: isConnectorIndexingStatusesLoading, + error: isConnectorIndexingStatusesError, + } = useSWR[]>( + "/api/manage/admin/connector/indexing-status", + fetcher + ); + + const { + data: credentialsData, + isLoading: isCredentialsLoading, + error: isCredentialsError, + refreshCredentials, + } = usePublicCredentials(); + + if ( + (!connectorIndexingStatuses && isConnectorIndexingStatusesLoading) || + (!credentialsData && isCredentialsLoading) + ) { + return ; + } + + if (isConnectorIndexingStatusesError || !connectorIndexingStatuses) { + return
Failed to load connectors
; + } + + if (isCredentialsError || !credentialsData) { + return
Failed to load credentials
; + } + + const highspotConnectorIndexingStatuses: ConnectorIndexingStatus< + HighspotConfig, + HighspotCredentialJson + >[] = connectorIndexingStatuses.filter( + (connectorIndexingStatus) => + connectorIndexingStatus.connector.source === "highspot" + ); + + // Discriminator field is highspot_key — there's only one credential + // shape for Highspot in this fork. + const highspotCredential: Credential | undefined = + credentialsData.find( + (credential) => credential.credential_json?.highspot_key + ); + + return ( + <> + + The Highspot connector indexes Spots and the Items inside them via + Highspot's REST API. WebLink items are scraped via headless + Chromium; downloadable file items (PDF / DOCX / PPTX / XLSX / EML / EPUB + / HTML / TXT) are extracted to text. + + + + Step 1: Provide Highspot credentials + + {highspotCredential ? ( + <> +
+ Existing Highspot Key: + + {highspotCredential.credential_json.highspot_key} + + + +
+ {isEditingCredential && ( + + + Update the Highspot key/secret. The change is saved against the + existing credential, so all linked Highspot connectors pick it + up on their next poll. + + + existingCredentialId={highspotCredential.id} + formBody={ + <> + + + + + } + validationSchema={Yup.object().shape({ + highspot_key: Yup.string().required( + "Please enter your Highspot key" + ), + highspot_secret: Yup.string().required( + "Please enter your Highspot secret" + ), + highspot_url: Yup.string().optional(), + })} + initialValues={{ + highspot_key: + highspotCredential.credential_json.highspot_key || "", + highspot_secret: + highspotCredential.credential_json.highspot_secret || "", + highspot_url: + highspotCredential.credential_json.highspot_url || "", + }} + onSubmit={(isSuccess) => { + if (isSuccess) { + setIsEditingCredential(false); + refreshCredentials(); + } + }} + extraActions={ + + } + /> + + )} + + ) : ( + <> + + Provide a Highspot API key + secret pair generated from the Highspot + admin console (Settings → API Access). The optional base URL is for + tenants on a non-default Highspot region. + + + + formBody={ + <> + + + + + } + validationSchema={Yup.object().shape({ + highspot_key: Yup.string().required( + "Please enter your Highspot key" + ), + highspot_secret: Yup.string().required( + "Please enter your Highspot secret" + ), + highspot_url: Yup.string().optional(), + })} + initialValues={{ + highspot_key: "", + highspot_secret: "", + highspot_url: "", + }} + onSubmit={(isSuccess) => { + if (isSuccess) { + refreshCredentials(); + } + }} + /> + + + )} + + + Step 2: Which Spots do you want to index? + + + {highspotConnectorIndexingStatuses.length > 0 && ( + <> + + We pull the latest items from each Spot listed below every{" "} + 10 minutes. + +
+ + connectorIndexingStatuses={highspotConnectorIndexingStatuses} + liveCredential={highspotCredential} + getCredential={(credential) => + credential.credential_json.highspot_key + } + specialColumns={[ + { + header: "Spots", + key: "spots", + getValue: (ccPairStatus) => { + const cfg = + ccPairStatus.connector.connector_specific_config; + return cfg.spot_names && cfg.spot_names.length > 0 + ? cfg.spot_names.join(", ") + : "(all)"; + }, + }, + ]} + onUpdate={() => + mutate("/api/manage/admin/connector/indexing-status") + } + onCredentialLink={async (connectorId) => { + if (highspotCredential) { + await linkCredential(connectorId, highspotCredential.id); + mutate("/api/manage/admin/connector/indexing-status"); + } + }} + /> +
+ + + )} + + {highspotCredential ? ( + +

Connect to a New Highspot Tenant

+ + nameBuilder={(values) => + `HighspotConnector-${(values.spot_names ?? []).join("_")}` + } + source="highspot" + inputType="poll" + formBodyBuilder={() => ( + + )} + validationSchema={Yup.object().shape({ + // Mandatory: pick at least one Spot. Empty/no-spot + // configs would index every Spot the credential can + // see, which is rarely what an admin actually wants + // and is a big enough blast radius (Spot count can be + // hundreds) that we don't allow it from the UI. + spot_names: Yup.array() + .of(Yup.string().required("Spot name cannot be empty")) + .min(1, "Please select at least one Spot") + .required("Please select at least one Spot"), + })} + initialValues={{ + spot_names: [], + }} + credentialId={highspotCredential.id} + refreshFreq={10 * 60} // 10 minutes default + /> +
+ ) : ( + + Please provide your Highspot key + secret in Step 1 first! Once done + with that, you can specify which Spots you want to index. + + )} + + ); +}; + +export default function Page() { + return ( +
+
+ +
+ + } title="Highspot" /> + + +
+ ); +} diff --git a/web/src/components/icons/icons.tsx b/web/src/components/icons/icons.tsx index c33a7069076..2508ec637a8 100644 --- a/web/src/components/icons/icons.tsx +++ b/web/src/components/icons/icons.tsx @@ -67,6 +67,7 @@ import dropboxIcon from "../../../public/Dropbox.png"; import s3Icon from "../../../public/S3.png"; import r2Icon from "../../../public/r2.webp"; import salesforceIcon from "../../../public/Salesforce.png"; +import highspotIcon from "../../../public/Highspot.png"; import sharepointIcon from "../../../public/Sharepoint.png"; import teamsIcon from "../../../public/Teams.png"; import mediawikiIcon from "../../../public/MediaWiki.svg"; @@ -655,6 +656,18 @@ export const SalesforceIcon = ({ ); +export const HighspotIcon = ({ + size = 16, + className = defaultTailwindCSS, +}: IconProps) => ( +
+ Logo +
+); + export const R2Icon = ({ size = 16, className = defaultTailwindCSS, diff --git a/web/src/lib/sources.ts b/web/src/lib/sources.ts index 62ff0b707ee..7af9078b221 100644 --- a/web/src/lib/sources.ts +++ b/web/src/lib/sources.ts @@ -15,6 +15,7 @@ import { GoogleDriveIcon, GoogleSitesIcon, GuruIcon, + HighspotIcon, HubSpotIcon, JiraIcon, LinearIcon, @@ -151,6 +152,11 @@ const SOURCE_METADATA_MAP: SourceMap = { displayName: "HubSpot", category: SourceCategory.AppConnection, }, + highspot: { + icon: HighspotIcon, + displayName: "Highspot", + category: SourceCategory.AppConnection, + }, document360: { icon: Document360Icon, displayName: "Document360", diff --git a/web/src/lib/types.ts b/web/src/lib/types.ts index dd7f78ae88a..dd5adb9685c 100644 --- a/web/src/lib/types.ts +++ b/web/src/lib/types.ts @@ -65,7 +65,8 @@ export type ValidSources = | "s3" | "r2" | "google_cloud_storage" - | "oci_storage"; + | "oci_storage" + | "highspot"; export type ValidInputTypes = "load_state" | "poll" | "event"; export type ValidStatuses = @@ -163,6 +164,10 @@ export interface SfKbArticlesConfig { requested_objects?: string[]; } +export interface HighspotConfig { + spot_names?: string[]; +} + export interface SharepointConfig { sites?: string[]; } @@ -366,6 +371,12 @@ export interface ProductboardCredentialJson { productboard_access_token: string; } +export interface HighspotCredentialJson { + highspot_key: string; + highspot_secret: string; + highspot_url?: string; +} + export interface SlackCredentialJson { slack_bot_token: string; }