Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
tinygo flash -stack-size=3KB -monitor -scheduler tasks -target=pico -tags "pico_bb_proto_1" main.go

## Flashing ISOC Power Board via USB port on Pico2
tinygo flash -stack-size=8KB -monitor -scheduler tasks -target=pico2 -tags "pico_bb_proto_1" main.go
tinygo flash -stack-size=3KB -monitor -scheduler tasks -target=pico2 -tags "pico_bb_proto_1" main.go

-------------------

Expand Down
163 changes: 162 additions & 1 deletion bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type Subscription struct {
ch chan *Message
bus *Bus
conn *Connection
set *SubscriptionSet
}

func (s *Subscription) Topic() Topic { return s.topic }
Expand All @@ -176,6 +177,137 @@ func (s *Subscription) Reply(to *Message, payload any, retained bool) {
s.conn.Reply(to, payload, retained)
}

// -----------------------------------------------------------------------------
// SubscriptionSet
// -----------------------------------------------------------------------------

// SubscriptionSet lets a reactor wait on one readiness channel for several
// subscriptions without spawning a goroutine per subscription. Readiness is
// coalesced: once Ready() is signalled, callers should drain the subscriptions
// they own until no immediate work remains.
type SubscriptionSet struct {
conn *Connection
ready chan struct{}
mu sync.Mutex
subs []*Subscription
closed bool
}

func (c *Connection) NewSubscriptionSet() *SubscriptionSet {
return &SubscriptionSet{
conn: c,
ready: make(chan struct{}, 1),
}
}

func (ss *SubscriptionSet) Ready() <-chan struct{} {
if ss == nil {
return nil
}
return ss.ready
}

func (ss *SubscriptionSet) Subscribe(tp Topic) *Subscription {
if ss == nil || ss.conn == nil {
return nil
}
ss.mu.Lock()
if ss.closed {
ss.mu.Unlock()
return nil
}
ss.mu.Unlock()
ct := toConcrete(tp)
sub := &Subscription{topic: ct, ch: make(chan *Message, ss.conn.bus.qLen), bus: ss.conn.bus, conn: ss.conn, set: ss}
ss.conn.bus.addSubscription(ct, sub)
ss.conn.mu.Lock()
ss.conn.subs = append(ss.conn.subs, sub)
ss.conn.mu.Unlock()
ss.mu.Lock()
if !ss.closed {
ss.subs = append(ss.subs, sub)
} else {
sub.set = nil
}
ss.mu.Unlock()
if sub.set == nil {
ss.conn.Unsubscribe(sub)
return nil
}
return sub
}

func (ss *SubscriptionSet) Request(msg *Message) *Subscription {
if ss == nil || ss.conn == nil {
return nil
}
if topicLen(msg.ReplyTo) == 0 {
msg.ReplyTo = TNoIntern("_rr", ss.conn.rrCtr.Add(1))
}
sub := ss.Subscribe(msg.ReplyTo)
ss.conn.Publish(msg)
return sub
}

func (ss *SubscriptionSet) Unsubscribe(sub *Subscription) {
if ss == nil || sub == nil || ss.conn == nil {
return
}
ss.conn.Unsubscribe(sub)
}

func (ss *SubscriptionSet) Close() {
if ss == nil || ss.conn == nil {
return
}
ss.mu.Lock()
if ss.closed {
ss.mu.Unlock()
return
}
subs := append([]*Subscription(nil), ss.subs...)
ss.subs = nil
ss.closed = true
close(ss.ready)
ss.mu.Unlock()
for _, sub := range subs {
ss.conn.Unsubscribe(sub)
}
}

func (ss *SubscriptionSet) remove(sub *Subscription) {
if ss == nil || sub == nil {
return
}
ss.mu.Lock()
ss.subs = removeSub(ss.subs, sub)
ss.mu.Unlock()
}

func (ss *SubscriptionSet) signal() {
if ss == nil {
return
}
ss.mu.Lock()
ready := ss.ready
closed := ss.closed
ss.mu.Unlock()
if closed || ready == nil {
return
}
defer func() { _ = recover() }()
select {
case ready <- struct{}{}:
default:
}
}

func (s *Subscription) signalReady() {
if s != nil && s.set != nil {
s.set.signal()
}
}

// -----------------------------------------------------------------------------
// Trie node (shared for subscribers and retained messages)
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -308,10 +440,12 @@ func drainOne(ch chan *Message) {
func (b *Bus) tryDeliver(sub *Subscription, msg *Message) {
defer func() { _ = recover() }() // channel may be closed; best-effort
if trySend(sub.ch, msg) {
sub.signalReady()
return
}
drainOne(sub.ch)
_ = trySend(sub.ch, msg)
sub.signalReady()
}

// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -472,6 +606,16 @@ func (b *Bus) NewConnection(id string) *Connection {
return &Connection{bus: b, id: id}
}

// NewChildConnection creates a separate connection on the same bus.
// Services should use separate connections so subscriptions, request-reply
// counters, and Disconnect lifetimes remain locally owned.
func (c *Connection) NewChildConnection(id string) *Connection {
if c == nil || c.bus == nil {
return nil
}
return c.bus.NewConnection(id)
}

func (c *Connection) NewMessage(tp Topic, payload any, retained bool) *Message {
return c.bus.NewMessage(tp, payload, retained)
}
Expand All @@ -489,10 +633,19 @@ func (c *Connection) Subscribe(tp Topic) *Subscription {
}

func (c *Connection) Unsubscribe(sub *Subscription) {
if sub == nil {
return
}
c.bus.unsubscribe(sub.topic, sub)
c.mu.Lock()
c.subs = removeSub(c.subs, sub)
c.mu.Unlock()
if sub.set != nil {
set := sub.set
sub.set = nil
set.remove(sub)
}
defer func() { _ = recover() }()
close(sub.ch)
}

Expand All @@ -504,7 +657,15 @@ func (c *Connection) Disconnect() {

for _, sub := range subs {
c.bus.unsubscribe(sub.topic, sub)
close(sub.ch)
if sub.set != nil {
set := sub.set
sub.set = nil
set.remove(sub)
}
func(ch chan *Message) {
defer func() { _ = recover() }()
close(ch)
}(sub.ch)
}
}

Expand Down
52 changes: 52 additions & 0 deletions bus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,55 @@ func TestTopic_InvalidTokenPanics(t *testing.T) {
// []byte is not comparable, so T should panic
_ = T([]byte{1, 2, 3})
}

func TestSubscriptionSetSignalsForAnyMember(t *testing.T) {
b := NewBus(4, "+", "#")
c := b.NewConnection("test")
ss := c.NewSubscriptionSet()
defer ss.Close()

a := ss.Subscribe(T("a"))
bb := ss.Subscribe(T("b"))

c.Publish(c.NewMessage(T("b"), "bee", false))
select {
case <-ss.Ready():
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for subscription set readiness")
}

select {
case m := <-bb.Channel():
if m.Payload != "bee" {
t.Fatalf("unexpected b payload: %#v", m.Payload)
}
default:
t.Fatal("b subscription was not ready after set signal")
}

select {
case m := <-a.Channel():
t.Fatalf("unexpected a message: %#v", m)
default:
}
}

func TestSubscriptionSetCoalescesReadiness(t *testing.T) {
b := NewBus(4, "+", "#")
c := b.NewConnection("test")
ss := c.NewSubscriptionSet()
defer ss.Close()

sub := ss.Subscribe(T("a"))
c.Publish(c.NewMessage(T("a"), "one", false))
c.Publish(c.NewMessage(T("a"), "two", false))

select {
case <-ss.Ready():
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for first readiness")
}

got := drainPayloads(t, sub, 2)
assertUnorderedEqual(t, got, []string{"one", "two"})
}
27 changes: 27 additions & 0 deletions cmd/fabric-selftest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Fabric self-test firmware

This is a narrow board-level protocol test image. It does not start the main
appliance Reactor, HAL polling, Telemetry, or the normal UART sessions.

It starts only:

- an in-memory bus;
- the Updater service with the `fabric_uart_hwtest` staging backend;
- one MCU-side Fabric session;
- a tiny in-process CM5 peer cross-wired through shmring UART-shaped transports.

It then performs `hello`, `prepare-update`, `xfer_begin`, `xfer_chunk*`,
`xfer_commit`, and waits for `xfer_done`. It does not call `commit-update` and it
does not exercise the production A/B flash writer.

Example Pico 2 run:

```sh
tinygo flash -stack-size=3KB -monitor -scheduler tasks \
-target=pico2 -tags "pico_bb_proto_1 fabric_uart_hwtest fabric_uart_selftest" \
./cmd/fabric-selftest
```

If this image needs more than 3 KB stack, the active Fabric transfer hot path
itself needs further stack reduction before production transfer is enabled at the
3 KB appliance gate.
Loading