Conversation
每次写入 OTS 行时在 attribute_columns 中携带 _schema_version 字段, 用于 SDK 写入端与 Core 读取端(funagent-core)的 schema 兼容性协调。 - 在 model.py 中定义 SCHEMA_VERSION_COLUMN 和 6 个表级版本常量 - 修改 __ots_backend_async_template.py 中所有 put_* 方法 - codegen 生成同步方法(ots_backend.py) - 新增 TestSchemaVersionAsync 测试类验证写入正确性 Made-with: Cursor
…for initializing conversation search index - Introduced `init_conversation_search_index_async` and `init_conversation_search_index` methods to create the conversation search index, allowing for repeated calls if the index already exists. - Updated references in `session_store.py` and `ots_backend.py` to utilize the new methods for initializing the conversation search index. This enhancement improves the flexibility and usability of the conversation service's indexing capabilities.
There was a problem hiding this comment.
Pull request overview
This PR introduces per-table OTS schema versioning by writing a _schema_version attribute on writes, and adjusts initialization APIs so LangChain/LangGraph setups create only the Conversation search index.
Changes:
- Add
_schema_versioncolumn constant + per-table schema version constants inconversation_service.model. - Write
_schema_versionon all OTSput_*write paths (session/event/state/checkpoint/writes/blob) inOTSBackend(sync + async). - Add Conversation-only search index init methods and switch
SessionStore.init_langchain_*/init_langgraph_*to use them; add async unit tests asserting_schema_versionis written.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unittests/conversation_service/test_ots_backend.py | Adds helper + async tests asserting _schema_version is written on async put methods. |
| agentrun/conversation_service/session_store.py | Switches LangChain/LangGraph init flows to create only the Conversation search index. |
| agentrun/conversation_service/ots_backend.py | Adds schema version attribute writes and adds Conversation-only search index init methods. |
| agentrun/conversation_service/model.py | Defines _schema_version column + per-table schema version constants and upgrade guidelines. |
| agentrun/conversation_service/__session_store_async_template.py | Updates async template to call Conversation-only search index init for LangChain/LangGraph. |
| agentrun/conversation_service/__ots_backend_async_template.py | Updates async backend template with schema version writes + Conversation-only search index init. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| class TestSchemaVersionAsync: | ||
| """验证所有 put_* 方法在写入时携带 _schema_version 列。""" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_put_session_has_schema_version(self) -> None: | ||
| backend = _make_async_backend() | ||
| session = ConversationSession("a", "u", "s", 100, 200) | ||
| await backend.put_session_async(session) | ||
|
|
||
| call_args = backend._async_client.put_row.call_args | ||
| row_arg = call_args[0][1] | ||
| attrs = _extract_attr_columns_dict(row_arg) | ||
| assert attrs[SCHEMA_VERSION_COLUMN] == CONVERSATION_SCHEMA_VERSION |
There was a problem hiding this comment.
已新增 schema_version 写入逻辑并在这里补充了异步侧的断言,但目前 TestSchemaVersionAsync 只覆盖 putasync;同步 put(put_session/put_event/put_state/put_checkpoint/...)同样新增了写入逻辑但缺少对应测试。建议增加一组同步测试(或参数化复用同一断言),以防同步路径回归。
| # 每张表独立计数,用于 SDK 写入端与 Core 读取端(funagent-core)的兼容性协调。 | ||
| # 每次 PutRow 时在 attribute_columns 中写入 _schema_version 字段。 | ||
| # Core 端读取时检查该字段,版本不匹配时打 WARN 日志并尽力解析。 | ||
| # 历史数据(无此字段)视为 v0。 | ||
| # | ||
| # 升级流程: | ||
| # 1. 递增对应表的 *_SCHEMA_VERSION 常量 | ||
| # 2. 在 PR 描述中记录变更的列名/类型/语义 | ||
| # 3. 通知 funagent-core 侧同步更新解析逻辑和版本常量 | ||
| # 4. 如涉及 breaking change,提供数据迁移指引 | ||
| # | ||
| # 兼容性规则: | ||
| # - 只加不删:新增列允许,删除/重命名列视为 breaking change | ||
| # - PK 不可变:主键结构永不改变 | ||
| # - 索引名不可变:Search Index 名称一旦确定不再修改 | ||
| # - 语义不可变:已有列的类型和含义不改变 | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
| SCHEMA_VERSION_COLUMN = "_schema_version" | ||
|
|
||
| CONVERSATION_SCHEMA_VERSION = 1 | ||
| EVENT_SCHEMA_VERSION = 1 | ||
| STATE_SCHEMA_VERSION = 1 # state / app_state / user_state 共享 | ||
| CHECKPOINT_SCHEMA_VERSION = 1 |
There was a problem hiding this comment.
该段注释说明“每次 PutRow 时在 attribute_columns 中写入 _schema_version 字段”,但实际写入路径包含 UpdateRow(state)以及 BatchWriteRow(checkpoint_writes)。另外注释写“每张表独立计数”,但 STATE_SCHEMA_VERSION 又被 state/app_state/user_state 三张表共享,表述上不一致。建议补充说明覆盖 put_row/update_row/batch_write_row,并明确 state 三张表共享同一版本(或改为分别计数)。
| # 升级流程: | ||
| # 1. 递增对应表的 *_SCHEMA_VERSION 常量 | ||
| # 2. 在 PR 描述中记录变更的列名/类型/语义 | ||
| # 3. 通知 funagent-core 侧同步更新解析逻辑和版本常量 | ||
| # 4. 如涉及 breaking change,提供数据迁移指引 |
There was a problem hiding this comment.
这里的升级流程要求“在 PR 描述中记录变更的列名/类型/语义”,但当前 PR 描述仍是未填写的模板内容,缺少对新增 _schema_version 列与索引初始化 API 调整的说明。建议补全 PR 描述以便评审与发布时追踪兼容性影响。
|
|
||
| 包含核心表(Conversation + Event + 二级索引)和多元索引。 | ||
| 表或索引已存在时跳过,可重复调用。 | ||
| """ | ||
| await self._backend.init_core_tables_async() | ||
| await self._backend.init_search_index_async() | ||
| await self._backend.init_conversation_search_index_async() |
There was a problem hiding this comment.
init_langchain_tables_async/init_langchain_tables 的实现已改为只创建 Conversation 的 search index(init_conversation_search_index_*),但 docstring 仍笼统描述为“多元索引”。建议在注释中明确仅创建 conversation_search_index(不包含 state_search_index),避免调用方误以为包含 State 相关索引。
| @@ -2561,7 +2572,7 @@ def delete_state_row( | |||
| self._client.delete_row(table_name, row, condition) | |||
|
|
|||
| # ----------------------------------------------------------------------- | |||
| # Checkpoint CRUD(LangGraph)(异步) | |||
| # Checkpoint CRUD(LangGraph)(同步) | |||
| # ----------------------------------------------------------------------- | |||
There was a problem hiding this comment.
这里的分段注释标题与实际方法段落不匹配:delete_state_row(同步)仍属于 State CRUD,但其前置标题被改成了“Checkpoint CRUD(异步)”;紧接着 put_checkpoint_async(异步)前的标题又写成了“Checkpoint CRUD(同步)”。建议修正分段注释,避免阅读/维护时混淆同步与异步以及 State vs Checkpoint 的边界。
- LangChain 异步 init 仅创建 Conversation 多元索引(与同步一致) - model.py:澄清写入 API(PutRow/UpdateRow/BatchWriteRow)与 state 三表共享版本号 - 模板分段注释去掉「异步」后缀,避免 codegen 后标题与代码错位;已 make codegen - 新增 TestSchemaVersionSync 覆盖同步 put_* 的 _schema_version 断言 类型检查:uv run mypy --config-file mypy.ini agentrun tests — 通过,310 个源文件 Made-with: Cursor
Fix bugs
Bug detail
Pull request tasks
Update docs
Reason for update
Pull request tasks
Add contributor
Contributed content
Content detail
Others
Reason for update