Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
453 changes: 453 additions & 0 deletions packages/agentic-server/__tests__/router.test.ts

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion packages/agentic-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
"@types/express": "^5.0.6",
"@types/node": "^22.19.11",
"@types/pg": "^8.20.0",
"@types/supertest": "^7.2.0",
"express": "^5.2.1",
"makage": "^0.3.0"
"makage": "^0.3.0",
"supertest": "^7.2.2"
},
"peerDependencies": {
"express": "^5.0.0"
Expand Down
4 changes: 1 addition & 3 deletions packages/agentic-server/src/billing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
*/

import { Logger } from '@pgpmjs/logger';
import type { ConstructiveContext } from '@constructive-io/express-context';

import type { BillingConfig, InferenceLogConfig } from './discovery';
import type { BillingConfig, ConstructiveContext, InferenceLogConfig } from '@constructive-io/express-context';

const log = new Logger('agentic-server:billing');

Expand Down
78 changes: 38 additions & 40 deletions packages/agentic-server/src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import express, { Router, Request, Response } from 'express';
import { Logger } from '@pgpmjs/logger';
import { OllamaAdapter } from '@agentic-kit/ollama';

import type { BillingConfig, InferenceLogConfig } from '@constructive-io/express-context';

import { checkQuota, logInference, recordUsage } from './billing';
import type { InferenceLogEntry } from './billing';
import { getAgentDiscovery, getDatabaseConfig } from './discovery';
import type { AgentDiscovery, BillingConfig, InferenceLogConfig } from './discovery';
import { getEnvOptions } from './env';

const log = new Logger('agentic-server');
Expand Down Expand Up @@ -109,23 +109,18 @@ async function handleCreateThread(
return;
}

if (!ctx.api.dbname) {
res.status(400).json({ error: 'Database not resolved' });
return;
}

const discovery = await getAgentDiscovery(ctx.pool, ctx.api.dbname);
if (!discovery?.thread) {
const agentChat = await ctx.useModule('agentChat');
if (!agentChat?.threadTableName) {
res.status(404).json({ error: 'Agent module not provisioned for this database' });
return;
}

const body: CreateThreadBody = req.body || {};
const { thread } = discovery;
const { schemaName, threadTableName } = agentChat;

const result = await ctx.withPgClient(async (client) => {
const { rows } = await client.query(
`INSERT INTO "${thread.schemaName}"."${thread.tableName}"
`INSERT INTO "${schemaName}"."${threadTableName}"
(entity_id, owner_id, mode, model, system_prompt, title)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, mode, model, system_prompt, status, created_at`,
Expand Down Expand Up @@ -162,13 +157,8 @@ async function handleSendMessage(
return;
}

if (!ctx.api.dbname) {
res.status(400).json({ error: 'Database not resolved' });
return;
}

const discovery = await getAgentDiscovery(ctx.pool, ctx.api.dbname);
if (!discovery?.thread || !discovery?.message) {
const agentChat = await ctx.useModule('agentChat');
if (!agentChat?.threadTableName || !agentChat?.messageTableName) {
res.status(404).json({ error: 'Agent module not provisioned for this database' });
return;
}
Expand All @@ -179,15 +169,15 @@ async function handleSendMessage(
return;
}

const { thread, message: msgTable } = discovery;
const { schemaName, threadTableName, messageTableName } = agentChat;
const threadId = req.params.thread_id;
const userId = ctx.userId;

// Verify thread exists (RLS enforced)
const threadRow = await ctx.withPgClient(async (client) => {
const { rows } = await client.query(
`SELECT id, mode, model, system_prompt, status
FROM "${thread.schemaName}"."${thread.tableName}"
FROM "${schemaName}"."${threadTableName}"
WHERE id = $1`,
[threadId],
);
Expand All @@ -199,10 +189,12 @@ async function handleSendMessage(
return;
}

// Resolve billing + inference log config
const dbConfig = ctx.databaseId
? await getDatabaseConfig(ctx.pool, ctx.databaseId)
: { billing: null, inferenceLog: null };
// Resolve billing + inference log config via loaders
const [billing, inferenceLog] = await Promise.all([
ctx.useModule('billing'),
ctx.useModule('inferenceLog'),
]);
const dbConfig = { billing: billing ?? null, inferenceLog: inferenceLog ?? null };

const ollama = resolveOllamaAdapter();
if (!ollama) {
Expand Down Expand Up @@ -231,9 +223,9 @@ async function handleSendMessage(
for (const msg of body.messages) {
if (msg.role === 'user') {
await client.query(
`INSERT INTO "${msgTable.schemaName}"."${msgTable.tableName}"
`INSERT INTO "${schemaName}"."${messageTableName}"
(thread_id, owner_id, entity_id, author_role, parts)
VALUES ($1, $2, (SELECT entity_id FROM "${thread.schemaName}"."${thread.tableName}" WHERE id = $1), $3, $4)`,
VALUES ($1, $2, (SELECT entity_id FROM "${schemaName}"."${threadTableName}" WHERE id = $1), $3, $4)`,
[threadId, userId, 'user', JSON.stringify([{ type: 'text', text: msg.content }])],
);
}
Expand All @@ -244,7 +236,7 @@ async function handleSendMessage(
const history = await ctx.withPgClient(async (client) => {
const { rows } = await client.query(
`SELECT author_role, parts, created_at
FROM "${msgTable.schemaName}"."${msgTable.tableName}"
FROM "${schemaName}"."${messageTableName}"
WHERE thread_id = $1
ORDER BY created_at ASC`,
[threadId],
Expand Down Expand Up @@ -277,13 +269,15 @@ async function handleSendMessage(
await handleStreamingResponse(req, res, {
ctx, ollama, model, llmMessages, body,
entityId, userId, threadId,
thread, msgTable, dbConfig, startTime, meterSlug,
schemaName, threadTableName, messageTableName,
dbConfig, startTime, meterSlug,
});
} else {
await handleBatchResponse(req, res, {
ctx, ollama, model, llmMessages, body,
entityId, userId, threadId,
thread, msgTable, dbConfig, startTime, meterSlug,
schemaName, threadTableName, messageTableName,
dbConfig, startTime, meterSlug,
});
}
}
Expand All @@ -297,8 +291,9 @@ interface MessageContext {
entityId: string;
userId: string;
threadId: string;
thread: NonNullable<AgentDiscovery['thread']>;
msgTable: NonNullable<AgentDiscovery['message']>;
schemaName: string;
threadTableName: string;
messageTableName: string;
dbConfig: { billing: BillingConfig | null; inferenceLog: InferenceLogConfig | null };
startTime: number;
meterSlug: string;
Expand All @@ -309,7 +304,7 @@ async function handleStreamingResponse(
res: Response,
mc: MessageContext,
): Promise<void> {
const { ctx, ollama, model, llmMessages, body, entityId, userId, threadId, thread, msgTable, dbConfig, startTime, meterSlug } = mc;
const { ctx, ollama, model, llmMessages, body, entityId, userId, threadId, schemaName, threadTableName, messageTableName, dbConfig, startTime, meterSlug } = mc;

res.writeHead(200, {
'Content-Type': 'text/event-stream',
Expand Down Expand Up @@ -372,9 +367,9 @@ async function handleStreamingResponse(
if (content) {
ctx.withPgClient(async (client) => {
await client.query(
`INSERT INTO "${msgTable.schemaName}"."${msgTable.tableName}"
`INSERT INTO "${schemaName}"."${messageTableName}"
(thread_id, owner_id, entity_id, author_role, parts, model)
VALUES ($1, $2, (SELECT entity_id FROM "${thread.schemaName}"."${thread.tableName}" WHERE id = $1), $3, $4, $5)`,
VALUES ($1, $2, (SELECT entity_id FROM "${schemaName}"."${threadTableName}" WHERE id = $1), $3, $4, $5)`,
[threadId, userId, 'assistant', JSON.stringify([{ type: 'text', text: content }]), model],
);
}).catch((err) => log.error('Failed to persist assistant message:', err));
Expand Down Expand Up @@ -416,7 +411,7 @@ async function handleBatchResponse(
res: Response,
mc: MessageContext,
): Promise<void> {
const { ctx, ollama, model, llmMessages, body, entityId, userId, threadId, thread, msgTable, dbConfig, startTime, meterSlug } = mc;
const { ctx, ollama, model, llmMessages, body, entityId, userId, threadId, schemaName, threadTableName, messageTableName, dbConfig, startTime, meterSlug } = mc;

const systemMsg = llmMessages.find(m => m.role === 'system');
const nonSystem = llmMessages.filter(m => m.role !== 'system');
Expand Down Expand Up @@ -451,9 +446,9 @@ async function handleBatchResponse(
// Persist assistant message
await ctx.withPgClient(async (client) => {
await client.query(
`INSERT INTO "${msgTable.schemaName}"."${msgTable.tableName}"
`INSERT INTO "${schemaName}"."${messageTableName}"
(thread_id, owner_id, entity_id, author_role, parts, model)
VALUES ($1, $2, (SELECT entity_id FROM "${thread.schemaName}"."${thread.tableName}" WHERE id = $1), $3, $4, $5)`,
VALUES ($1, $2, (SELECT entity_id FROM "${schemaName}"."${threadTableName}" WHERE id = $1), $3, $4, $5)`,
[threadId, userId, 'assistant', JSON.stringify([{ type: 'text', text: content }]), model],
);
});
Expand Down Expand Up @@ -520,9 +515,12 @@ async function handleEmbed(req: Request, res: Response): Promise<void> {
const model = body.model ?? embedder.model;
const inputs = Array.isArray(body.input) ? body.input : [body.input];

const dbConfig = ctx.databaseId
? await getDatabaseConfig(ctx.pool, ctx.databaseId)
: { billing: null, inferenceLog: null };
// Resolve billing + inference log config via loaders
const [billing, inferenceLog] = await Promise.all([
ctx.useModule('billing'),
ctx.useModule('inferenceLog'),
]);
const dbConfig = { billing: billing ?? null, inferenceLog: inferenceLog ?? null };

// Quota check
if (dbConfig.billing) {
Expand Down
8 changes: 7 additions & 1 deletion packages/express-context/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@

// Types
export type {
AgentChatConfig,
ApiConfigResult,
ApiError,
ApiModule,
ApiStructure,
AuthSettings,
BillingConfig,
BuiltinModuleMap,
ConstructiveAPIToken,
ConstructiveContext,
CorsModuleData,
DatabaseSettings,
GenericModuleData,
InferenceLogConfig,
PublicKeyChallengeData,
PubkeyChallengeSettings,
BuiltinModuleMap,
RlsModule,
WebauthnSettings,
WithPgClient,
Expand Down Expand Up @@ -77,12 +80,15 @@ export type {
ModuleLoader,
} from './loaders';
export {
agentChatLoader,
authSettingsLoader,
billingLoader,
corsLoader,
createDefaultRegistry,
createLoaderRegistry,
createModuleLoader,
databaseSettingsLoader,
inferenceLogLoader,
pubkeyLoader,
rlsLoader,
webauthnLoader,
Expand Down
55 changes: 55 additions & 0 deletions packages/express-context/src/loaders/agent-chat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Agent Chat Module Loader
*
* Resolves per-database agent chat config from metaschema_modules_public.agent_chat_module.
* Returns the schema and table names for threads, messages, and tasks.
*/

import type { AgentChatConfig } from '../types';
import type { LoaderContext, ModuleLoader } from './types';
import { createModuleLoader } from './create-loader';

// ─── SQL ────────────────────────────────────────────────────────────────────

const AGENT_CHAT_MODULE_SQL = `
SELECT
s.schema_name,
acm.thread_table_name,
acm.message_table_name,
acm.task_table_name
FROM metaschema_modules_public.agent_chat_module acm
JOIN metaschema_public.schema s ON s.id = acm.schema_id
LIMIT 1
`;

// ─── Row Types ──────────────────────────────────────────────────────────────

interface AgentChatModuleRow {
schema_name: string;
thread_table_name: string | null;
message_table_name: string | null;
task_table_name: string | null;
}

// ─── Loader ─────────────────────────────────────────────────────────────────

export const agentChatLoader: ModuleLoader<AgentChatConfig> = createModuleLoader<AgentChatConfig>({
name: 'agentChat',
ttlMs: 60_000,
async resolve(ctx: LoaderContext) {
const { tenantPool } = ctx;

const result = await tenantPool.query<AgentChatModuleRow>(
AGENT_CHAT_MODULE_SQL,
);
const row = result.rows[0];
if (!row) return undefined;

return {
schemaName: row.schema_name,
threadTableName: row.thread_table_name,
messageTableName: row.message_table_name,
taskTableName: row.task_table_name,
};
},
});
56 changes: 56 additions & 0 deletions packages/express-context/src/loaders/billing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Billing Module Loader
*
* Resolves per-database billing config from metaschema_modules_public.billing_module.
* Returns schema names and function names needed for quota checks and usage recording.
*/

import type { BillingConfig } from '../types';
import type { LoaderContext, ModuleLoader } from './types';
import { createModuleLoader } from './create-loader';

// ─── SQL ────────────────────────────────────────────────────────────────────

const BILLING_MODULE_SQL = `
SELECT
s.schema_name AS public_schema,
ps.schema_name AS private_schema,
bm.record_usage_function
FROM metaschema_modules_public.billing_module bm
JOIN metaschema_public.schema s ON bm.schema_id = s.id
JOIN metaschema_public.schema ps ON bm.private_schema_id = ps.id
WHERE bm.database_id = $1
LIMIT 1
`;

// ─── Row Types ──────────────────────────────────────────────────────────────

interface BillingModuleRow {
public_schema: string;
private_schema: string;
record_usage_function: string;
}

// ─── Loader ─────────────────────────────────────────────────────────────────

export const billingLoader: ModuleLoader<BillingConfig> = createModuleLoader<BillingConfig>({
name: 'billing',
ttlMs: 5 * 60_000,
async resolve(ctx: LoaderContext) {
const { tenantPool, databaseId } = ctx;

const result = await tenantPool.query<BillingModuleRow>(
BILLING_MODULE_SQL,
[databaseId],
);
const row = result.rows[0];
if (!row?.record_usage_function) return undefined;

return {
publicSchema: row.public_schema,
privateSchema: row.private_schema,
recordUsageFunction: row.record_usage_function,
checkBillingQuotaFunction: 'check_billing_quota',
};
},
});
Loading
Loading