Skip to content
This repository was archived by the owner on Jun 3, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,36 @@ PINECONE_REGION=us-east-1
# EMBEDDING_MODEL=all-MiniLM-L6-v2
EMBEDDING_MODEL=gemini-embedding-001

# =============================================================================
# Original Storage + v2 Hybrid Search (optional, v2 only)
# =============================================================================
ORIGINAL_STORAGE_ENABLED=false
ORIGINAL_STORAGE_PROVIDER=s3
ORIGINAL_STORAGE_FAIL_CLOSED=false
ORIGINAL_STORAGE_TIMEOUT_SECONDS=180

ORIGINAL_S3_BUCKET=
ORIGINAL_S3_REGION=us-east-1
ORIGINAL_S3_PREFIX=originals
ORIGINAL_S3_ENDPOINT_URL=
ORIGINAL_S3_KMS_KEY_ID=
ORIGINAL_S3_MULTIPART_THRESHOLD_BYTES=8388608
ORIGINAL_S3_MULTIPART_CHUNK_BYTES=8388608

ORIGINAL_CHUNK_SIZE_TOKENS=350
ORIGINAL_CHUNK_OVERLAP_TOKENS=40
ORIGINAL_INDEX_BATCH_SIZE=64
ORIGINAL_EMBED_CONCURRENCY=4
ORIGINAL_INDEX_CONCURRENCY=2
ORIGINAL_BATCH_ITEM_CONCURRENCY=3
ORIGINAL_MAX_BYTES=10485760
ORIGINAL_INCLUDE_AGENT_RESPONSE=true
ORIGINAL_INCLUDE_IMAGE_URL=false

HYBRID_SEARCH_MEMORY_TOP_K=10
HYBRID_SEARCH_ORIGINAL_TOP_K=10
HYBRID_SEARCH_MIN_SCORE=0.0

# =============================================================================
# Database Configuration
# =============================================================================
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- Add v2-only original document preservation in S3 with indexed original chunks and hybrid memory search.
- Add modular Razorpay billing, credit wallets, ledger reservations, and v2 memory workflow metering.
- Add durable Temporal-backed v2 memory and scanner workflow APIs with job status, retry, cancel, and dead-letter endpoints.
- Add modular LoCoMo and BEAM benchmark runners for the Python XMem API.
Expand Down
12 changes: 12 additions & 0 deletions src/api/routes/v2/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from src.billing.context import use_billing_context
from src.billing.service import commit_job_billing, release_job_billing
from src.jobs.durable import get_default_job_store
from src.storage.original import preserve_original

try: # pragma: no cover - no-op fallback keeps imports working without SDK.
from temporalio import activity
Expand Down Expand Up @@ -179,6 +180,16 @@ async def memory_run_pipeline_activity(payload: Dict[str, Any]) -> Dict[str, Any
return await memory_v1._run_ingest_payload(payload, payload["user_id"])


@activity.defn
async def memory_store_original_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Temporal activity that hands ``payload`` to ``preserve_original``.

    The vector store and embedding function are taken from the shared ingest
    pipeline so this activity uses the same backends as ingestion.

    Args:
        payload: Workflow payload forwarded unchanged to ``preserve_original``.

    Returns:
        Whatever ``preserve_original`` returns for this payload.
    """
    ingest = get_ingest_pipeline()
    store, embedder = ingest.vector_store, ingest.embed_fn
    return await preserve_original(payload, vector_store=store, embed_fn=embedder)


@activity.defn
async def memory_scrape_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
def _run_scrape() -> Dict[str, Any]:
Expand Down Expand Up @@ -286,6 +297,7 @@ async def scanner_phase2_activity(payload: Dict[str, Any]) -> Dict[str, Any]:
memory_classify_activity,
memory_domain_activity,
memory_run_pipeline_activity,
memory_store_original_activity,
memory_scrape_activity,
scanner_scan_activity,
scanner_phase2_activity,
Expand Down
128 changes: 126 additions & 2 deletions src/api/routes/v2/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse

from src.api.dependencies import enforce_rate_limit, require_api_key, require_ready
from src.api.dependencies import (
enforce_rate_limit,
get_retrieval_pipeline,
require_api_key,
require_ready,
)
from src.api.routes import memory as memory_v1
from src.api.routes.v2.shared import (
_error,
Expand All @@ -18,10 +23,20 @@
read_user_job,
)
from src.api.routes.v2.temporal_client import start_job_workflow
from src.api.schemas import APIResponse, BatchIngestRequest, IngestRequest, ScrapeRequest, StatusEnum
from src.api.schemas import (
APIResponse,
BatchIngestRequest,
HybridSearchRequest,
HybridSearchResponse,
IngestRequest,
ScrapeRequest,
SourceRecord,
StatusEnum,
)
from src.billing import InsufficientCredits, get_default_billing_service
from src.config import settings
from src.jobs.durable import QUEUED, get_default_job_store, idempotency_key, new_attempt_id, stable_hash
from src.storage.original import ORIGINAL_CHUNK_DOMAIN, original_config_snapshot

router = APIRouter(
prefix="/v2/memory",
Expand All @@ -44,6 +59,18 @@ def _durable_job_id(job_type: str, fields: Dict[str, Any]) -> str:
return f"{job_type}:{idempotency_key(job_type, fields)}"


def _attach_original_storage_config(payload: Dict[str, Any]) -> None:
    """Copy the original-storage settings into ``payload`` in place.

    Each scalar setting is stored under a payload key with the same name as
    the settings attribute, coerced to a plain ``bool``/``float``/``int``.
    The result of ``original_config_snapshot()`` is stored under
    ``"original_config"``.

    Args:
        payload: Job payload dict to mutate; nothing is returned.
    """
    # Payload keys deliberately mirror the settings attribute names.
    typed_fields = (
        ("original_storage_enabled", bool),
        ("original_storage_fail_closed", bool),
        ("original_storage_timeout_seconds", float),
        ("original_batch_item_concurrency", int),
    )
    for field_name, coerce in typed_fields:
        payload[field_name] = coerce(getattr(settings, field_name))
    payload["original_config"] = original_config_snapshot()


class WorkflowStartFailed(RuntimeError):
def __init__(self, job: Dict[str, Any], error: str) -> None:
super().__init__(error)
Expand Down Expand Up @@ -122,6 +149,7 @@ async def ingest_memory_v2(req: IngestRequest, request: Request, user: dict = De
payload = req.model_dump()
payload["user_id"] = user_id
payload["timeout_seconds"] = float(settings.memory_ingest_timeout_seconds)
_attach_original_storage_config(payload)
idempotency_fields = {
"user_id": user_id,
"org_id": payload.get("org_id", "default"),
Expand All @@ -132,6 +160,7 @@ async def ingest_memory_v2(req: IngestRequest, request: Request, user: dict = De
"image_url": req.image_url,
"effort_level": req.effort_level,
}),
"original_storage_enabled": bool(settings.original_storage_enabled),
}
job_id = _durable_job_id("memory_ingest", idempotency_fields)
billing_service = get_default_billing_service()
Expand Down Expand Up @@ -217,9 +246,11 @@ async def batch_ingest_memory_v2(req: BatchIngestRequest, request: Request, user
min(len(req.items) * float(settings.memory_ingest_timeout_seconds), 3600.0),
),
}
_attach_original_storage_config(payload)
idempotency_fields = {
"user_id": user_id,
"content_hash": _content_hash({"items": items}),
"original_storage_enabled": bool(settings.original_storage_enabled),
}
job_id = _durable_job_id("memory_batch_ingest", idempotency_fields)
billing_service = get_default_billing_service()
Expand Down Expand Up @@ -278,6 +309,99 @@ async def batch_ingest_memory_v2(req: BatchIngestRequest, request: Request, user
return _error(request, str(exc), 500, elapsed_ms(start))


async def _search_original_chunks(
    pipeline,
    query: str,
    user_id: str,
    top_k: int,
) -> list[SourceRecord]:
    """Search the user's indexed original-document chunks for ``query``.

    Runs a text search on ``pipeline.vector_store`` filtered to ``user_id``
    and the ``ORIGINAL_CHUNK_DOMAIN`` domain, then drops every hit whose score
    is below ``settings.hybrid_search_min_score``.

    Args:
        pipeline: Object exposing ``vector_store.search_by_text``.
        query: Free-text query passed to the vector store.
        user_id: Owner whose chunks are searched.
        top_k: Maximum number of hits requested from the vector store.

    Returns:
        One ``SourceRecord`` per retained hit, in the order the store returned
        them, with the score rounded to three decimals.
    """
    raw = await pipeline.vector_store.search_by_text(
        query_text=query,
        top_k=top_k,
        filters={"user_id": user_id, "domain": ORIGINAL_CHUNK_DOMAIN},
    )
    # The threshold does not change between hits, so convert it only once.
    min_score = float(settings.hybrid_search_min_score)
    results: list[SourceRecord] = []
    for item in raw:
        # A missing/None score is treated as 0.0.
        score = float(item.score or 0.0)
        if score < min_score:
            continue
        results.append(
            SourceRecord(
                domain=ORIGINAL_CHUNK_DOMAIN,
                content=item.content,
                score=round(score, 3),
                # A hit without metadata must not fail the whole search
                # (``**None`` raises TypeError). Keys present in the hit's
                # metadata still take precedence over the injected "id".
                metadata={"id": item.id, **(item.metadata or {})},
            )
        )
    return results


@router.post(
    "/hybrid-search",
    response_model=APIResponse,
    summary="v2-only hybrid search across extracted memories and original chunks",
)
async def hybrid_search_memory_v2(
    req: HybridSearchRequest,
    request: Request,
    user: dict = Depends(require_api_key),
):
    """Search extracted memories and, optionally, original document chunks.

    Memory domains listed in ``req.domains`` are queried in a fixed order
    (profile, temporal, summary). Original chunks are searched only when the
    request asks for them and original storage is enabled in settings. The
    combined list is memory hits followed by original-chunk hits.

    Args:
        req: Search request (query, domains, optional per-source top-k).
        request: Incoming HTTP request, passed through to the response helpers.
        user: Authenticated caller resolved by ``require_api_key``.

    Returns:
        The wrapped ``HybridSearchResponse`` on success, or a 500 error
        response carrying the exception text if any search step raises.
    """
    start = time.perf_counter()
    pipeline = get_retrieval_pipeline()
    user_id = memory_v1._current_user_id(user, req.user_id)
    # Request-level limits win; the configured defaults fill in falsy values.
    memory_top_k = req.memory_top_k or int(settings.hybrid_search_memory_top_k)
    original_top_k = req.original_top_k or int(settings.hybrid_search_original_top_k)

    try:
        wanted = req.domains
        memory_hits: list[SourceRecord] = []

        if "profile" in wanted:
            # Profile lookup is synchronous and takes no query or limit.
            memory_hits += await asyncio.to_thread(
                memory_v1._search_profile, pipeline, user_id
            )

        if "temporal" in wanted:
            # Temporal lookup is synchronous as well, so it runs in a thread.
            memory_hits += await asyncio.to_thread(
                memory_v1._search_temporal,
                pipeline,
                req.query,
                user_id,
                memory_top_k,
            )

        if "summary" in wanted:
            summary_hits = await memory_v1._search_summary(
                pipeline, req.query, user_id, memory_top_k
            )
            memory_hits += summary_hits

        chunk_hits: list[SourceRecord] = (
            await _search_original_chunks(
                pipeline, req.query, user_id, original_top_k
            )
            if req.include_original_chunks and settings.original_storage_enabled
            else []
        )

        combined = [*memory_hits, *chunk_hits]
        body = HybridSearchResponse(
            memory_results=memory_hits,
            original_chunks=chunk_hits,
            results=combined,
            total=len(combined),
            original_storage_enabled=bool(settings.original_storage_enabled),
        )
        return _wrap(request, body, elapsed_ms(start))
    except Exception as exc:
        return _error(request, str(exc), 500, elapsed_ms(start))


@scrape_router.post("/scrape", response_model=APIResponse, summary="Start an async durable scrape job")
async def scrape_chat_link_v2(req: ScrapeRequest, request: Request):
start = time.perf_counter()
Expand Down
Loading
Loading