Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ import (
"github.com/gorilla/mux"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -462,7 +465,7 @@ func (s *Service) newTracingHandler(spanName string) func(h http.Handler) http.H
// ignore
}

span, _, ctx := s.tracer.StartSpanFromContext(ctx, spanName, s.logger)
span, _, ctx := s.tracer.StartSpanFromContext(ctx, spanName, s.logger, trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

err = s.tracer.AddContextHTTPHeader(ctx, r.Header)
Expand All @@ -472,6 +475,23 @@ func (s *Service) newTracingHandler(spanName string) func(h http.Handler) http.H
}

h.ServeHTTP(w, r.WithContext(ctx))

// The response status code is captured upstream by responseCodeMetricsHandler's
// *responseWriter, which exposes Status(). Read it to annotate the span with the
// standard HTTP server attributes so the request is queryable in Tempo/Grafana.
if sw, ok := w.(interface{ Status() int }); ok {
code := sw.Status()
span.SetAttributes(
semconv.HTTPRequestMethodKey.String(r.Method),
semconv.HTTPRoute(spanName),
semconv.HTTPResponseStatusCode(code),
)
// OTel server-span convention: 5xx marks the span as Error; 4xx is a client
// error and stays Unset, still queryable via http.response.status_code.
if code >= 500 {
span.SetStatus(codes.Error, http.StatusText(code))
}
}
})
}
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func init() {

type testServerOptions struct {
Storer api.Storer
Tracer *tracing.Tracer
StateStorer storage.StateStorer
Resolver resolver.Interface
Pss pss.Interface
Expand Down Expand Up @@ -231,12 +232,16 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
s.SetSwarmAddress(&o.Overlay)
s.SetProbe(o.Probe)

noOpTracer, tracerCloser, _ := tracing.NewTracer(&tracing.Options{
Enabled: false,
})
testutil.CleanupCloser(t, tracerCloser)
tracer := o.Tracer
if tracer == nil {
noOpTracer, tracerCloser, _ := tracing.NewTracer(&tracing.Options{
Enabled: false,
})
testutil.CleanupCloser(t, tracerCloser)
tracer = noOpTracer
}

s.Configure(signer, noOpTracer, api.Options{
s.Configure(signer, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
WsPingPeriod: o.WsPingPeriod,
}, extraOpts, 1, erc20APIService)
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type bytesPostResponse struct {

// bytesUploadHandler handles upload of raw binary data of arbitrary length.
func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bytes", s.logger.WithName("post_bytes").Build())
span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "bytes-post", s.logger.WithName("post_bytes").Build())
defer span.End()

headers := struct {
Expand Down Expand Up @@ -73,7 +73,7 @@ func (s *Service) bytesUploadHandler(w http.ResponseWriter, r *http.Request) {
tracing.RecordError(span, err, attribute.String("action", "tag.create"))
return
}
span.SetAttributes(attribute.Int64("tagID", int64(tag)))
span.SetAttributes(attribute.Int64("tag_id", int64(tag)))
}

defer s.observeUploadSpeed(w, r, time.Now(), "bytes", deferred)
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func lookaheadBufferSize(size int64) int {
}

func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bzz", s.logger.WithName("post_bzz").Build())
span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "bzz-post", s.logger.WithName("post_bzz").Build())
defer span.End()

headers := struct {
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) {
tracing.RecordError(span, err, attribute.String("action", "tag.create"))
return
}
span.SetAttributes(attribute.Int64("tagID", int64(tag)))
span.SetAttributes(attribute.Int64("tag_id", int64(tag)))
}

putter, err := s.newStamperPutter(ctx, putterOptions{
Expand Down Expand Up @@ -340,7 +340,7 @@ func (s *Service) fileUploadHandler(

if tagID != 0 {
w.Header().Set(SwarmTagHeader, fmt.Sprint(tagID))
span.SetAttributes(attribute.Int64("tagID", int64(tagID)))
span.SetAttributes(attribute.Int64("tag_id", int64(tagID)))
}
w.Header().Set(ETagHeader, fmt.Sprintf("%q", reference.String()))
w.Header().Set(AccessControlExposeHeaders, SwarmTagHeader)
Expand Down
153 changes: 153 additions & 0 deletions pkg/api/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api_test

import (
"bytes"
"context"
"net/http"
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
"github.com/ethersphere/bee/v2/pkg/spinlock"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/tracing"
"gitlab.com/nolash/go-mockbytes"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

// TestTracingHTTPSpan drives the HTTP tracing middleware end to end against an
// in-memory span recorder. It issues one successful and one failing download
// and then checks, on the two resulting "bytes-download" spans, that:
//   - the span kind is server,
//   - http.request.method and http.route are set,
//   - the span carrying a 500 status code has status Error, while the span
//     carrying a 200 status code keeps status Unset.
//
// These are the fields used to query spans in Tempo/Grafana.
func TestTracingHTTPSpan(t *testing.T) {
	t.Parallel()

	const (
		resource     = "/bytes"
		downloadSpan = "bytes-download"
	)

	// Record every ended span in memory instead of exporting it.
	recorder := tracetest.NewSpanRecorder()
	provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
	t.Cleanup(func() { _ = provider.Shutdown(context.Background()) })

	client, _, _, _ := newTestServer(t, testServerOptions{
		Storer: mockstorer.New(),
		Tracer: tracing.NewTracerFromProvider(provider),
		Logger: log.Noop,
		Post:   mockpost.New(mockpost.WithAcceptAll()),
	})

	// Seed the store with two chunks worth of data so a later GET can succeed.
	generator := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
	payload, err := generator.SequentialBytes(swarm.ChunkSize * 2)
	if err != nil {
		t.Fatal(err)
	}

	var uploaded api.BytesPostResponse
	jsonhttptest.Request(t, client, http.MethodPost, resource, http.StatusCreated,
		jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"),
		jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
		jsonhttptest.WithRequestBody(bytes.NewReader(payload)),
		jsonhttptest.WithUnmarshalJSONResponse(&uploaded),
	)

	// First download: existing reference, expected to answer 200 OK.
	jsonhttptest.Request(t, client, http.MethodGet, resource+"/"+uploaded.Reference.String(), http.StatusOK,
		jsonhttptest.WithExpectedResponse(payload),
	)

	// Second download: an invalid address makes the joiner fail, expected 500.
	jsonhttptest.Request(t, client, http.MethodGet, resource+"/abcd", http.StatusInternalServerError,
		jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
			Message: "joiner failed",
			Code:    http.StatusInternalServerError,
		}),
	)

	// The middleware ends the span in a deferred call, which may run after the
	// client already has the response; poll until both download spans are in.
	var downloads []sdktrace.ReadOnlySpan
	waitErr := spinlock.Wait(time.Second, func() bool {
		downloads = endedSpansByName(recorder, downloadSpan)
		return len(downloads) == 2
	})
	if waitErr != nil {
		t.Fatalf("expected 2 bytes-download spans, got %d", len(downloads))
	}

	// Index the spans by their recorded HTTP status code; a missing entry
	// yields a nil span, which the check below reports.
	byStatus := make(map[int]sdktrace.ReadOnlySpan, len(downloads))
	for _, span := range downloads {
		byStatus[spanStatusCode(span)] = span
	}
	ok200 := byStatus[http.StatusOK]
	err500 := byStatus[http.StatusInternalServerError]
	if ok200 == nil || err500 == nil {
		t.Fatalf("missing spans by status code: have ok=%t err=%t", ok200 != nil, err500 != nil)
	}

	// Attributes and kind shared by both outcomes.
	for _, span := range []sdktrace.ReadOnlySpan{ok200, err500} {
		if kind := span.SpanKind(); kind != trace.SpanKindServer {
			t.Errorf("span kind = %v, want server", kind)
		}
		if method := spanAttrString(span, semconv.HTTPRequestMethodKey); method != http.MethodGet {
			t.Errorf("http.request.method = %q, want %q", method, http.MethodGet)
		}
		if route := spanAttrString(span, semconv.HTTPRouteKey); route != downloadSpan {
			t.Errorf("http.route = %q, want %q", route, downloadSpan)
		}
	}

	// Status mapping: 5xx -> Error, 2xx -> Unset.
	if status := err500.Status().Code; status != codes.Error {
		t.Errorf("500 span status = %v, want %v", status, codes.Error)
	}
	if status := ok200.Status().Code; status != codes.Unset {
		t.Errorf("200 span status = %v, want %v", status, codes.Unset)
	}
}

// endedSpansByName returns the spans reported by sr.Ended() whose name equals
// name, preserving the order in which the recorder lists them. The result is
// nil when no span matches.
func endedSpansByName(sr *tracetest.SpanRecorder, name string) []sdktrace.ReadOnlySpan {
	var matched []sdktrace.ReadOnlySpan
	for _, span := range sr.Ended() {
		if span.Name() != name {
			continue
		}
		matched = append(matched, span)
	}
	return matched
}

// spanStatusCode returns the value of the span's http.response.status_code
// attribute as an int. It returns 0 when the span carries no such attribute.
func spanStatusCode(s sdktrace.ReadOnlySpan) int {
	attrs := s.Attributes()
	for i := range attrs {
		if attrs[i].Key != semconv.HTTPResponseStatusCodeKey {
			continue
		}
		return int(attrs[i].Value.AsInt64())
	}
	return 0
}

// spanAttrString returns the string value of the first attribute on s whose
// key equals key. It returns the empty string when the span has no attribute
// with that key.
func spanAttrString(s sdktrace.ReadOnlySpan, key attribute.Key) string {
	attrs := s.Attributes()
	for i := range attrs {
		if attrs[i].Key != key {
			continue
		}
		return attrs[i].Value.AsString()
	}
	return ""
}
2 changes: 1 addition & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, trace.WithAttributes(
attribute.String("address", chunkAddress.String()),
attribute.Int64("tagID", int64(chunk.TagID())),
attribute.Int64("tag_id", int64(chunk.TagID())),
attribute.String("sender_address", p.Address.String()),
))

Expand Down
7 changes: 7 additions & 0 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ func NewTracer(o *Options) (*Tracer, io.Closer, error) {
return &Tracer{tracer: tp.Tracer(instrumentationName)}, providerCloser{tp: tp}, nil
}

// NewTracerFromProvider builds a Tracer on top of a caller-supplied OTel
// TracerProvider, requesting the tracer under instrumentationName. Unlike
// NewTracer it returns no io.Closer. It is mainly intended for tests that want
// a recording tracer (for example one backed by an in-memory span recorder)
// instead of the OTLP exporter pipeline that NewTracer sets up.
func NewTracerFromProvider(tp trace.TracerProvider) *Tracer {
	otelTracer := tp.Tracer(instrumentationName)
	return &Tracer{tracer: otelTracer}
}

// newResource builds the OTel resource describing this node. The env options
// come before WithAttributes so the configured values win over colliding
// OTEL_SERVICE_NAME/OTEL_RESOURCE_ATTRIBUTES, while the environment can still
Expand Down
Loading