diff --git a/.changeset/fuzzy-nails-trace.md b/.changeset/fuzzy-nails-trace.md new file mode 100644 index 00000000..4f30d314 --- /dev/null +++ b/.changeset/fuzzy-nails-trace.md @@ -0,0 +1,5 @@ +--- +"@cleverbrush/otel": minor +--- + +Add `@cleverbrush/otel/client` with typed-client tracing middleware that creates outbound CLIENT spans and injects W3C trace context for distributed service-to-service traces. diff --git a/README.md b/README.md index f2fd1a7a..635fa6bf 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ The flagship package is **`@cleverbrush/schema`** — a schema validation librar | [`@cleverbrush/deep`](./libs/deep) | Deep operations on objects: deep equality, deep merge, flattening, hashing | | [`@cleverbrush/scheduler`](./libs/scheduler) | Cron-like job scheduler for Node.js using worker threads | | [`@cleverbrush/client`](./libs/client) | Typed HTTP client for `@cleverbrush/server` API contracts — Proxy-based, zero codegen, full type inference. Optional React + TanStack Query integration via `/react` subpath | +| [`@cleverbrush/otel`](./libs/otel) | OpenTelemetry instrumentation — traces for HTTP, SQL, and outbound client calls; OTLP log sink with trace correlation; DI integration | | [`@cleverbrush/knex-clickhouse`](./libs/knex-clickhouse) | Knex query builder dialect for ClickHouse | --- diff --git a/libs/otel/README.md b/libs/otel/README.md index 0c3f76c1..e9a06869 100644 --- a/libs/otel/README.md +++ b/libs/otel/README.md @@ -58,7 +58,23 @@ const server = createServer() A `SpanKind.SERVER` span is opened per request, named `operationId` or `METHOD route` and tagged with the standard HTTP semantic-convention attributes. W3C `traceparent` is extracted, so spans link to upstream callers. -### 3. Trace SQL queries +### 3. Trace typed client calls between services + +```ts +import { createClient } from '@cleverbrush/client'; +import { clientTracingMiddleware } from '@cleverbrush/otel/client'; + +const client = createClient(api, { + baseUrl: 'http://todo-backend:3000', + middlewares: [clientTracingMiddleware()] +}); +``` + +`clientTracingMiddleware()` opens a `SpanKind.CLIENT` span around each typed client call and injects W3C `traceparent` / `tracestate` / `baggage` headers. When the downstream Cleverbrush service uses `tracingMiddleware()`, SigNoz and other OTel backends show both services under one distributed trace. + +Put the tracing middleware first in the client middleware list so it wraps retries, timeouts, and batching. If you use `batching()`, keep batching last so each logical subrequest carries its own trace context. + +### 4. Trace SQL queries ```ts import { instrumentKnex } from '@cleverbrush/otel'; @@ -71,7 +87,7 @@ const db = instrumentKnex( Every Knex query becomes a `SpanKind.CLIENT` span with `db.system.name`, `db.namespace`, `db.operation.name`, `db.query.text`, and parented under the active server span automatically. -### 4. Send logs as OTLP records (with trace correlation) +### 5. Send logs as OTLP records (with trace correlation) ```ts import { createLogger, consoleSink } from '@cleverbrush/log'; @@ -90,6 +106,7 @@ const logger = createLogger({ | --------------------------------- | --------------------------------------------------------- | | `setupOtel(config)` | Boot the Node SDK; returns `{ shutdown(), sdk }` | | `tracingMiddleware(opts?)` | `@cleverbrush/server` middleware; opens SERVER span | +| `clientTracingMiddleware(opts?)` | `@cleverbrush/client` middleware; opens CLIENT span | | `instrumentKnex(knex, opts?)` | Hook a Knex instance; emits CLIENT span per query | | `otelLogSink(opts?)` | `@cleverbrush/log` sink → OTLP log records | | `traceEnricher()` | `@cleverbrush/log` enricher → adds `TraceId` / `SpanId` | diff --git a/libs/otel/package.json b/libs/otel/package.json index 1b7b7cae..9071ece8 100644 --- a/libs/otel/package.json +++ b/libs/otel/package.json @@ -27,6 +27,10 @@ "types": "./dist/index.d.ts", "import": "./dist/index.js" }, + "./client": { + "types": "./dist/client.d.ts", + "import": "./dist/client.js" + }, "./instrumentations": { "types": "./dist/instrumentations.d.ts", "import": "./dist/instrumentations.js" diff --git a/libs/otel/src/client.test.ts b/libs/otel/src/client.test.ts new file mode 100644 index 00000000..ac89a53d --- /dev/null +++ b/libs/otel/src/client.test.ts @@ -0,0 +1,230 @@ +import { batching } from '@cleverbrush/client/batching'; +import { + propagation, + SpanKind, + SpanStatusCode, + trace +} from '@opentelemetry/api'; +import { W3CTraceContextPropagator } from '@opentelemetry/core'; +import { + InMemorySpanExporter, + SimpleSpanProcessor +} from '@opentelemetry/sdk-trace-base'; +import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; +import { + afterAll, + beforeAll, + beforeEach, + describe, + expect, + it, + vi +} from 'vitest'; +import { clientTracingMiddleware } from './client.js'; +import { tracingMiddleware } from './middleware/tracing.js'; + +const exporter = new InMemorySpanExporter(); +const provider = new NodeTracerProvider({ + spanProcessors: [new SimpleSpanProcessor(exporter)] +}); + +beforeAll(() => { + provider.register(); + propagation.setGlobalPropagator(new W3CTraceContextPropagator()); +}); + +afterAll(async () => { + await provider.shutdown(); +}); + +beforeEach(() => exporter.reset()); + +function parseTraceparent(value: string): { + traceId: string; + spanId: string; +} { + const parts = value.split('-'); + return { + traceId: parts[1]!, + spanId: parts[2]! + }; +} + +function batchResponse(count: number): Response { + return new Response( + JSON.stringify({ + responses: Array.from({ length: count }, () => ({ + status: 200, + headers: { 'content-type': 'application/json' }, + body: '{}' + })) + }), + { + status: 200, + headers: { 'content-type': 'application/json' } + } + ); +} + +describe('clientTracingMiddleware', () => { + it('creates a CLIENT span and injects traceparent from that span', async () => { + const mw = clientTracingMiddleware(); + let injectedTraceparent = ''; + const send = mw(async (_url, init) => { + const headers = init.headers as Record; + injectedTraceparent = headers.traceparent; + expect(headers.Accept).toBe('application/json'); + return new Response(null, { status: 200 }); + }); + + const tracer = trace.getTracer('client-test'); + await tracer.startActiveSpan('service-a.request', async parent => { + try { + await send('http://service-b.local/api/todos', { + method: 'GET', + headers: { Accept: 'application/json' } + }); + } finally { + parent.end(); + } + }); + + const spans = exporter.getFinishedSpans(); + const parent = spans.find(s => s.name === 'service-a.request')!; + const client = spans.find(s => s.kind === SpanKind.CLIENT)!; + const traceparent = parseTraceparent(injectedTraceparent); + + expect(client.name).toBe('GET /api/todos'); + expect(client.parentSpanContext?.spanId).toBe( + parent.spanContext().spanId + ); + expect(traceparent.traceId).toBe(client.spanContext().traceId); + expect(traceparent.spanId).toBe(client.spanContext().spanId); + }); + + it('lets downstream server middleware extract the client span as parent', async () => { + let outboundHeaders: Record = {}; + const clientSend = clientTracingMiddleware()(async (_url, init) => { + outboundHeaders = init.headers as Record; + return new Response(null, { status: 204 }); + }); + + const tracer = trace.getTracer('distributed-test'); + await tracer.startActiveSpan('service-a.request', async parent => { + try { + await clientSend('http://service-b.local/api/downstream', { + method: 'POST' + }); + } finally { + parent.end(); + } + }); + + const clientSpan = exporter + .getFinishedSpans() + .find(s => s.kind === SpanKind.CLIENT)!; + const serverMw = tracingMiddleware(); + const ctx = { + method: 'POST', + url: new URL('http://service-b.local/api/downstream'), + headers: outboundHeaders, + items: new Map(), + response: { + statusCode: 200, + setHeader: vi.fn() + } + }; + + await serverMw(ctx, async () => {}); + + const serverSpan = exporter + .getFinishedSpans() + .find(s => s.kind === SpanKind.SERVER)!; + + expect(serverSpan.spanContext().traceId).toBe( + clientSpan.spanContext().traceId + ); + expect(serverSpan.parentSpanContext?.spanId).toBe( + clientSpan.spanContext().spanId + ); + }); + + it('uses typed client endpoint metadata for span names and attributes', async () => { + const send = clientTracingMiddleware()(async () => { + return new Response(null, { status: 201 }); + }); + + await send('http://service-b.local/api/todos', { + method: 'POST', + headers: {}, + __endpointMeta: { + group: 'todos', + endpoint: 'create', + path: '/api/todos', + operationId: 'createTodo', + tags: ['todos'] + } + } as RequestInit); + + const span = exporter.getFinishedSpans()[0]!; + expect(span.name).toBe('createTodo'); + expect(span.attributes['http.route']).toBe('/api/todos'); + expect(span.attributes['cleverbrush.client.group']).toBe('todos'); + expect(span.attributes['cleverbrush.client.endpoint']).toBe('create'); + expect(span.attributes['cleverbrush.endpoint.operationId']).toBe( + 'createTodo' + ); + expect(span.attributes['cleverbrush.endpoint.tags']).toBe('todos'); + }); + + it('marks HTTP error responses as failed client spans', async () => { + const send = clientTracingMiddleware()(async () => { + return new Response(null, { status: 503 }); + }); + + await send('http://service-b.local/api/todos', { method: 'GET' }); + + const span = exporter.getFinishedSpans()[0]!; + expect(span.status.code).toBe(SpanStatusCode.ERROR); + expect(span.attributes['http.response.status_code']).toBe(503); + }); + + it('records thrown fetch errors on the client span', async () => { + const send = clientTracingMiddleware()(async () => { + throw new TypeError('fetch failed'); + }); + + await expect( + send('http://service-b.local/api/todos', { method: 'GET' }) + ).rejects.toThrow('fetch failed'); + + const span = exporter.getFinishedSpans()[0]!; + expect(span.status.code).toBe(SpanStatusCode.ERROR); + expect(span.events.some(e => e.name === 'exception')).toBe(true); + }); + + it('preserves logical subrequest trace headers when wrapping batching', async () => { + const fetch = vi.fn().mockResolvedValue(batchResponse(2)); + const send = clientTracingMiddleware()( + batching({ windowMs: 1 })(fetch) + ); + + await Promise.all([ + send('http://service-b.local/api/a', { method: 'GET' }), + send('http://service-b.local/api/b', { method: 'GET' }) + ]); + + expect(fetch).toHaveBeenCalledOnce(); + const init = fetch.mock.calls[0]![1] as RequestInit; + const body = JSON.parse(String(init.body)) as { + requests: Array<{ headers: Record }>; + }; + + const firstTraceparent = body.requests[0]!.headers.traceparent; + const secondTraceparent = body.requests[1]!.headers.traceparent; + + expect(firstTraceparent).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-/); + expect(secondTraceparent).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-/); + expect(firstTraceparent).not.toBe(secondTraceparent); + }); +}); diff --git a/libs/otel/src/client.ts b/libs/otel/src/client.ts new file mode 100644 index 00000000..841ca833 --- /dev/null +++ b/libs/otel/src/client.ts @@ -0,0 +1,329 @@ +import { + context, + propagation, + type Span, + type SpanAttributes, + SpanKind, + SpanStatusCode, + type Tracer, + trace +} from '@opentelemetry/api'; +import { + ATTR_HTTP_REQUEST_METHOD, + ATTR_HTTP_RESPONSE_STATUS_CODE, + ATTR_HTTP_ROUTE, + ATTR_SERVER_ADDRESS, + ATTR_SERVER_PORT, + ATTR_URL_FULL, + ATTR_URL_PATH, + ATTR_URL_SCHEME +} from '@opentelemetry/semantic-conventions'; + +/** + * A function with the same shape as `fetch`. + * + * Kept local instead of importing from `@cleverbrush/client` so this + * entrypoint remains structurally compatible without a runtime dependency. + */ +export type FetchLike = (url: string, init: RequestInit) => Promise; + +/** + * Middleware shape accepted by `@cleverbrush/client`. + */ +export type ClientMiddleware = (next: FetchLike) => FetchLike; + +/** + * Endpoint metadata attached to `RequestInit` by `@cleverbrush/client`. + * + * This is intentionally structural and partial so the OTel package can read + * metadata when present without coupling to client internals at runtime. + */ +export interface ClientTracingEndpointMeta { + group?: string; + endpoint?: string; + method?: string; + path?: string; + collectionPath?: string; + operationId?: string | null; + tags?: readonly string[]; +} + +/** + * Information passed to the `enrichSpan` hook. + */ +export interface ClientTracingInfo { + url: string; + method: string; + headers: Record; + endpoint?: ClientTracingEndpointMeta; +} + +/** + * Configuration for {@link clientTracingMiddleware}. + */ +export interface ClientTracingMiddlewareOptions { + /** + * Tracer name used when resolving the OTel tracer. + * + * @default '@cleverbrush/otel' + */ + tracerName?: string; + + /** Tracer version. */ + tracerVersion?: string; + + /** + * Predicate for skipping tracing on selected outbound requests. + */ + skip?: (url: string, init: RequestInit) => boolean; + + /** + * Hook for adding custom attributes/events before the request is sent. + * Errors thrown here are swallowed. + */ + enrichSpan?: (span: Span, info: ClientTracingInfo) => void; + + /** + * Whether to record `url.full`. + * + * Disabled by default because full URLs can include query strings with + * sensitive values. + * + * @default false + */ + recordUrlFull?: boolean; +} + +const DEFAULT_TRACER_NAME = '@cleverbrush/otel'; + +function flattenHeaders( + headers: RequestInit['headers'] | undefined +): Record { + if (!headers) return {}; + + if (headers instanceof Headers) { + const result: Record = {}; + headers.forEach((value, key) => { + result[key] = value; + }); + return result; + } + + if (Array.isArray(headers)) { + const result: Record = {}; + for (const [key, value] of headers) { + result[key] = value; + } + return result; + } + + return { ...headers } as Record; +} + +function getEndpointMeta( + init: RequestInit +): ClientTracingEndpointMeta | undefined { + return (init as any).__endpointMeta as + | ClientTracingEndpointMeta + | undefined; +} + +function parseUrl(url: string): + | { + path: string; + scheme?: string; + host?: string; + port?: number; + } + | undefined { + try { + const absolute = /^[a-z][a-z0-9+.-]*:\/\//i.test(url); + const parsed = new URL(url, 'http://localhost'); + const result: { + path: string; + scheme?: string; + host?: string; + port?: number; + } = { + path: parsed.pathname + }; + + if (absolute) { + result.scheme = parsed.protocol.replace(/:$/, ''); + result.host = parsed.hostname; + if (parsed.port) { + const port = Number(parsed.port); + if (Number.isFinite(port)) result.port = port; + } + } + + return result; + } catch { + return undefined; + } +} + +function getRoute(meta: ClientTracingEndpointMeta | undefined): string | null { + return meta?.path ?? meta?.collectionPath ?? null; +} + +function getSpanName( + method: string, + url: string, + meta: ClientTracingEndpointMeta | undefined +): string { + if (meta?.operationId) return meta.operationId; + + const route = getRoute(meta); + if (route) return `${method} ${route}`; + + const parsed = parseUrl(url); + return `${method} ${parsed?.path ?? url}`; +} + +function buildAttributes( + method: string, + url: string, + meta: ClientTracingEndpointMeta | undefined, + recordUrlFull: boolean +): SpanAttributes { + const parsed = parseUrl(url); + const route = getRoute(meta); + const attributes: SpanAttributes = { + [ATTR_HTTP_REQUEST_METHOD]: method + }; + + if (parsed?.path) attributes[ATTR_URL_PATH] = parsed.path; + if (parsed?.scheme) attributes[ATTR_URL_SCHEME] = parsed.scheme; + if (parsed?.host) attributes[ATTR_SERVER_ADDRESS] = parsed.host; + if (parsed?.port !== undefined) { + attributes[ATTR_SERVER_PORT] = parsed.port; + } + if (recordUrlFull) attributes[ATTR_URL_FULL] = url; + if (route) attributes[ATTR_HTTP_ROUTE] = route; + if (meta?.group) attributes['cleverbrush.client.group'] = meta.group; + if (meta?.endpoint) { + attributes['cleverbrush.client.endpoint'] = meta.endpoint; + } + if (meta?.operationId) { + attributes['cleverbrush.endpoint.operationId'] = meta.operationId; + } + if (meta?.tags?.length) { + attributes['cleverbrush.endpoint.tags'] = meta.tags.join(','); + } + + return attributes; +} + +function markError(span: Span, err: unknown): void { + if (err instanceof Error) { + span.recordException(err); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.message + }); + return; + } + + span.recordException(new Error(String(err))); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(err) + }); +} + +/** + * Creates a `@cleverbrush/client` middleware that traces outbound HTTP calls + * and injects W3C Trace Context headers. + * + * Register this as the first client middleware so it wraps retries, timeouts, + * and batching. The server-side `tracingMiddleware` already extracts these + * headers, so downstream services join the same distributed trace. + * + * @example + * ```ts + * import { createClient } from '@cleverbrush/client'; + * import { clientTracingMiddleware } from '@cleverbrush/otel/client'; + * + * const client = createClient(api, { + * baseUrl: 'http://service-b:3000', + * middlewares: [clientTracingMiddleware()] + * }); + * ``` + */ +export function clientTracingMiddleware( + options: ClientTracingMiddlewareOptions = {} +): ClientMiddleware { + const tracerName = options.tracerName ?? DEFAULT_TRACER_NAME; + const tracerVersion = options.tracerVersion; + const skip = options.skip; + const enrichSpan = options.enrichSpan; + const recordUrlFull = options.recordUrlFull ?? false; + + let cachedTracer: Tracer | undefined; + const getTracer = (): Tracer => { + if (!cachedTracer) { + cachedTracer = trace.getTracer(tracerName, tracerVersion); + } + return cachedTracer; + }; + + return next => async (url, init) => { + if (skip?.(url, init)) { + return next(url, init); + } + + const method = (init.method ?? 'GET').toUpperCase(); + const meta = getEndpointMeta(init); + const spanName = getSpanName(method, url, meta); + const attributes = buildAttributes(method, url, meta, recordUrlFull); + + const tracer = getTracer(); + return tracer.startActiveSpan( + spanName, + { kind: SpanKind.CLIENT, attributes }, + async span => { + const headers = flattenHeaders(init.headers); + propagation.inject(context.active(), headers); + + const tracedInit = { + ...init, + headers + }; + const info: ClientTracingInfo = { + url, + method, + headers, + ...(meta ? { endpoint: meta } : {}) + }; + + if (enrichSpan) { + try { + enrichSpan(span, info); + } catch { + // Ignore enrichment errors. + } + } + + try { + const response = await next(url, tracedInit); + span.setAttribute( + ATTR_HTTP_RESPONSE_STATUS_CODE, + response.status + ); + if (response.status >= 400) { + span.setStatus({ code: SpanStatusCode.ERROR }); + } else { + span.setStatus({ code: SpanStatusCode.OK }); + } + return response; + } catch (err) { + markError(span, err); + throw err; + } finally { + span.end(); + } + } + ); + }; +} diff --git a/libs/otel/tsup.config.ts b/libs/otel/tsup.config.ts index c7131595..097a8912 100644 --- a/libs/otel/tsup.config.ts +++ b/libs/otel/tsup.config.ts @@ -1,7 +1,7 @@ import { defineConfig } from 'tsup'; export default defineConfig({ - entry: ['src/index.ts', 'src/instrumentations.ts'], + entry: ['src/index.ts', 'src/client.ts', 'src/instrumentations.ts'], format: ['esm'], tsconfig: './tsconfig.build.json', minify: true, diff --git a/websites/docs/app/otel/page.tsx b/websites/docs/app/otel/page.tsx index 323e4edf..19a0c14d 100644 --- a/websites/docs/app/otel/page.tsx +++ b/websites/docs/app/otel/page.tsx @@ -10,8 +10,11 @@ export default function OtelPage() {

@cleverbrush/otel

End-to-end OpenTelemetry instrumentation for the - framework — traces, logs, and metrics over OTLP for{' '} + framework — traces for inbound/outbound HTTP, SQL, and + typed client calls; OTLP log sink with trace + correlation; metrics — for{' '} @cleverbrush/server,{' '} + @cleverbrush/client,{' '} @cleverbrush/orm, and{' '} @cleverbrush/log.

@@ -52,7 +55,8 @@ export default function OtelPage() { @cleverbrush/otel provides one{' '} SDK bootstrap ( setupOtel{' '} ), middleware for{' '} - @cleverbrush/server, a{' '} + @cleverbrush/server and{' '} + @cleverbrush/client, a{' '} Knex hook for{' '} @cleverbrush/orm, an{' '} OTLP log sink + enricher for{' '} @@ -118,7 +122,37 @@ const server = createServer() automatically.

-

3. Trace SQL queries

+

3. Trace typed client calls

+
+                    
+                
+

+ clientTracingMiddleware() opens a{' '} + SpanKind.CLIENT span around each typed client + call and injects W3C traceparent /{' '} + tracestate / baggage headers. When + the downstream Cleverbrush service uses{' '} + tracingMiddleware(), OTel backends show both + services under one distributed trace. +

+

+ Put tracing middleware first in the client middleware list + so it wraps retries, timeouts, and batching. If you use{' '} + batching(), keep batching last so each logical + subrequest carries its own trace context. +

+ +

4. Trace SQL queries

                     
 
-                

4. Send logs as OTLP records

+

5. Send logs as OTLP records

                     
                         
+                        
+                            
+                                clientTracingMiddleware(opts?)
+                            
+                            
+                                @cleverbrush/client middleware;
+                                opens SpanKind.CLIENT span per
+                                outbound call, injects W3C trace context
+                            
+                        
                         
                             
                                 instrumentKnex(knex, opts?)