Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
21c071b
feat: pubsub
nugaon Apr 14, 2026
6e38c54
refactor: mode id enum type
nugaon Apr 16, 2026
cbc882d
refactor: rename participant to publisher
nugaon Apr 16, 2026
6575b7d
docs: openapi
nugaon Apr 16, 2026
e5155a1
fix: eth address instead of public key on api
nugaon Apr 17, 2026
155ca50
fix: pubsub header parsing
bosi95 Apr 20, 2026
63a1435
fix: config cli option for broker mode
nugaon Apr 20, 2026
d64a383
fix: http hijacked write error
nugaon Apr 21, 2026
b6436c7
chore: debugging
nugaon Apr 21, 2026
4486a40
fix: libp2p backoff clear
bosi95 Apr 21, 2026
1b8e90c
fix: http hijack handler
bosi95 Apr 21, 2026
d6541e4
fix: hijacked websocket error
nugaon Apr 21, 2026
f7ff741
chore: logs debugging
nugaon Apr 21, 2026
5dd81fd
fix: pingpong with subscriber
nugaon Apr 21, 2026
e1d7dc7
fix: upgrade ws no compress
bosi95 Apr 21, 2026
873f9b7
fix: readdeadline on pong
nugaon Apr 21, 2026
cd8dbbf
fix(broker): eof return
nugaon Apr 21, 2026
096902b
chore: logs
nugaon Apr 21, 2026
a805913
fix: wrong connection mapping and many refactors
nugaon Apr 22, 2026
4adc494
fix: eof in websocket
nugaon Apr 22, 2026
3ad4179
fix: p2p ping
nugaon Apr 22, 2026
7822112
chore: debugging on broker side
nugaon Apr 22, 2026
023f680
chore: debug
nugaon Apr 22, 2026
791c66d
chore: add logs for debugging
bosi95 Apr 22, 2026
5f22c20
chore: debug instead of info log
nugaon Apr 22, 2026
2ac8f7b
chore: message check
nugaon Apr 22, 2026
48e218c
fix: soc sig validation
bosi95 Apr 22, 2026
a242e6a
fix: chunk span calc
bosi95 Apr 22, 2026
7aea4c5
fix: span size 8
bosi95 Apr 22, 2026
7c87b9b
fix: swap pubsub ws upgrade and connect order
bosi95 Apr 23, 2026
27460ff
fix: revert ws and svc connect order and cancel subscriberConn if pre…
bosi95 Apr 23, 2026
df93f38
fix: unregister delete overlay
bosi95 Apr 23, 2026
20e8cfd
feat: allow light node mode
nugaon Apr 23, 2026
d35f391
fix: remove and create sub conn race conditions
bosi95 Apr 23, 2026
010f8fd
fix: multiple ws on the same topic
nugaon Apr 23, 2026
8d87fd5
refactor: remove unnecessary param in runMux
nugaon Apr 29, 2026
4f1f5f9
refactor: move connection related struct defs and methods to service
nugaon Apr 29, 2026
552f123
refactor: subtract the pinging service from mode
nugaon Apr 29, 2026
c9a4301
docs: correction of size in the span
nugaon Apr 29, 2026
c7a622d
fix: removing duplicated stream close, leave only cancel
nugaon Apr 29, 2026
ce3cc21
refactor: log out dropping messages when ws buffer is full
nugaon Apr 29, 2026
d2b3c7c
fix: clear subscriberConn in runMux defer to prevent stale reference …
nugaon Apr 29, 2026
73dff46
fix: msg type in mode
nugaon Apr 30, 2026
594c93d
fix: remove devmode
nugaon May 12, 2026
0e344ae
refactor: remove api header and use only query
nugaon May 12, 2026
62540ad
refactor: remove pubsub max connections
nugaon May 15, 2026
1076add
fix: ci linting
nugaon May 15, 2026
e0bf3a9
refactor: move pubsub schemas from headers to components section
nugaon May 15, 2026
0d2d78c
fix: subscriberConn null set concurrency
nugaon May 15, 2026
2b91b2c
test: init
nugaon May 21, 2026
da8ccfd
perf: bmt SIMD hasher (#5381)
acud May 19, 2026
acbface
fix: linting
nugaon May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
optionAutoTLSRegistrationEndpoint = "autotls-registration-endpoint"
optionAutoTLSCAEndpoint = "autotls-ca-endpoint"
optionUseSIMD = "use-simd-hashing"
optionNamePubsubBrokerMode = "pubsub-broker-mode"

// blockchain-rpc
optionNameBlockchainRpcEndpoint = "blockchain-rpc-endpoint"
Expand Down Expand Up @@ -339,6 +340,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionAutoTLSDomain, p2pforge.DefaultForgeDomain, "autotls domain")
cmd.Flags().String(optionAutoTLSRegistrationEndpoint, p2pforge.DefaultForgeEndpoint, "autotls registration endpoint")
cmd.Flags().String(optionAutoTLSCAEndpoint, p2pforge.DefaultCAEndpoint, "autotls certificate authority endpoint")
cmd.Flags().Bool(optionNamePubsubBrokerMode, true, "enable pubsub broker mode")
cmd.Flags().Bool(optionUseSIMD, false, "use SIMD BMT hasher (available only on linux amd64 platforms)")
}

Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
WarmupTime: c.config.GetDuration(optionWarmUpTime),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress),
PubsubBrokerMode: c.config.GetBool(optionNamePubsubBrokerMode),
})

return b, err
Expand Down
54 changes: 54 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2541,3 +2541,57 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response.

"/pubsub/{topic}":
get:
summary: Connect to a pubsub topic via WebSocket
description: |
Opens a WebSocket connection to a pubsub topic. The connection acts as either a publisher (read+write)
or subscriber (read-only) depending on the presence of GSOC query params.

**WebSocket protocol:**
- Inbound (client → node, publisher only): raw SOC payload `[sig:65B][span:8B][payload:N B]`
- Outbound (node → client): raw SOC payload `[sig:65B][span:8B][payload:N B]`
tags:
- Pubsub
parameters:
- in: path
name: topic
schema:
type: string
required: true
description: Topic identifier (hex-encoded address or arbitrary string to be hashed)
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocEthAddress"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic"
- in: header
name: swarm-keep-alive
schema:
type: integer
required: false
description: "WebSocket ping period in seconds (default: 60)"
responses:
"101":
description: WebSocket upgrade successful
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"

"/pubsub/":
get:
summary: List all pubsub topics
description: Returns a list of all active pubsub topics this node is participating in (as broker or subscriber)
tags:
- Pubsub
responses:
"200":
description: List of pubsub topics
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PubsubTopicListResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
52 changes: 51 additions & 1 deletion openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,33 @@ components:
properties:
transactionHash:
$ref: "#/components/schemas/TransactionHash"

PubsubTopicInfo:
type: object
properties:
topicAddress:
type: string
description: "Hex-encoded topic address"
mode:
type: integer
description: "Pubsub mode identifier"
role:
type: string
description: "Role of this node: 'broker' or 'subscriber'"
connections:
type: array
items:
type: string
description: "List of connected peer overlays"

PubsubTopicListResponse:
type: object
properties:
topics:
type: array
items:
$ref: "#/components/schemas/PubsubTopicInfo"

headers:
SwarmTag:
description: "Tag UID"
Expand Down Expand Up @@ -1118,7 +1145,6 @@ components:
required: false
description: "Indicates which feed version was resolved (v1 or v2)"


parameters:
GasPriceParameter:
in: header
Expand Down Expand Up @@ -1331,6 +1357,30 @@ components:
required: false
description: "ACT history Unix timestamp"

SwarmPubsubPeer:
in: query
name: peer
schema:
type: string
required: true
description: "Multiaddress of the broker peer to connect to for pubsub"

SwarmPubsubGsocEthAddress:
in: query
name: gsoc-eth-address
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC owner Ethereum address (20 bytes, hex-encoded) for publisher role. Required together with gsoc-topic to upgrade to publisher."

SwarmPubsubGsocTopic:
in: query
name: gsoc-topic
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC topic identifier (hex) for publisher role. Required together with gsoc-eth-address to upgrade to publisher."

responses:
"200":
description: Success
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/resolver"
"github.com/ethersphere/bee/v2/pkg/resolver/client/ens"
"github.com/ethersphere/bee/v2/pkg/resolver/multiresolver"
Expand Down Expand Up @@ -93,11 +94,10 @@ const (
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
SwarmActPublisherHeader = "Swarm-Act-Publisher"
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
GasLimitHeader = "Gas-Limit"
ETagHeader = "ETag"
ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
GasLimitHeader = "Gas-Limit"
ETagHeader = "ETag"

AuthorizationHeader = "Authorization"
AcceptEncodingHeader = "Accept-Encoding"
Expand Down Expand Up @@ -185,6 +185,7 @@ type Service struct {

topologyDriver topology.Driver
p2p p2p.DebugService
pubsubSvc *pubsub.Service
accounting accounting.Interface
chequebook chequebook.Service
pseudosettle settlement.Interface
Expand Down Expand Up @@ -268,6 +269,7 @@ type ExtraOptions struct {
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
PubsubService *pubsub.Service
}

func New(
Expand Down Expand Up @@ -355,6 +357,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.lightNodes = e.LightNodes
s.pseudosettle = e.Pseudosettle
s.blockTime = e.BlockTime
s.pubsubSvc = e.PubsubService

s.statusSem = semaphore.NewWeighted(1)
s.postageSem = semaphore.NewWeighted(1)
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
contractMock "github.com/ethersphere/bee/v2/pkg/postage/postagecontract/mock"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/pusher"
"github.com/ethersphere/bee/v2/pkg/resolver"
resolverMock "github.com/ethersphere/bee/v2/pkg/resolver/mock"
Expand Down Expand Up @@ -136,6 +137,7 @@ type testServerOptions struct {
ChequebookDisabled bool
SwapDisabled bool
Erc20ServiceNil bool
PubsubService *pubsub.Service
}

func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.Conn, string, *chanStorer) {
Expand Down Expand Up @@ -210,6 +212,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
Staking: o.StakingContract,
NodeStatus: o.NodeStatus,
PinIntegrity: o.PinIntegrity,
PubsubService: o.PubsubService,
}

// By default bee mode is set to full mode.
Expand Down
137 changes: 137 additions & 0 deletions pkg/api/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"context"
"encoding/hex"
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
ma "github.com/multiformats/go-multiaddr"
)

func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("pubsub").Build()

paths := struct {
Topic string `map:"topic" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

var topicAddr [32]byte
if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize {
copy(topicAddr[:], decoded)
} else {
h := swarm.NewHasher()
_, _ = h.Write([]byte(paths.Topic))
copy(topicAddr[:], h.Sum(nil))
}

peerMultiaddr := r.URL.Query().Get("peer")
if peerMultiaddr == "" {
jsonhttp.BadRequest(w, "missing peer query param")
return
}
underlay, err := ma.NewMultiaddr(peerMultiaddr)
if err != nil {
logger.Info("invalid peer multiaddr", "value", peerMultiaddr, "error", err)
jsonhttp.BadRequest(w, "invalid peer query param")
return
}

var connectOpts pubsub.ConnectOptions

gsocEthAddrHex := r.URL.Query().Get("gsoc-eth-address")
gsocTopicHex := r.URL.Query().Get("gsoc-topic")
if gsocEthAddrHex != "" && gsocTopicHex != "" {
gsocOwner, err := hex.DecodeString(gsocEthAddrHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid gsoc-eth-address query param")
return
}
gsocID, err := hex.DecodeString(gsocTopicHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid gsoc-topic query param")
return
}
connectOpts.GsocOwner = gsocOwner
connectOpts.GsocID = gsocID
connectOpts.ReadWrite = true
}

headers := struct {
KeepAlive int `map:"Swarm-Keep-Alive"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

// Connect to broker peer
ctx, cancel := context.WithCancel(context.Background())
mode, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts)
if err != nil {
cancel()
logger.Info("pubsub connect failed", "error", err)
jsonhttp.InternalServerError(w, "pubsub connect failed")
return
}
// Upgrade to WebSocket
upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkWithSpanSize,
WriteBufferSize: swarm.ChunkWithSpanSize,
CheckOrigin: s.checkOrigin,
}

logger.Info("upgrading to websocket")
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// Upgrade() hijacks the connection before returning an error,
// so do NOT write an HTTP response here.
cancel()
logger.Info("websocket upgrade failed", "error", err)
logger.Error(nil, "websocket upgrade failed")
return
}
logger.Info("websocket upgrade successful")

pingPeriod := time.Duration(headers.KeepAlive) * time.Second
if pingPeriod == 0 {
pingPeriod = time.Minute
}

isPublisher := connectOpts.ReadWrite

s.wsWg.Add(1)
go func() {
pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, mode, isPublisher)
cancel()
_ = conn.Close()
s.wsWg.Done()
}()
}

func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) {
if s.pubsubSvc == nil {
jsonhttp.NotFound(w, "pubsub service not available")
return
}

topics := s.pubsubSvc.Topics()
jsonhttp.OK(w, struct {
Topics []pubsub.TopicInfo `json:"topics"`
}{
Topics: topics,
})
}
Loading
Loading