Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions agent_core/core/impl/action/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ async def select_action(
# Build the instruction prompt for the LLM
full_prompt = SELECT_ACTION_PROMPT.format(
event_stream=self.context_engine.get_event_stream(),
memory_context=self.context_engine.get_memory_context(query),
query=query,
action_candidates=self._format_candidates(action_candidates),
integration_essentials=integration_essentials,
Expand Down Expand Up @@ -255,9 +254,6 @@ async def select_action_in_task(

# Build the instruction prompt for the LLM
task_state = self.context_engine.get_task_state(session_id=session_id)
memory_context = self.context_engine.get_memory_context(
query, session_id=session_id
)
event_stream_content = self.context_engine.get_event_stream(
session_id=session_id
)
Expand Down Expand Up @@ -290,15 +286,13 @@ async def select_action_in_task(
decision_prompt_name = "SELECT_ACTION_IN_TASK"
static_prompt = SELECT_ACTION_IN_TASK_PROMPT.format(
task_state=task_state,
memory_context=memory_context,
event_stream="", # Empty for static prompt
query=query,
action_candidates=self._format_candidates(action_candidates),
integration_essentials=integration_essentials,
)
full_prompt = SELECT_ACTION_IN_TASK_PROMPT.format(
task_state=task_state,
memory_context=memory_context,
event_stream=event_stream_content,
query=query,
action_candidates=self._format_candidates(action_candidates),
Expand Down Expand Up @@ -407,9 +401,6 @@ async def select_action_in_simple_task(

# Build the instruction prompt
task_state = self.context_engine.get_task_state(session_id=session_id)
memory_context = self.context_engine.get_memory_context(
query, session_id=session_id
)
event_stream_content = self.context_engine.get_event_stream(
session_id=session_id
)
Expand Down Expand Up @@ -439,7 +430,6 @@ async def select_action_in_simple_task(
static_prompt = SELECT_ACTION_IN_SIMPLE_TASK_PROMPT.format(
agent_state=self.context_engine.get_agent_state(session_id=session_id),
task_state=task_state,
memory_context=memory_context,
event_stream="", # Empty for static prompt
query=query,
action_candidates=self._format_candidates(action_candidates),
Expand All @@ -448,7 +438,6 @@ async def select_action_in_simple_task(
full_prompt = SELECT_ACTION_IN_SIMPLE_TASK_PROMPT.format(
agent_state=self.context_engine.get_agent_state(session_id=session_id),
task_state=task_state,
memory_context=memory_context,
event_stream=event_stream_content,
query=query,
action_candidates=self._format_candidates(action_candidates),
Expand Down Expand Up @@ -552,9 +541,6 @@ async def select_action_in_GUI(

# Build the instruction prompt for the LLM
task_state = self.context_engine.get_task_state(session_id=session_id)
memory_context = self.context_engine.get_memory_context(
query, session_id=session_id
)
event_stream_content = self.context_engine.get_event_stream(
session_id=session_id
)
Expand All @@ -563,14 +549,12 @@ async def select_action_in_GUI(
agent_state=self.context_engine.get_agent_state(session_id=session_id),
task_state=task_state,
event_stream="", # Empty for static prompt
memory_context=memory_context,
gui_action_space=GUI_ACTION_SPACE_PROMPT,
)
full_prompt = SELECT_ACTION_IN_GUI_PROMPT.format(
agent_state=self.context_engine.get_agent_state(session_id=session_id),
task_state=task_state,
event_stream=event_stream_content,
memory_context=memory_context,
gui_action_space=GUI_ACTION_SPACE_PROMPT,
)

Expand Down
145 changes: 0 additions & 145 deletions agent_core/core/impl/context/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@
from agent_core.core.state import get_state, get_session_or_none


# Import memory mode check (deferred to avoid circular imports)
def _is_memory_enabled() -> bool:
"""Check if memory mode is enabled. Returns True if unknown."""
try:
from app.ui_layer.settings.memory_settings import is_memory_enabled

return is_memory_enabled()
except ImportError:
return True # Default to enabled if settings module not available


# Set up logger - use shared agent_core logger for consistency
from agent_core.utils.logger import logger

Expand Down Expand Up @@ -598,140 +587,6 @@ def get_user_info(self) -> str:
"""Get current user info for user prompts (WCA-specific via hook)."""
return self._get_user_info()

def _build_memory_query(
self, query: Optional[str], session_id: Optional[str]
) -> Optional[str]:
"""Build a semantic query for memory retrieval.

Combines task instruction with recent conversation messages (both user
and agent) to provide better context for memory search.

Args:
query: Optional explicit query string.
session_id: Optional session ID for session-specific state lookup.

Returns:
A query string suitable for semantic memory search, or None if no context.
"""
# Get task instruction as the base query
session = get_session_or_none(session_id)
if session and session.current_task:
task_instruction = session.current_task.instruction
else:
current_task = get_state().current_task
task_instruction = current_task.instruction if current_task else None

if not task_instruction:
# Fall back to explicit query if no task
return query if query else None

# Get recent conversation messages for additional context
recent_context = self._get_recent_conversation_for_memory(session_id, limit=5)

if recent_context:
return f"{task_instruction}\n\nRecent conversation:\n{recent_context}"
else:
return task_instruction

def _get_recent_conversation_for_memory(
self, session_id: Optional[str], limit: int = 5
) -> str:
"""Get recent conversation messages for memory query context.

Args:
session_id: Optional session ID for session-specific event stream.
limit: Maximum number of messages to include.

Returns:
Formatted string of recent user and agent messages.
"""
try:
event_stream_manager = self.state_manager.event_stream_manager
if not event_stream_manager:
return ""

# Get messages from conversation history (includes both user and agent)
recent_messages = event_stream_manager.get_recent_conversation_messages(
limit
)
if not recent_messages:
return ""

# Format messages simply for semantic search
lines = []
for event in recent_messages:
# Simplify the kind label for the query
if "user message" in event.kind:
lines.append(f"User: {event.message}")
elif "agent message" in event.kind:
lines.append(f"Agent: {event.message}")

return "\n".join(lines)

except Exception as e:
logger.warning(f"[MEMORY] Failed to get recent conversation: {e}")
return ""

def get_memory_context(
self,
query: Optional[str] = None,
top_k: int = 5,
session_id: Optional[str] = None,
) -> str:
"""Get relevant memories for inclusion in prompts.

Args:
query: Optional query string for memory retrieval. If not provided,
uses current task instruction combined with recent conversation.
top_k: Number of top memories to retrieve.
session_id: Optional session ID for session-specific state lookup.
"""
if not self._memory_manager:
return ""

# Check if memory is enabled in settings
if not _is_memory_enabled():
return ""

# Build semantic query from task instruction + recent conversation
# This provides better context than using the raw trigger description
memory_query = self._build_memory_query(query, session_id)
if not memory_query:
return ""

try:
pointers = self._memory_manager.retrieve(
memory_query, top_k=top_k, min_relevance=0.3
)

if not pointers:
return ""

lines = ["<relevant_memories>"]
lines.append(
"Historical context from previous interactions (verify against current event stream):"
)
lines.append("")

for ptr in pointers:
lines.append(
f"- [{ptr.file_path}] {ptr.section_path}: {ptr.summary} "
f"(relevance: {ptr.relevance_score:.2f})"
)

lines.append("")
lines.append(
"Note: Memories may be outdated. Trust current event stream over memories if they conflict."
)
lines.append("Use memory_search action to retrieve full content if needed.")
lines.append("</relevant_memories>")

return "\n".join(lines)

except Exception as e:
logger.warning(f"[MEMORY] Failed to retrieve memory context: {e}")
return ""

# ──────────────────────── USER MESSAGE COMPONENTS ────────────────────────

def create_user_query(self, query) -> str:
Expand Down
2 changes: 2 additions & 0 deletions agent_core/core/impl/event_stream/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def _is_memory_enabled() -> bool:
"error",
# System events
"waiting_for_user",
# Memory retrieval pointers — re-derivable on demand, not a distillable fact
"relevant_memories",
}


Expand Down
113 changes: 113 additions & 0 deletions agent_core/core/impl/memory/bm25_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
"""
In-memory BM25 keyword index for memory chunks.

Sits alongside ChromaDB to backstop semantic search on terms vector embeddings
struggle with (proper nouns, dates, IDs, code identifiers). The index is fully
rebuilt from the current chunk set on every refresh — at ~200 memory items it
costs <50ms and avoids the complexity of incremental BM25 updates.
"""

from __future__ import annotations

import re
import threading
from typing import Dict, List, Optional, Tuple

try:
from rank_bm25 import BM25Okapi
_HAS_BM25 = True
except ImportError:
BM25Okapi = None
_HAS_BM25 = False

from agent_core.utils.logger import logger


_TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")


def tokenize(text: str) -> List[str]:
"""Lowercase word/number tokenizer. Keeps identifiers intact."""
if not text:
return []
return [t.lower() for t in _TOKEN_RE.findall(text)]


class BM25Index:
"""Thread-safe BM25 index keyed by chunk_id.

On a fresh install or when ``rank_bm25`` is not installed, BM25 retrieval
silently degrades to an empty result set. The MemoryManager then falls back
to pure vector search, so retrieval keeps working — just without the
keyword channel.
"""

def __init__(self) -> None:
self._lock = threading.Lock()
self._chunk_ids: List[str] = []
self._tokenized: List[List[str]] = []
self._bm25: Optional["BM25Okapi"] = None

def rebuild(self, chunks: Dict[str, str]) -> None:
"""Rebuild the index from ``chunk_id -> raw text``.

Args:
chunks: Mapping of chunk_id to the searchable text body.
"""
with self._lock:
self._chunk_ids = list(chunks.keys())
self._tokenized = [tokenize(chunks[cid]) for cid in self._chunk_ids]

if not _HAS_BM25:
self._bm25 = None
return

if not self._tokenized:
self._bm25 = None
return

# rank_bm25 raises on empty docs; replace with a single sentinel token
sanitized = [doc if doc else ["__empty__"] for doc in self._tokenized]
try:
self._bm25 = BM25Okapi(sanitized)
except Exception as e:
logger.warning(f"[BM25Index] Failed to build index: {e}")
self._bm25 = None

def search(self, query: str, top_k: int = 20) -> List[Tuple[str, float]]:
"""Return ``[(chunk_id, score)]`` sorted high-to-low. Empty when index unavailable."""
if not query or not query.strip():
return []

with self._lock:
if self._bm25 is None or not self._chunk_ids:
return []

tokens = tokenize(query)
if not tokens:
return []

try:
scores = self._bm25.get_scores(tokens)
except Exception as e:
logger.warning(f"[BM25Index] Query failed: {e}")
return []

indexed = [
(self._chunk_ids[i], float(scores[i]))
for i in range(len(self._chunk_ids))
if scores[i] > 0
]
indexed.sort(key=lambda x: x[1], reverse=True)
return indexed[:top_k]

@property
def size(self) -> int:
with self._lock:
return len(self._chunk_ids)

@property
def is_available(self) -> bool:
"""True when rank_bm25 is installed AND the index has documents."""
return _HAS_BM25 and self.size > 0
Loading