Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions services/reactor/fabric_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

const (
fabricUART = "uart1"
fabricLogPrefix = "[" + fabricUART + "] "
fabricStopWaitTimeout = 500 * time.Millisecond
)

Expand Down Expand Up @@ -47,15 +48,15 @@ func (r *Reactor) startPassiveFabric(ctx context.Context, ev types.SerialSession
rx := shmring.Get(shmring.Handle(ev.RXHandle))
tx := shmring.Get(shmring.Handle(ev.TXHandle))
if rx == nil || tx == nil {
log.Println("[uart1] fabric session missing rings")
log.Println(fabricLogPrefix + "fabric session missing rings")
return
}

tr := fabric.NewShmringTransportWithBuffers(rx, tx, &fabricBuffers)
fabricConn := r.uiConn.NewChildConnection("fabric")
if fabricConn == nil {
_ = tr.Close()
log.Println("[uart1] fabric session missing bus")
log.Println(fabricLogPrefix + "fabric session missing bus")
return
}

Expand All @@ -68,7 +69,7 @@ func (r *Reactor) startPassiveFabric(ctx context.Context, ev types.SerialSession
r.fabricDone = done
r.fabricSessionOpen = true

log.Println("[uart1] fabric session opening node=mcu peer=bigbox-cm5 link=mcu-uart0 transfer=", transferMode)
log.Println(fabricLogPrefix+"fabric session opening node=mcu peer=bigbox-cm5 link=mcu-uart0 transfer=", transferMode)
go func() {
defer close(done)
defer tr.Close()
Expand All @@ -78,7 +79,7 @@ func (r *Reactor) startPassiveFabric(ctx context.Context, ev types.SerialSession
// to the updater-owned stage controller.
fabric.RunWithOptions(fabricCtx, tr, fabricConn, "mcu", "bigbox-cm5", fabric.DefaultLinkConfig(), fabric.RunOptions{Buffers: &fabricBuffers, StageController: stageController})
}()
log.Println("[uart1] fabric session opened node=mcu peer=bigbox-cm5 link=mcu-uart0 transfer=", transferMode)
log.Println(fabricLogPrefix+"fabric session opened node=mcu peer=bigbox-cm5 link=mcu-uart0 transfer=", transferMode)
}

func (r *Reactor) stopFabricLink() {
Expand All @@ -91,6 +92,6 @@ func (r *Reactor) stopFabricLink() {
r.fabricDone = nil
r.fabricSessionOpen = false
if !waitFabricDone(done, fabricStopWaitTimeout) {
log.Println("[uart1] fabric session stop timed out")
log.Println(fabricLogPrefix + "fabric session stop timed out")
}
}
26 changes: 4 additions & 22 deletions services/reactor/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,8 @@ func (r *Reactor) Run(ctx context.Context) {
stSub := r.uiConn.Subscribe(stTopic)
evSub := r.uiConn.Subscribe(evTopic)

// UART sessions. uart0 remains the original local JSON telemetry stream;
// uart1 is now reserved for the CM5 Fabric link. The logger still writes to
// the USB monitor, but no longer opens a competing uart1 log mirror.
const uartTele = "uart0"
subSessOpenTele := r.uiConn.Subscribe(tSessOpened(uartTele))
subSessClosedTele := r.uiConn.Subscribe(tSessClosed(uartTele))
// UART session for the CM5 Fabric link. The production hardware-update path
// keeps this UART Fabric-only; legacy JSON telemetry is not opened here.
var subSessOpenFabric *bus.Subscription
var subSessClosedFabric *bus.Subscription
if useHardwareFabricUART() {
Expand All @@ -529,13 +525,12 @@ func (r *Reactor) Run(ctx context.Context) {
}

// Kick open requests (fire-and-forget; events carry handles).
r.uiConn.Publish(r.uiConn.NewMessage(tSessOpen(uartTele), nil, false))
if useHardwareFabricUART() {
r.uiConn.Publish(r.uiConn.NewMessage(tSessOpen(fabricUART), nil, false))
}

// Retry back-off guards.
var retryTeleAt, retryFabricAt time.Time
var retryFabricAt time.Time

// Supervisory ticker
ticker := time.NewTicker(TICK)
Expand All @@ -546,26 +541,13 @@ func (r *Reactor) Run(ctx context.Context) {
for {
select {
// ---- UART session opened/closed ----
case m := <-subSessOpenTele.Channel():
if ev, ok := m.Payload.(types.SerialSessionOpened); ok {
r.jsonOut = shmring.Get(shmring.Handle(ev.TXHandle))
log.Println("[uart0] telemetry session opened")
}
case m := <-subscriptionChannel(subSessOpenFabric):
if ev, ok := m.Payload.(types.SerialSessionOpened); ok {
r.startPassiveFabric(ctx, ev)
}
case <-subSessClosedTele.Channel():
r.jsonOut = nil
log.Println("[uart0] telemetry session closed")
// Auto-reopen with back-off
if time.Now().After(retryTeleAt) {
r.uiConn.Publish(r.uiConn.NewMessage(tSessOpen(uartTele), nil, false))
retryTeleAt = time.Now().Add(2 * time.Second)
}
case <-subscriptionChannel(subSessClosedFabric):
r.stopFabricLink()
log.Println("[uart1] fabric session closed")
log.Println(fabricLogPrefix + "fabric session closed")
// Auto-reopen with back-off
if time.Now().After(retryFabricAt) {
r.uiConn.Publish(r.uiConn.NewMessage(tSessOpen(fabricUART), nil, false))
Expand Down