From 6c92066035d5b61a87662372d3b8b796a65553e9 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 15 Jun 2026 14:36:57 +0200 Subject: [PATCH] feat(api): annotate HTTP spans with status and semantic attributes --- pkg/api/api.go | 22 +++++- pkg/api/api_test.go | 15 ++-- pkg/api/bytes.go | 4 +- pkg/api/bzz.go | 6 +- pkg/api/tracing_test.go | 153 +++++++++++++++++++++++++++++++++++++++ pkg/pushsync/pushsync.go | 2 +- pkg/tracing/tracing.go | 7 ++ 7 files changed, 197 insertions(+), 12 deletions(-) create mode 100644 pkg/api/tracing_test.go diff --git a/pkg/api/api.go b/pkg/api/api.go index 18086405020..7490f16399e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -63,6 +63,9 @@ import ( "github.com/gorilla/mux" "github.com/hashicorp/go-multierror" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/semaphore" ) @@ -462,7 +465,7 @@ func (s *Service) newTracingHandler(spanName string) func(h http.Handler) http.H // ignore } - span, _, ctx := s.tracer.StartSpanFromContext(ctx, spanName, s.logger) + span, _, ctx := s.tracer.StartSpanFromContext(ctx, spanName, s.logger, trace.WithSpanKind(trace.SpanKindServer)) defer span.End() err = s.tracer.AddContextHTTPHeader(ctx, r.Header) @@ -472,6 +475,23 @@ func (s *Service) newTracingHandler(spanName string) func(h http.Handler) http.H } h.ServeHTTP(w, r.WithContext(ctx)) + + // The response status code is captured upstream by responseCodeMetricsHandler's + // *responseWriter, which exposes Status(). Read it to annotate the span with the + // standard HTTP server attributes so the request is queryable in Tempo/Grafana. 
+ if sw, ok := w.(interface{ Status() int }); ok { + code := sw.Status() + span.SetAttributes( + semconv.HTTPRequestMethodKey.String(r.Method), + semconv.HTTPRoute(spanName), + semconv.HTTPResponseStatusCode(code), + ) + // OTel server-span convention: 5xx marks the span as Error; 4xx is a client + // error and stays Unset, still queryable via http.response.status_code. + if code >= 500 { + span.SetStatus(codes.Error, http.StatusText(code)) + } + } }) } } diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index babd816dd06..23782bcbc6b 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -90,6 +90,7 @@ func init() { type testServerOptions struct { Storer api.Storer + Tracer *tracing.Tracer StateStorer storage.StateStorer Resolver resolver.Interface Pss pss.Interface @@ -231,12 +232,16 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket. s.SetSwarmAddress(&o.Overlay) s.SetProbe(o.Probe) - noOpTracer, tracerCloser, _ := tracing.NewTracer(&tracing.Options{ - Enabled: false, - }) - testutil.CleanupCloser(t, tracerCloser) + tracer := o.Tracer + if tracer == nil { + noOpTracer, tracerCloser, _ := tracing.NewTracer(&tracing.Options{ + Enabled: false, + }) + testutil.CleanupCloser(t, tracerCloser) + tracer = noOpTracer + } - s.Configure(signer, noOpTracer, api.Options{ + s.Configure(signer, tracer, api.Options{ CORSAllowedOrigins: o.CORSAllowedOrigins, WsPingPeriod: o.WsPingPeriod, }, extraOpts, 1, erc20APIService) diff --git a/pkg/api/bytes.go b/pkg/api/bytes.go index 0855c3fba54..c8e0b318037 100644 --- a/pkg/api/bytes.go +++ b/pkg/api/bytes.go @@ -30,7 +30,7 @@ type bytesPostResponse struct { // bytesUploadHandler handles upload of raw binary data of arbitrary length. 
func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { - span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bytes", s.logger.WithName("post_bytes").Build()) + span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "bytes-post", s.logger.WithName("post_bytes").Build()) defer span.End() headers := struct { @@ -73,7 +73,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) { tracing.RecordError(span, err, attribute.String("action", "tag.create")) return } - span.SetAttributes(attribute.Int64("tagID", int64(tag))) + span.SetAttributes(attribute.Int64("tag_id", int64(tag))) } defer s.observeUploadSpeed(w, r, time.Now(), "bytes", deferred) diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index 9f62e6f4895..141b3b55d24 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -65,7 +65,7 @@ func lookaheadBufferSize(size int64) int { } func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { - span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bzz", s.logger.WithName("post_bzz").Build()) + span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "bzz-post", s.logger.WithName("post_bzz").Build()) defer span.End() headers := struct { @@ -107,7 +107,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { tracing.RecordError(span, err, attribute.String("action", "tag.create")) return } - span.SetAttributes(attribute.Int64("tagID", int64(tag))) + span.SetAttributes(attribute.Int64("tag_id", int64(tag))) } putter, err := s.newStamperPutter(ctx, putterOptions{ @@ -340,7 +340,7 @@ func (s *Service) fileUploadHandler( if tagID != 0 { w.Header().Set(SwarmTagHeader, fmt.Sprint(tagID)) - span.SetAttributes(attribute.Int64("tagID", int64(tagID))) + span.SetAttributes(attribute.Int64("tag_id", int64(tagID))) } w.Header().Set(ETagHeader, fmt.Sprintf("%q", reference.String())) w.Header().Set(AccessControlExposeHeaders, SwarmTagHeader) diff --git 
a/pkg/api/tracing_test.go b/pkg/api/tracing_test.go new file mode 100644 index 00000000000..f07fb5565c7 --- /dev/null +++ b/pkg/api/tracing_test.go @@ -0,0 +1,153 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package api_test + +import ( + "bytes" + "context" + "net/http" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/api" + "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" + "github.com/ethersphere/bee/v2/pkg/log" + mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" + "github.com/ethersphere/bee/v2/pkg/spinlock" + mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/tracing" + "gitlab.com/nolash/go-mockbytes" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.opentelemetry.io/otel/trace" +) + +// TestTracingHTTPSpan verifies that the HTTP tracing middleware annotates the +// span with the standard server attributes (method, route, status code), marks +// it as a server span, and maps a 5xx response to an Error status while a 2xx +// response leaves the status Unset. These are the fields used to query spans in +// Tempo/Grafana. 
+func TestTracingHTTPSpan(t *testing.T) { + t.Parallel() + + const resource = "/bytes" + + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + + storerMock := mockstorer.New() + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: storerMock, + Tracer: tracing.NewTracerFromProvider(tp), + Logger: log.Noop, + Post: mockpost.New(mockpost.WithAcceptAll()), + }) + + // Upload some content so it can be downloaded with a 200 OK. + g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255) + content, err := g.SequentialBytes(swarm.ChunkSize * 2) + if err != nil { + t.Fatal(err) + } + + var res api.BytesPostResponse + jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusCreated, + jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"), + jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), + jsonhttptest.WithRequestBody(bytes.NewReader(content)), + jsonhttptest.WithUnmarshalJSONResponse(&res), + ) + + // Successful download -> 200 OK. + jsonhttptest.Request(t, client, http.MethodGet, resource+"/"+res.Reference.String(), http.StatusOK, + jsonhttptest.WithExpectedResponse(content), + ) + + // Invalid address triggers an internal error -> 500. + jsonhttptest.Request(t, client, http.MethodGet, resource+"/abcd", http.StatusInternalServerError, + jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{ + Message: "joiner failed", + Code: http.StatusInternalServerError, + }), + ) + + // span.End() runs in the middleware's deferred call, which can fire after the + // client has already received the response, so wait for both download spans. 
+ var spans []sdktrace.ReadOnlySpan + if err := spinlock.Wait(time.Second, func() bool { + spans = endedSpansByName(sr, "bytes-download") + return len(spans) == 2 + }); err != nil { + t.Fatalf("expected 2 bytes-download spans, got %d", len(spans)) + } + + var ok200, err500 sdktrace.ReadOnlySpan + for _, s := range spans { + switch spanStatusCode(s) { + case http.StatusOK: + ok200 = s + case http.StatusInternalServerError: + err500 = s + } + } + if ok200 == nil || err500 == nil { + t.Fatalf("missing spans by status code: have ok=%t err=%t", ok200 != nil, err500 != nil) + } + + // Both spans carry the standard HTTP server attributes and are server-kind. + for _, s := range []sdktrace.ReadOnlySpan{ok200, err500} { + if s.SpanKind() != trace.SpanKindServer { + t.Errorf("span kind = %v, want server", s.SpanKind()) + } + if got := spanAttrString(s, semconv.HTTPRequestMethodKey); got != http.MethodGet { + t.Errorf("http.request.method = %q, want %q", got, http.MethodGet) + } + if got := spanAttrString(s, semconv.HTTPRouteKey); got != "bytes-download" { + t.Errorf("http.route = %q, want %q", got, "bytes-download") + } + } + + // A 5xx response marks the span as Error; a 2xx response stays Unset. 
+ if got := err500.Status().Code; got != codes.Error { + t.Errorf("500 span status = %v, want %v", got, codes.Error) + } + if got := ok200.Status().Code; got != codes.Unset { + t.Errorf("200 span status = %v, want %v", got, codes.Unset) + } +} + +func endedSpansByName(sr *tracetest.SpanRecorder, name string) []sdktrace.ReadOnlySpan { + var out []sdktrace.ReadOnlySpan + for _, s := range sr.Ended() { + if s.Name() == name { + out = append(out, s) + } + } + return out +} + +func spanStatusCode(s sdktrace.ReadOnlySpan) int { + for _, a := range s.Attributes() { + if a.Key == semconv.HTTPResponseStatusCodeKey { + return int(a.Value.AsInt64()) + } + } + return 0 +} + +func spanAttrString(s sdktrace.ReadOnlySpan, key attribute.Key) string { + for _, a := range s.Attributes() { + if a.Key == key { + return a.Value.AsString() + } + } + return "" +} diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 59a5e842afe..f11dfd55f9c 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -209,7 +209,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, trace.WithAttributes( attribute.String("address", chunkAddress.String()), - attribute.Int64("tagID", int64(chunk.TagID())), + attribute.Int64("tag_id", int64(chunk.TagID())), attribute.String("sender_address", p.Address.String()), )) diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index d59fc99245f..ccf0f596a27 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -183,6 +183,13 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) { return &Tracer{tracer: tp.Tracer(instrumentationName)}, providerCloser{tp: tp}, nil } +// NewTracerFromProvider wraps an existing OTel TracerProvider in a Tracer. It is +// primarily useful for tests that need a recording tracer (e.g. 
one backed by an +// in-memory span recorder) rather than the OTLP exporter pipeline NewTracer builds. Unlike NewTracer, it returns no io.Closer, so shutting down tp remains the caller's responsibility. +func NewTracerFromProvider(tp trace.TracerProvider) *Tracer { + return &Tracer{tracer: tp.Tracer(instrumentationName)} +} + // newResource builds the OTel resource describing this node. The env options // come before WithAttributes so the configured values win over colliding // OTEL_SERVICE_NAME/OTEL_RESOURCE_ATTRIBUTES, while the environment can still