From 8e3292ae1f4614acfde145fdac5628f34fc5df37 Mon Sep 17 00:00:00 2001 From: Rich Thanki Date: Mon, 8 Jun 2026 02:12:17 +0000 Subject: [PATCH 1/3] WIP update --- src/configs/bigbox-v1-cm-2.json | 16 +- src/services/device/service.lua | 43 +- src/services/ui/config.lua | 6 +- src/services/ui/http/request.lua | 10 +- src/services/ui/service.lua | 1 + src/services/update/active_job.lua | 40 +- src/services/update/active_runtime.lua | 29 + src/services/update/backends/component.lua | 9 +- src/services/update/config.lua | 23 + src/services/update/service.lua | 4 + .../devhost/mcu_update_http_uart_spec.lua | 1488 +++++++++++++++++ tests/run.lua | 1 + tests/unit/device/test_phase2.lua | 30 + tests/unit/ui/test_config.lua | 9 + tests/unit/update/test_config.lua | 14 +- 15 files changed, 1710 insertions(+), 13 deletions(-) create mode 100644 tests/integration/devhost/mcu_update_http_uart_spec.lua diff --git a/src/configs/bigbox-v1-cm-2.json b/src/configs/bigbox-v1-cm-2.json index 4152ae87..74652092 100644 --- a/src/configs/bigbox-v1-cm-2.json +++ b/src/configs/bigbox-v1-cm-2.json @@ -851,7 +851,8 @@ "kind": "fabric_stage", "target": "updater/main", "chunk_size": 2048, - "artifact_store": "main" + "artifact_store": "main", + "timeout_s": 300 }, "commit-update": { "kind": "rpc", @@ -1125,13 +1126,10 @@ "peer_id": "mcu", "transport": { "kind": "uart", - "source": "uart", + "source": "uart_manager", "class": "uart", "id": "uart0", - "terminator": "\n", - "open_opts": { - "baud": 115200 - } + "terminator": "\n" }, "session": { "identity_claim": { @@ -1265,6 +1263,9 @@ "cap_id": "main", "host": "0.0.0.0", "port": 8080 + }, + "uploads": { + "require_auth": true } } }, @@ -1278,7 +1279,8 @@ }, "mcu": { "component": "mcu", - "backend": "component" + "backend": "component", + "stage_timeout_s": 300 }, "switch-main": { "component": "switch-main" diff --git a/src/services/device/service.lua b/src/services/device/service.lua index 3bed029a..a6861e7a 100644 --- a/src/services/device/service.lua +++ b/src/services/device/service.lua @@ -24,6 +24,7 @@ local cap_deps_mod = require 'devicecode.support.capability_dependencies' local dep_failure = require 'devicecode.support.dependency_failure' local backpressure = require 'services.device.backpressure' local dependency_mod = require 'services.device.dependencies' +local fabric_topics = require 'services.fabric.topics' local tablex = require 'shared.table' local M = {} @@ -37,6 +38,45 @@ local function new_service_id() return ('device-%d-%d'):format(os.time(), math.random(1, 1000000)) end +-- Device action timeout is owned by the action worker scope. The Fabric +-- transfer-manager request may legitimately stay open for that whole action, so +-- do not add lua-bus' default one-second call timeout here. The transfer budget +-- is passed as payload policy, while caller abandonment still aborts this Op. +local FABRIC_SEND_BLOB_CALL_OPTS = { timeout = false } + +local function default_fabric_client(conn) + if type(conn) ~= 'table' or type(conn.call_op) ~= 'function' then return nil end + + return { + send_blob_op = function (_, params, opts) + params = params or {} + opts = opts or {} + local timeout_s = opts.timeout or params.timeout + local ev, err = conn:call_op(fabric_topics.transfer_manager_rpc('send-blob'), { + link_id = params.link_id, + request_id = params.request_id or params.job_id, + xfer_id = params.xfer_id, + target = params.target, + source_owner = params.source_owner, + size = params.size, + digest_alg = params.digest_alg, + digest = params.digest, + chunk_size = params.chunk_size, + meta = params.meta, + timeout_s = timeout_s, + }, FABRIC_SEND_BLOB_CALL_OPTS) + if not ev then return nil, err end + return ev:wrap(function (reply, call_err) + if reply == nil then return nil, call_err end + if type(reply) == 'table' and reply.ok == false then + return nil, reply.err or reply.error or reply.reason or call_err or 'fabric_transfer_failed' + end + return (type(reply) == 'table' and (reply.result or reply.transfer)) or reply, nil + end) + end, + } +end + local function request_publication(state) if state.auto_publish == false then return end if state.publication_requested then return end @@ -837,7 +877,7 @@ local function build_state(scope, params) enable_observers = params.enable_observers, auto_publish = params.auto_publish, emit_events = params.emit_events, - fabric_client = params.fabric_client, + fabric_client = params.fabric_client or default_fabric_client(params.conn), open_source = params.open_source, open_source_op = params.open_source_op, terminate_source = params.terminate_source, @@ -974,5 +1014,6 @@ M.start_generation = start_generation M.cancel_active_generation = cancel_active_generation M.flush_publication = flush_publication M.cleanup_publication_now = cleanup_publication_now +M.default_fabric_client = default_fabric_client return M diff --git a/src/services/ui/config.lua b/src/services/ui/config.lua index 61537809..af1ced96 100644 --- a/src/services/ui/config.lua +++ b/src/services/ui/config.lua @@ -27,6 +27,7 @@ local DEFAULTS = { uploads = { enabled = true, max_bytes = 64 * 1024 * 1024, + require_auth = false, }, sessions = { prune_interval = 60, @@ -56,7 +57,7 @@ local HTTP_KEYS = { local STATIC_KEYS = { root = true, index = true, chunk_size = true } local SSE_KEYS = { enabled = true, queue_len = true, max_replay = true, replay = true, pattern = true } -local UPLOAD_KEYS = { enabled = true, max_bytes = true } +local UPLOAD_KEYS = { enabled = true, max_bytes = true, require_auth = true } local SESSION_KEYS = { prune_interval = true } local function fail(msg) return nil, msg end @@ -218,6 +219,9 @@ local function normalise_uploads(raw) v, err = non_negative_int_or_nil(raw.max_bytes, 'uploads.max_bytes') if err then return nil, err end if v ~= nil then out.max_bytes = v end + v, err = bool_or_nil(raw.require_auth, 'uploads.require_auth') + if err then return nil, err end + if v ~= nil then out.require_auth = v end return out, nil end diff --git a/src/services/ui/http/request.lua b/src/services/ui/http/request.lua index 024fbd65..7c7597a7 100644 --- a/src/services/ui/http/request.lua +++ b/src/services/ui/http/request.lua @@ -249,7 +249,15 @@ function M.run(scope, ctx, deps) elseif route.kind == 'command' then return handle_command(scope, owner, ctx, route, deps) elseif route.kind == 'upload' then - return upload.run(scope, owner, ctx, deps.update or deps) + local update_deps = deps.update or deps + if update_deps.require_auth == true then + local principal = principal_from(ctx, deps) + if principal == nil then + perform_response(owner:reply_error_op(401, 'unauthenticated')) + return { status = 'unauthenticated' } + end + end + return upload.run(scope, owner, ctx, update_deps) elseif route.kind == 'sse' then return sse.run(scope, owner, route, deps) elseif route.kind == 'static' then diff --git a/src/services/ui/service.lua b/src/services/ui/service.lua index c55040e7..e96c6eb0 100644 --- a/src/services/ui/service.lua +++ b/src/services/ui/service.lua @@ -339,6 +339,7 @@ local function build_listener_opts(state, cfg, listener_generation, listener_id) local update_opts = shallow_copy(params.update or {}) update_opts.max_bytes = uploads.max_bytes update_opts.enabled = uploads.enabled + update_opts.require_auth = uploads.require_auth update_opts.connect = update_opts.connect or params.connect update_opts.bus = update_opts.bus or params.bus diff --git a/src/services/update/active_job.lua b/src/services/update/active_job.lua index c2f169a4..123b93f7 100644 --- a/src/services/update/active_job.lua +++ b/src/services/update/active_job.lua @@ -72,10 +72,48 @@ local function stage_context(params) return backend, job, ctx end +local function perform_backend_op_until(backend, name, required, deadline, ...) + local fn = backend_method(backend, name, required) + if not fn then + return nil, nil + end + + local backend_op = fn(backend, ...) + local result, err + + if deadline ~= nil then + if fibers.now() >= deadline then + return nil, name .. '_timeout' + end + + -- The update phase owns this deadline. The backend Op, including any + -- bus request it opens, has no hidden call timeout. When timeout wins the + -- losing backend Op is aborted, allowing lua-bus to abandon the request and + -- Device to observe caller cancellation. + local which, a, b = fibers.perform(fibers.named_choice { + backend = backend_op, + timeout = sleep.sleep_until_op(deadline), + }) + if which == 'timeout' then + return nil, name .. '_timeout' + end + result, err = a, b + else + result, err = fibers.perform(backend_op) + end + + if result == nil then + return nil, err or (name .. '_failed') + end + + return result, nil +end + function M.stage(_scope, params) params = params or {} local backend, job, ctx = stage_context(params) - local staged = perform_backend_op(backend, 'stage_op', true, job, ctx) + local staged, err = perform_backend_op_until(backend, 'stage_op', true, params.deadline, job, ctx) + if staged == nil then error(err or 'stage_op_failed', 0) end return { tag = 'staged', diff --git a/src/services/update/active_runtime.lua b/src/services/update/active_runtime.lua index 68642c45..2245bb13 100644 --- a/src/services/update/active_runtime.lua +++ b/src/services/update/active_runtime.lua @@ -409,6 +409,28 @@ local function active_intent_for(job) return job and (job.active_intent or job.active) or nil end +local function positive_number(v) + local n = tonumber(v) + if type(n) ~= 'number' or n <= 0 then return nil end + return n +end + +local function configured_phase_timeout_s(self, component, phase) + local cfg = self and self._config or nil + local components = cfg and cfg.components or nil + local rec = type(components) == 'table' and component and components[component] or nil + if type(rec) ~= 'table' then return nil end + phase = phase or 'stage' + return positive_number(rec[phase .. '_timeout_s']) + or positive_number(rec.timeout_s) +end + +local function phase_deadline(self, job, phase) + local timeout_s = configured_phase_timeout_s(self, job and job.component, phase) + if timeout_s == nil then return nil end + return fibers.now() + timeout_s +end + local function reconcile_token_for(job, generation) return table.concat({ tostring(generation or 0), @@ -589,6 +611,7 @@ function Component:_launch_active_intent(job) }, job, { backend = self._backend, phase = intent.phase, + deadline = phase_deadline(self, job, intent.phase), }) if not handle then @@ -610,6 +633,11 @@ function Component:update_adoption(adoption) return true, nil end +function Component:update_config(config) + self._config = copy(config or {}) + return true, nil +end + function Component:consider_jobs() if not self._jobs then return false, 'not_ready' end if self._state.active ~= nil then return false, 'slot_busy' end @@ -843,6 +871,7 @@ function M.start_component(scope, params) _jobs = params.jobs, _backend = params.backend, _observer = params.observer, + _config = copy(params.config or {}), _adoption = params.adoption or {}, _active_applies = {}, _active_launched = {}, diff --git a/src/services/update/backends/component.lua b/src/services/update/backends/component.lua index 99b2bd48..4152e90d 100644 --- a/src/services/update/backends/component.lua +++ b/src/services/update/backends/component.lua @@ -60,11 +60,18 @@ local function transfer_from(job, ctx) } end +local NO_BUS_TIMEOUT = { timeout = false } + local function call_component_op(self, component, method, payload, opts) if type(self._conn) ~= 'table' or type(self._conn.call_op) ~= 'function' then return op.always(nil, 'component_backend_connection_required') end - return self._conn:call_op(topics.component_rpc(component, method), payload, opts or self._call_opts):wrap(function (reply, err) + -- Component update calls may legitimately take as long as the update phase + -- owns them for. Do not use lua-bus' default one-second call timeout here; + -- callers that own a budget must compose this Op with a sleep/deadline Op. + -- If that outer choice loses, lua-bus observes the abort and abandons the + -- request through the Request owner path. + return self._conn:call_op(topics.component_rpc(component, method), payload, opts or self._call_opts or NO_BUS_TIMEOUT):wrap(function (reply, err) if reply == false then return nil, err or 'component_call_failed' end return reply, err end) diff --git a/src/services/update/config.lua b/src/services/update/config.lua index eb5094d5..8331a9c5 100644 --- a/src/services/update/config.lua +++ b/src/services/update/config.lua @@ -25,6 +25,25 @@ local function sorted_count(t) return n end +local function normalise_optional_positive_number(t, key, where) + local v = t[key] + if v == nil then return true, nil end + local n = tonumber(v) + if type(n) ~= 'number' or n <= 0 then + return false, where .. '.' .. key .. ' must be a positive number' + end + t[key] = n + return true, nil +end + +local function normalise_component_timeouts(c, where) + for _, key in ipairs({ 'timeout_s', 'stage_timeout_s', 'commit_timeout_s', 'reconcile_timeout_s' }) do + local ok, err = normalise_optional_positive_number(c, key, where) + if not ok then return nil, err end + end + return c, nil +end + local function normalise_components(raw) local out = {} local list = raw or {} @@ -47,6 +66,8 @@ local function normalise_components(raw) end local c = copy(item) c.component = id + local ok, terr = normalise_component_timeouts(c, 'components[' .. tostring(i) .. ']') + if not ok then return nil, terr end out[id] = c end else @@ -65,6 +86,8 @@ local function normalise_components(raw) end local c = copy(item) c.component = id + local ok, terr = normalise_component_timeouts(c, 'components.' .. id) + if not ok then return nil, terr end out[id] = c end end diff --git a/src/services/update/service.lua b/src/services/update/service.lua index 80a836d6..ad5c84df 100644 --- a/src/services/update/service.lua +++ b/src/services/update/service.lua @@ -1086,6 +1086,9 @@ local function ensure_active_runtime(self, reason) if self._active_component and type(self._active_component.update_adoption) == 'function' then self._active_component:update_adoption(adoption) end + if self._active_component and type(self._active_component.update_config) == 'function' then + self._active_component:update_config(self._config) + end if self._active_component ~= nil then return true, nil end @@ -1103,6 +1106,7 @@ local function ensure_active_runtime(self, reason) jobs = self._jobs, backend = self._backend, observer = self._component_observer, + config = self._config, adoption = adoption, }) if not active_component then diff --git a/tests/integration/devhost/mcu_update_http_uart_spec.lua b/tests/integration/devhost/mcu_update_http_uart_spec.lua new file mode 100644 index 00000000..402e7088 --- /dev/null +++ b/tests/integration/devhost/mcu_update_http_uart_spec.lua @@ -0,0 +1,1488 @@ +-- tests/integration/devhost/mcu_update_http_uart_spec.lua +-- +-- Release-gate devhost MCU update flow over the real CM5 HAL UART boundary: +-- curl HTTP upload -> UI -> Update -> Device -> Fabric -> HAL UART -> PTY +-- middleware -> MCU-side Fabric -> fake MCU updater. +-- +-- The middleware deliberately preserves the base Fabric protocol semantics. It +-- fragments and paces bytes like a modestly imperfect 115200 bps UART, +-- includes occasional non-corrupt stalls and a small fake flash-write delay on +-- the MCU side, but it does not inject corrupt bytes, reorder frames, or ask +-- the sender for future transfer offsets. + +local busmod = require 'bus' +local fibers = require 'fibers' +local mailbox = require 'fibers.mailbox' +local channel = require 'fibers.channel' +local op = require 'fibers.op' +local sleep = require 'fibers.sleep' +local exec = require 'fibers.io.exec' + +local cjson_ok, cjson = pcall(require, 'cjson.safe') +if not cjson_ok then cjson = require 'cjson' end +local posix_ok, stdlib = pcall(require, 'posix.stdlib') +local safe = require 'coxpcall' + +local http_request = require 'http.request' + +local runfibers = require 'tests.support.run_fibers' +local probe = require 'tests.support.bus_probe' +local pty = require 'tests.support.pty' + +local bus_cleanup = require 'devicecode.support.bus_cleanup' + +local hal_types = require 'services.hal.types.core' +local cap_args = require 'services.hal.types.capability_args' + +local hal_service = require 'services.hal' +local http_service = require 'services.http.service' +local ui_service = require 'services.ui.service' +local update_service = require 'services.update.service' +local device_service = require 'services.device.service' +local fabric = require 'services.fabric' + +local http_driver_mod = require 'services.http.transport.cqueues_driver' +local hal_transport = require 'services.fabric.hal_transport' +local fabric_protocol = require 'services.fabric.protocol' +local fabric_topics = require 'services.fabric.topics' +local update_topics = require 'services.update.topics' +local device_topics = require 'services.device.topics' +local mcu_schema = require 'services.device.schemas.mcu' +local http_topics = require 'services.http.topics' + +local T = {} +local unpack = rawget(table, 'unpack') or _G.unpack + +local UART_BYTES_PER_SEC = 11520 +local DEFAULT_CI_MCU_BLOB_BYTES = 6 * 1024 +local LARGE_MCU_BLOB_ENV = 'MCU_HTTP_UART_BLOB_BYTES' + +local function env_size_bytes(name, default_value) + local raw = os.getenv(name) + if raw == nil or raw == '' then return default_value end + raw = tostring(raw):match('^%s*(.-)%s*$') + local number, suffix = raw:match('^(%d+)%s*([kKmMgG]?[iI]?[bB]?)$') + if number == nil then + error(('invalid %s=%q; expected bytes, KiB, MiB or GiB, for example 204800 or 200K'):format(name, raw), 0) + end + local n = tonumber(number) + suffix = suffix:lower() + if suffix == 'k' or suffix == 'kb' or suffix == 'kib' then + n = n * 1024 + elseif suffix == 'm' or suffix == 'mb' or suffix == 'mib' then + n = n * 1024 * 1024 + elseif suffix == 'g' or suffix == 'gb' or suffix == 'gib' then + n = n * 1024 * 1024 * 1024 + elseif suffix ~= '' and suffix ~= 'b' then + error(('invalid %s=%q; expected bytes, KiB, MiB or GiB, for example 204800 or 200K'):format(name, raw), 0) + end + if n <= 0 then + error(('%s must be positive'):format(name), 0) + end + return n +end + +local REALISTIC_MCU_BLOB_BYTES = env_size_bytes(LARGE_MCU_BLOB_ENV, DEFAULT_CI_MCU_BLOB_BYTES) +local REALISTIC_TRANSFER_TIMEOUT_S = 300.0 +local SHORT_STAGE_TIMEOUT_S = 2.0 +local REALISTIC_TEST_TIMEOUT_S = 240.0 +local MCU_PROGRESS_LOG_BYTES = 16 * 1024 +local WAIT_PROGRESS_LOG_S = 2.0 +local UART_MAX_READ_BYTES = 128 +local UART_MAX_FRAGMENT_BYTES = 16 +local UART_LONG_PAUSE_EVERY_BYTES = 64 * 1024 +local UART_LONG_PAUSE_BASE_S = 0.20 +local UART_LONG_PAUSE_JITTER_S = 0.15 +local MCU_FLASH_WRITE_DELAY_S = 0.010 + +local function fail(msg) error(msg or 'assertion failed', 2) end +local function assert_eq(a, b, msg) if a ~= b then fail(msg or ('expected ' .. tostring(b) .. ', got ' .. tostring(a))) end end +local function assert_true(v, msg) if v ~= true then fail(msg or ('expected true, got ' .. tostring(v))) end end +local function assert_not_nil(v, msg) if v == nil then fail(msg or 'expected non-nil') end end +local function assert_contains(haystack, needle, msg) + haystack = tostring(haystack or '') + needle = tostring(needle or '') + if haystack:find(needle, 1, true) == nil then + fail(msg or ('expected ' .. haystack .. ' to contain ' .. needle)) + end +end + +local function log(msg) + io.stderr:write('[mcu-http-uart] ' .. tostring(msg) .. '\n') +end + +local function shquote(s) + return "'" .. tostring(s):gsub("'", "'\\''") .. "'" +end + +local function mkdir_p(path) + local ok = os.execute('mkdir -p ' .. shquote(path)) + if ok ~= true and ok ~= 0 then error('mkdir failed: ' .. tostring(path), 0) end +end + +local function rm_rf(path) + os.execute('rm -rf ' .. shquote(path)) +end + +local function write_file(path, body) + local f = assert(io.open(path, 'wb')) + assert(f:write(body)) + assert(f:close()) +end + +local function temp_roots() + local base = ('/tmp/devicecode-mcu-http-uart-%d-%d'):format(os.time(), math.random(100000, 999999)) + rm_rf(base) + local roots = { + base = base, + config = base .. '/config', + static = base .. '/static', + artifact = { + transient = base .. '/cm5/artifacts/transient', + durable = base .. '/cm5/artifacts/durable', + import = base .. '/cm5/artifacts/import', + }, + control = base .. '/cm5/control/update', + mcu = base .. '/mcu', + } + mkdir_p(roots.config) + mkdir_p(roots.static) + mkdir_p(roots.artifact.transient) + mkdir_p(roots.artifact.durable) + mkdir_p(roots.artifact.import) + mkdir_p(roots.control) + mkdir_p(roots.mcu) + write_file(roots.static .. '/index.html', 'ok') + return roots +end + +local function copy_frame(frame) + local out = {} + for k, v in pairs(frame or {}) do + if type(v) == 'table' then + local t = {} + for k2, v2 in pairs(v) do t[k2] = v2 end + out[k] = t + else + out[k] = v + end + end + return out +end + +local function new_line_transport() + local in_tx, in_rx = mailbox.new(128, { full = 'reject_newest' }) + local written = {} + local session = {} + + function session:read_line_op() + return in_rx:recv_op():wrap(function(frame) + if frame == nil then return nil, in_rx:why() or 'closed' end + local line, err = fabric_protocol.encode_line(frame) + if not line then return nil, err end + return line, nil + end) + end + + function session:write_line_op(line) + local frame, err = fabric_protocol.decode_line(line) + if not frame then return fibers.always(nil, err) end + written[#written + 1] = copy_frame(frame) + return fibers.always(true, nil) + end + + function session:flush_op() + return fibers.always(true, nil) + end + + function session:terminate(reason) + in_tx:close(reason or 'transport terminated') + return true, nil + end + + return { session = session, in_tx = in_tx, written = written } +end + +local function connect_line_transports(a, b) + local write_a = a.session.write_line_op + function a.session:write_line_op(line) + local frame = assert(fabric_protocol.decode_line(line)) + write_a(self, line) + return b.in_tx:send_op(copy_frame(frame)) + end + + local write_b = b.session.write_line_op + function b.session:write_line_op(line) + local frame = assert(fabric_protocol.decode_line(line)) + write_b(self, line) + return a.in_tx:send_op(copy_frame(frame)) + end +end + +local function wrap_session_op(session) + local wrapped, err = hal_transport.wrap_transport(session) + return fibers.always(wrapped, err) +end + +local function wait_retained_payload_where(conn, topic, label, pred, opts) + opts = opts or {} + local view = conn:retained_view(topic) + local value = probe.wait_versioned_until(label, function () + return view:version() + end, function (seen) + return view:changed_op(seen) + end, function () + local msg = view:get(topic) + local payload = msg and msg.payload or nil + if pred(payload) then return payload end + return nil + end, opts) + view:close() + return value +end + +local function fresh_uart_manager(scope) + -- The UART manager is currently module-singleton state. Keep this test's + -- direct manager instance explicit and restore package.loaded afterwards so + -- a full-suite run is not order-dependent on this spec. + local manager_key = 'services.hal.managers.uart' + local driver_key = 'services.hal.drivers.uart' + local original_manager = package.loaded[manager_key] + local original_driver = package.loaded[driver_key] + + if scope ~= nil then + scope:finally(function () + package.loaded[manager_key] = original_manager + package.loaded[driver_key] = original_driver + end) + end + + package.loaded[manager_key] = nil + package.loaded[driver_key] = nil + return require 'services.hal.managers.uart' +end + +local function dummy_logger() + local logger = {} + for _, k in ipairs({ 'debug', 'info', 'warn', 'error' }) do + logger[k] = function () end + end + function logger:child() return self end + return logger +end + +local function wait_channel_get(ch, timeout_s, what) + local which, a, b = fibers.perform(op.named_choice({ + item = ch:get_op(), + timeout = sleep.sleep_op(timeout_s or 1.0), + })) + if which == 'timeout' then + error(('timed out waiting for %s'):format(what or 'channel item'), 0) + end + if a == nil then + error(('channel closed while waiting for %s: %s'):format(what or 'channel item', tostring(b)), 0) + end + return a +end + +local function wait_device_event(dev_ev_ch, event_type, class, id, timeout_s) + local deadline = fibers.now() + (timeout_s or 1.5) + while fibers.now() < deadline do + local ev = wait_channel_get(dev_ev_ch, deadline - fibers.now(), 'UART device event') + if ev.event_type == event_type and ev.class == class and ev.id == id then + return ev + end + end + error(('timed out waiting for UART device event %s %s/%s'):format( + tostring(event_type), tostring(class), tostring(id) + ), 0) +end + +local function wait_uart_cap(dev_ev_ch) + local added = wait_device_event(dev_ev_ch, 'added', 'uart', 'uart0', 1.5) + assert_true(type(added.capabilities) == 'table' and #added.capabilities == 1, 'UART added event missing capability') + local cap = added.capabilities[1] + assert_eq(cap.class, 'uart') + assert_eq(cap.id, 'uart0') + assert_true(type(cap.control_ch) == 'table', 'UART capability should expose control_ch') + return cap +end + +local function normalise_uart_open_opts(opts) + if opts == nil or getmetatable(opts) ~= cap_args.UARTOpenOpts then + local open_opts, err = cap_args.new.UARTOpenOpts(opts) + assert_not_nil(open_opts, tostring(err)) + return open_opts + end + return opts +end + +local function call_hal_control(cap, verb, opts) + local reply_ch = channel.new(1) + local req, err = hal_types.new.ControlRequest(verb, opts or {}, reply_ch) + assert_not_nil(req, tostring(err)) + + fibers.perform(cap.control_ch:put_op(req)) + + local reply = wait_channel_get(reply_ch, 1.0, 'HAL UART control reply') + assert_true(type(reply) == 'table', 'HAL control reply must be a table') + return reply +end + +local function expose_raw_host_uart_open(scope, bus, cap, source) + source = source or 'uart_manager' + local conn = bus:connect({ origin_base = { service = 'hal-uart-test-adapter' } }) + local cap_id = cap.id + local ep = conn:bind({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'rpc', 'open' }, { + queue_len = 8, + }) + + conn:retain({ 'raw', 'host', source, 'status' }, { + state = 'available', + available = true, + source = source, + class = 'uart', + id = cap_id, + }) + conn:retain({ 'raw', 'host', source, 'meta' }, { + source = source, + class = 'uart', + id = cap_id, + }) + conn:retain({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'status' }, { + state = 'available', + available = true, + source_kind = 'host', + source = source, + }) + conn:retain({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'meta' }, { + source_kind = 'host', + source = source, + offerings = { open = true }, + }) + + -- Register cleanup before spawning work on this scope. lua-fibers only + -- allows adding finalisers to a started scope from within that same scope. + scope:finally(function () + safe.pcall(function () ep:unbind() end) + safe.pcall(function () conn:unretain({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'meta' }) end) + safe.pcall(function () conn:retain({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'status' }, { + state = 'removed', + available = false, + source_kind = 'host', + source = source, + }) end) + safe.pcall(function () conn:unretain({ 'raw', 'host', source, 'meta' }) end) + safe.pcall(function () conn:retain({ 'raw', 'host', source, 'status' }, { + state = 'removed', + available = false, + source = source, + class = 'uart', + id = cap_id, + }) end) + bus_cleanup.disconnect(conn) + end) + + local ok_spawn, spawn_err = scope:spawn(function () + while true do + local req = ep:recv() + if req == nil then return end + + local open_opts = normalise_uart_open_opts(req.payload) + local reply = call_hal_control(cap, 'open', open_opts) + + local replied = req:reply(reply) + if not replied + and reply.ok == true + and type(reply.reason) == 'table' + and type(reply.reason.session) == 'table' + and type(reply.reason.session.terminate) == 'function' + then + reply.reason.session:terminate('fabric request abandoned') + end + end + end) + assert_true(ok_spawn, tostring(spawn_err)) + + return { source = source, class = 'uart', id = cap_id } +end + +local function start_cm5_uart_manager(scope, bus, port) + local uart_mgr = fresh_uart_manager(scope) + local dev_ev_ch = channel.new(16) + local cap_emit_ch = channel.new(32) + + local ok_start, start_err = fibers.perform(uart_mgr.start_op(dummy_logger(), dev_ev_ch, cap_emit_ch)) + assert_true(ok_start, tostring(start_err)) + scope:finally(function () + safe.pcall(function () fibers.perform(uart_mgr.shutdown_op()) end) + end) + + local ok_cfg, cfg_err = fibers.perform(uart_mgr.apply_config_op({ + { + id = 'uart0', + path = port.slave_name, + baud = 115200, + mode = '8N1', + }, + })) + assert_true(ok_cfg, tostring(cfg_err)) + + local cap = wait_uart_cap(dev_ev_ch) + local raw_cap = expose_raw_host_uart_open(scope, bus, cap, 'uart_manager') + + local conn = bus:connect({ origin_base = { service = 'hal-uart-test-wait' } }) + wait_retained_payload_where(conn, { 'raw', 'host', raw_cap.source, 'cap', raw_cap.class, raw_cap.id, 'status' }, + 'HAL UART raw capability available', function (p) + return p and p.available == true and p + end, { timeout = 1.0 }) + bus_cleanup.disconnect(conn) + + return raw_cap +end + +local function wait_job(conn, job_id, state, timeout) + return wait_retained_payload_where(conn, update_topics.workflow_update_job(job_id), 'update job ' .. tostring(job_id) .. ' ' .. tostring(state), function (p) + if p and p.state == state then return p end + return nil + end, { timeout = timeout or 4.0 }) +end + +local function describe_job_value(v, depth) + depth = depth or 0 + if type(v) ~= 'table' then return tostring(v) end + if depth >= 2 then return '' end + local parts = {} + for k, vv in pairs(v) do + if type(vv) ~= 'function' then + parts[#parts + 1] = tostring(k) .. '=' .. describe_job_value(vv, depth + 1) + end + end + table.sort(parts) + return '{' .. table.concat(parts, ',') .. '}' +end + +local function describe_job_progress(p) + if type(p) ~= 'table' then return 'absent' end + local parts = { 'state=' .. tostring(p.state) } + if p.stage_attempt ~= nil then + local st = p.stage_attempt + parts[#parts + 1] = 'stage=' .. tostring(type(st) == 'table' and (st.status or st.state or st.phase) or st) + if type(st) == 'table' and st.err ~= nil then parts[#parts + 1] = 'stage_err=' .. tostring(st.err) end + end + if p.component ~= nil then parts[#parts + 1] = 'component=' .. tostring(p.component) end + if p.err ~= nil then parts[#parts + 1] = 'err=' .. describe_job_value(p.err) end + if p.error ~= nil then parts[#parts + 1] = 'error=' .. describe_job_value(p.error) end + if p.reason ~= nil then parts[#parts + 1] = 'reason=' .. describe_job_value(p.reason) end + return table.concat(parts, ' ') +end + +local function wait_job_chatty(conn, job_id, state, timeout, progress_fn) + timeout = timeout or 4.0 + local topic = update_topics.workflow_update_job(job_id) + local view = conn:retained_view(topic) + local deadline = fibers.now() + timeout + local last_log = -math.huge + + local function maybe_log(force) + local now = fibers.now() + if not force and (now - last_log) < WAIT_PROGRESS_LOG_S then return end + last_log = now + local msg = view:get(topic) + local payload = msg and msg.payload or nil + local extra = progress_fn and progress_fn() or '' + if extra ~= '' then extra = '; ' .. extra end + log(('waiting for job %s -> %s: %s%s'):format(job_id, state, describe_job_progress(payload), extra)) + end + + local payload = (view:get(topic) or {}).payload + if payload and payload.state == state then + view:close() + return payload + end + maybe_log(true) + + local seen = view:version() + while true do + local remaining = deadline - fibers.now() + if remaining <= 0 then + maybe_log(true) + view:close() + error('timed out waiting for update job ' .. tostring(job_id) .. ' ' .. tostring(state), 0) + end + + local which, version, reason = fibers.perform(op.named_choice({ + changed = view:changed_op(seen), + progress = sleep.sleep_op(math.min(WAIT_PROGRESS_LOG_S, remaining)), + })) + + if which == 'changed' then + if version == nil then + view:close() + error('update job ' .. tostring(job_id) .. ' closed: ' .. tostring(reason or 'closed'), 0) + end + seen = version + payload = (view:get(topic) or {}).payload + if payload and payload.state == state then + maybe_log(true) + view:close() + return payload + end + end + + maybe_log(which == 'changed') + end +end + +local function wait_component_software(conn, image_id, boot_id) + return wait_retained_payload_where(conn, device_topics.component_software('mcu'), 'mcu software canonical state', function (p) + if p and (image_id == nil or p.image_id == image_id) and (boot_id == nil or p.boot_id == boot_id) then return p end + return nil + end, { timeout = 4.0 }) +end + +local function start_public_fabric(scope, conn, cfg, transport, opts) + opts = opts or {} + local link_overrides = opts.link_overrides + if link_overrides == nil and transport ~= nil then + link_overrides = { + ['link-a'] = { + open_transport_op = function () return wrap_session_op(transport.session) end, + transfer = opts.transfer, + }, + } + end + local ok, err = scope:spawn(function () + fabric.start(conn, { + name = opts.name or 'fabric', + env = 'test', + config = cfg, + link_overrides = link_overrides, + }) + end) + assert_true(ok, tostring(err)) +end + +local function fabric_config(local_node, peer_node, bridge, transfer) + return { + schema = fabric.config.SCHEMA, + local_node = local_node, + links = { + { + id = 'link-a', + peer_id = peer_node, + transport = { source = 'test', class = 'jsonl', id = 'link-a' }, + session = { hello_interval_s = 5.0, ping_interval_s = 5.0, liveness_timeout_s = 5.0 }, + bridge = bridge or {}, + transfer = transfer or { chunk_size = 2048, timeout_s = 3.0 }, + }, + }, + } +end + +local function cm5_fabric_config() + return fabric_config('cm5', 'mcu', { + imports = { + { id = 'mcu-state', remote = { 'state', 'self' }, ['local'] = { 'raw', 'member', 'mcu', 'state' } }, + { id = 'mcu-event', remote = { 'event', 'self' }, ['local'] = { 'raw', 'member', 'mcu', 'cap', 'telemetry', 'main', 'event' } }, + { id = 'mcu-cap', remote = { 'cap', 'self' }, ['local'] = { 'raw', 'member', 'mcu', 'cap' } }, + }, + rpc = { + outbound = { + { + id = 'mcu-prepare', + ['local'] = { 'raw', 'member', 'mcu', 'cap', 'updater', 'main', 'rpc', 'prepare-update' }, + remote = { 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }, + timeout_s = 2.0, + }, + { + id = 'mcu-commit', + ['local'] = { 'raw', 'member', 'mcu', 'cap', 'updater', 'main', 'rpc', 'commit-update' }, + remote = { 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }, + timeout_s = 2.0, + reply_policy = 'sent-is-accepted', + }, + }, + }, + }, { chunk_size = 2048, timeout_s = REALISTIC_TRANSFER_TIMEOUT_S }) +end + +local function mcu_fabric_config() + return fabric_config('mcu', 'cm5', { + exports = { + { id = 'mcu-state-export', ['local'] = { 'state', 'self' }, remote = { 'state', 'self' }, publish = true, retain = true }, + { id = 'mcu-cap-export', ['local'] = { 'cap', 'self' }, remote = { 'cap', 'self' }, publish = true, retain = true }, + }, + rpc = { + inbound = { + { + id = 'mcu-prepare-in', + ['local'] = { 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }, + remote = { 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }, + timeout_s = 2.0, + }, + { + id = 'mcu-commit-in', + ['local'] = { 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }, + remote = { 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }, + timeout_s = 2.0, + }, + }, + }, + }, { chunk_size = 2048, timeout_s = REALISTIC_TRANSFER_TIMEOUT_S }) +end + +local function publish_mcu_facts(conn, fake) + local image = fake.committed_image_id or fake.old_image_id + local boot = 'mcu-boot-' .. tostring(fake.boot_seq) + conn:retain({ 'state', 'self', 'software' }, { + image_id = image, + boot_id = boot, + version = image, + }) + conn:retain({ 'state', 'self', 'updater' }, { + state = 'ready', + last_error = nil, + staged_image_id = fake.staged and fake.staged.image_id or nil, + pending_image_id = fake.committed_image_id, + job_id = fake.job_id, + }) + conn:retain({ 'cap', 'self', 'updater', 'main', 'meta' }, { + class = 'updater', + id = 'main', + methods = { 'prepare-update', 'commit-update' }, + }) + conn:retain({ 'cap', 'self', 'updater', 'main', 'status' }, { + available = true, + state = 'available', + }) +end + +local function new_mcu_receive_target(fake) + local target = {} + function target:open_sink_op(req) + fake.transfer_begin = req + fake.receive_started_at = fibers.now() + fake.receive_bytes = 0 + fake.receive_chunks = 0 + fake.receive_next_log = MCU_PROGRESS_LOG_BYTES + assert_eq(req.target, 'updater/main') + log(('MCU receive target opened: target=%s size=%s job_id=%s image_id=%s'):format( + tostring(req.target), + tostring(req.size), + tostring(req.meta and req.meta.job_id), + tostring(req.meta and (req.meta.image_id or req.meta.expected_image_id)) + )) + local chunks = {} + local sink = {} + function sink:append_op(chunk) + return fibers.run_scope_op(function () + -- Simulate a small flash/programming delay on the MCU side. + -- This keeps the receiver honest without changing the transfer + -- protocol: it still only asks for the current offset. + fibers.perform(sleep.sleep_op(MCU_FLASH_WRITE_DELAY_S)) + chunks[#chunks + 1] = chunk + fake.receive_bytes = (fake.receive_bytes or 0) + #(chunk or '') + fake.receive_chunks = (fake.receive_chunks or 0) + 1 + if fake.receive_bytes >= (fake.receive_next_log or MCU_PROGRESS_LOG_BYTES) then + local elapsed = fibers.now() - (fake.receive_started_at or fibers.now()) + log(('MCU received %d bytes in %d chunks after %.1fs'):format( + fake.receive_bytes, fake.receive_chunks or 0, elapsed + )) + fake.receive_next_log = fake.receive_bytes + MCU_PROGRESS_LOG_BYTES + end + return true, nil + end):wrap(function (status, _report, ok, err) + if status ~= 'ok' then return nil, err or status end + return ok, err + end) + end + function sink:commit_op(req2) + local bytes = table.concat(chunks) + log(('MCU transfer commit: %d bytes in %d chunks; digest=%s'):format( + #bytes, fake.receive_chunks or 0, tostring(req2.digest) + )) + fake.staged = { + bytes = bytes, + digest = req2.digest, + size = req2.size, + image_id = req.meta and (req.meta.image_id or req.meta.expected_image_id), + job_id = req.meta and req.meta.job_id, + } + fake.staged_signal = (fake.staged_signal or 0) + 1 + return fibers.always({ staged = true, digest = req2.digest }, nil) + end + function sink:abort(reason) + fake.abort_reason = reason + fake.abort_count = (fake.abort_count or 0) + 1 + log(('MCU transfer abort: reason=%s after %d bytes in %d chunks'):format( + tostring(reason), fake.receive_bytes or 0, fake.receive_chunks or 0 + )) + return true, nil + end + return fibers.always(sink, nil) + end + return target +end + +local function start_fake_mcu(scope, bus, fake) + local conn = bus:connect({ origin_base = { service = 'fake-mcu' } }) + fake.boot_seq = (fake.boot_seq or 0) + 1 + publish_mcu_facts(conn, fake) + + local eps = {} + local function bind(topic) + local ep, err = bus_cleanup.bind(conn, topic, { queue_len = 8 }) + assert_not_nil(ep, err) + eps[#eps + 1] = ep + return ep + end + + local prepare_ep = bind({ 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }) + local commit_ep = bind({ 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }) + + scope:finally(function () + for _, ep in ipairs(eps) do bus_cleanup.unbind(conn, ep) end + bus_cleanup.disconnect(conn) + end) + + assert_true(scope:spawn(function () + while true do + local req = fibers.perform(prepare_ep:recv_op()) + if req == nil then return end + fake.prepare_payload = req.payload + assert_eq(type(req.payload) == 'table' and req.payload.target or nil, 'mcu') + fake.job_id = type(req.payload) == 'table' and req.payload.job_id or nil + conn:retain({ 'state', 'self', 'updater' }, { state = 'ready', last_error = nil, job_id = fake.job_id }) + req:reply({ ready = true, target = 'updater/main', max_chunk_size = 2048 }) + end + end)) + + assert_true(scope:spawn(function () + while true do + local req = fibers.perform(commit_ep:recv_op()) + if req == nil then return end + fake.commit_payload = req.payload + fake.commit_seen = true + fake.committed_image_id = (fake.staged and fake.staged.image_id) + or (type(req.payload) == 'table' and req.payload.expected_image_id) + conn:retain({ 'state', 'self', 'updater' }, { + state = 'rebooting', + last_error = nil, + pending_image_id = fake.committed_image_id, + staged_image_id = fake.staged and fake.staged.image_id or nil, + job_id = fake.job_id, + }) + req:reply({ accepted = true, reboot_required = true }) + end + end)) + + return conn +end + +local function new_fabric_client(conn) + local client = {} + function client:send_blob_op(params, opts) + params = params or {} + opts = opts or {} + assert(params.source_owner, 'fabric client source_owner required') + return conn:call_op(fabric_topics.transfer_manager_rpc('send-blob'), { + link_id = params.link_id or 'link-a', + request_id = params.request_id or ('device-stage-' .. tostring(params.job_id or os.clock())), + xfer_id = params.xfer_id, + target = assert(params.target, 'fabric client params.target required'), + source_owner = params.source_owner, + size = params.size, + digest_alg = params.digest_alg, + digest = params.digest, + chunk_size = params.chunk_size or 2048, + meta = params.meta, + timeout_s = opts.timeout or params.timeout or REALISTIC_TRANSFER_TIMEOUT_S, + }, { timeout = opts.timeout or params.timeout or REALISTIC_TRANSFER_TIMEOUT_S }):wrap(function (reply, err) + if reply == nil then return nil, err end + return { + ok = reply.ok, + committed = reply.committed, + transfer = reply, + }, nil + end) + end + return client +end + +local function update_config(opts) + opts = opts or {} + return { + schema = 'devicecode.update/1', + components = { + { + component = 'mcu', + -- Active update owns the phase budget. The Update backend + -- opens the Device bus call without lua-bus' default timeout; + -- active_job.stage composes the backend Op with this deadline. + stage_timeout_s = opts.stage_timeout_s or REALISTIC_TRANSFER_TIMEOUT_S, + }, + }, + } +end + +local function device_config(opts) + opts = opts or {} + local stage_timeout_s = opts.stage_action_timeout_s or REALISTIC_TRANSFER_TIMEOUT_S + return { + schema = 'devicecode.config/device/1', + components = { + mcu = { + class = 'member', + subtype = 'mcu', + role = 'controller', + member = 'mcu', + required_facts = { 'software', 'updater' }, + facts = mcu_schema.member_fact_topics('mcu'), + events = mcu_schema.member_event_topics('mcu'), + actions = { + ['restart'] = { kind = 'rpc', call_topic = device_topics.raw_member_cap_rpc('mcu', 'control', 'main', 'restart') }, + ['prepare-update'] = { + kind = 'rpc', + call_topic = device_topics.raw_member_cap_rpc('mcu', 'updater', 'main', 'prepare-update'), + timeout_s = 2.0, + }, + ['stage-update'] = { + kind = 'fabric_stage', + target = 'updater/main', + chunk_size = 2048, + artifact_store = 'main', + -- This is Device action policy, not a hidden bus timeout. + -- Keep it in the component config so the test does not + -- change Device's global default action timeout. + timeout_s = stage_timeout_s, + }, + ['commit-update'] = { + kind = 'rpc', + call_topic = device_topics.raw_member_cap_rpc('mcu', 'updater', 'main', 'commit-update'), + timeout_s = 2.0, + }, + }, + }, + }, + } +end + +local function hal_config(roots) + return { + schema = 'devicecode.config/hal/1', + artifact_store = { + stores = { + { + id = 'main', + transient_root = roots.artifact.transient, + durable_root = roots.artifact.durable, + import_root = roots.artifact.import, + durable_enabled = true, + }, + }, + }, + control_store = { + { name = 'update', root = roots.control }, + }, + } +end + +local function install_config_dir_for_scope(scope, config_dir) + if not (posix_ok and stdlib and stdlib.setenv) then return end + local old_config_dir = os.getenv('DEVICECODE_CONFIG_DIR') + scope:finally(function () + if old_config_dir ~= nil then + stdlib.setenv('DEVICECODE_CONFIG_DIR', old_config_dir, true) + elseif type(stdlib.unsetenv) == 'function' then + stdlib.unsetenv('DEVICECODE_CONFIG_DIR') + else + stdlib.setenv('DEVICECODE_CONFIG_DIR', '', true) + end + end) + stdlib.setenv('DEVICECODE_CONFIG_DIR', config_dir, true) +end + +local function start_hal(scope, bus, roots) + local conn = bus:connect({ origin_base = { service = 'hal' } }) + assert_true(scope:spawn(function () + hal_service.start(conn, { name = 'hal', env = 'test', heartbeat_s = false }) + end)) + conn:retain({ 'cfg', 'hal' }, { data = hal_config(roots) }) + wait_retained_payload_where(conn, { 'cap', 'artifact-store', 'main', 'status' }, 'artifact-store available', function (p) + return p and p.available == true and p + end, { timeout = 4.0 }) + wait_retained_payload_where(conn, { 'cap', 'control-store', 'update', 'status' }, 'control-store available', function (p) + return p and p.available == true and p + end, { timeout = 4.0 }) + return conn +end + +local function start_http(scope, bus) + local conn = bus:connect({ origin_base = { service = 'http' } }) + assert_true(scope:spawn(function (s) + http_service.run(s, { + conn = conn, + id = 'main', + config = { + schema = 'devicecode.config/http/1', + id = 'main', + policy = { allow_loopback = true, max_request_body = 16 * 1024 * 1024, max_response_body = 16 * 1024 * 1024 }, + }, + backend_timeout = 3.0, + connection_setup_timeout = 3.0, + intra_stream_timeout = 3.0, + }) + end)) + wait_retained_payload_where(conn, { 'cap', 'http', 'main', 'status' }, 'http service available', function (p) + return p and p.available == true and p + end, { timeout = 4.0 }) + return conn +end + +local function start_device(scope, bus, fabric_client, opts) + opts = opts or {} + local conn = bus:connect({ origin_base = { service = 'device' } }) + assert_true(scope:spawn(function (s) + device_service.run(s, { + conn = conn, + initial_config = device_config(opts), + fabric_client = fabric_client, + watch_config = false, + }) + end)) + wait_retained_payload_where(conn, device_topics.component_cap_status('mcu'), 'mcu component cap available', function (p) + return p and p.available == true and p + end, { timeout = 4.0 }) + return conn +end + +local function start_update(scope, bus, opts) + opts = opts or {} + local conn = bus:connect({ origin_base = { service = 'update' } }) + assert_true(scope:spawn(function (s) + update_service.run(s, { + conn = conn, + service_id = 'update', + watch_config = false, + config = update_config(opts), + job_store_call_opts = { timeout = 3.0 }, + }) + end)) + wait_retained_payload_where(conn, update_topics.update_manager_status(), 'update manager available', function (p) + return p and p.available == true and p + end, { timeout = 4.0 }) + wait_retained_payload_where(conn, update_topics.artifact_ingest_status(), 'artifact ingest available', function (p) + return p and p.available == true and p + end, { timeout = 4.0 }) + return conn +end + +local function start_ui(scope, bus, port, roots) + local conn = bus:connect({ origin_base = { service = 'ui' } }) + assert_true(scope:spawn(function (s) + ui_service.run(s, { + conn = conn, + service_id = 'ui', + auth_opts = { + users = { + tester = { password = 'test-password', principal = { kind = 'user', id = 'tester' } }, + }, + }, + bus = bus, + connect = function (principal) + return bus:connect({ principal = principal or { kind = 'ui-test' } }) + end, + encode_json = function (v) return assert(cjson.encode(v)) end, + update = { + bus = bus, + component = 'mcu', + ingest_id = 'ing-mcu-http-uart', + job_id = 'job-mcu-http-uart', + create_job = true, + start_job = true, + timeout = 8.0, + chunk_size = 4096, + metadata = { + source = 'browser', + format = 'dcmcu-v1', + expected_image_id = 'mcu-image-new', + image_id = 'mcu-image-new', + }, + }, + uploads = { enabled = true, max_bytes = 1024 * 1024, require_auth = true }, + }) + end)) + conn:retain({ 'cfg', 'ui' }, { data = { + schema = 'devicecode.config/ui/1', + enabled = true, + http = { enabled = true, cap_id = 'main', host = '127.0.0.1', port = port, max_active_requests = 8 }, + static = { root = roots.static, index = 'index.html' }, + uploads = { enabled = true, max_bytes = 1024 * 1024, require_auth = true }, + sse = { enabled = false }, + sessions = { prune_interval = false }, + } }) + wait_retained_payload_where(conn, http_topics.state('main', 'stats'), 'ui listener active', function (p) + return p and type(p.active_listeners) == 'number' and p.active_listeners > 0 and p + end, { timeout = 4.0 }) + return conn +end + +local function make_realistic_mcu_blob(image_id, target_bytes) + target_bytes = target_bytes or REALISTIC_MCU_BLOB_BYTES + local prefix = ('DCMCU-v1 manifest:%s\n'):format(image_id or 'mcu-image-new') + local block = 'payload-0123456789abcdef-' + local chunks = { prefix } + local remaining = target_bytes - #prefix + while remaining > 0 do + local n = math.min(remaining, #block) + chunks[#chunks + 1] = block:sub(1, n) + remaining = remaining - n + end + return table.concat(chunks) +end + +local function write_all(stream, data) + local off = 1 + while off <= #data do + local n, err = fibers.perform(stream:write_op(data:sub(off))) + if n == nil then return nil, err or 'write_failed' end + if n <= 0 then return nil, 'zero_length_write' end + off = off + n + end + return true, nil +end + +local function relay_fragmented_uart(direction, input, output, stats) + while true do + local want = math.random(1, UART_MAX_READ_BYTES) + local chunk, err = fibers.perform(input.master:read_some_op(want)) + if chunk == nil then + return { direction = direction, reason = err or 'closed' } + end + + local off = 1 + while off <= #chunk do + local frag_len = math.random(1, UART_MAX_FRAGMENT_BYTES) + local frag = chunk:sub(off, off + frag_len - 1) + local ok, werr = write_all(output.master, frag) + if ok ~= true then + return { direction = direction, reason = werr or 'write_failed' } + end + stats.bytes = (stats.bytes or 0) + #frag + stats.fragments = (stats.fragments or 0) + 1 + off = off + #frag + + if stats.bytes >= (stats.next_long_pause_at or UART_LONG_PAUSE_EVERY_BYTES) then + stats.pauses = (stats.pauses or 0) + 1 + stats.next_long_pause_at = (stats.next_long_pause_at or UART_LONG_PAUSE_EVERY_BYTES) + UART_LONG_PAUSE_EVERY_BYTES + local pause_s = UART_LONG_PAUSE_BASE_S + (math.random() * UART_LONG_PAUSE_JITTER_S) + log(('UART middleware pause #%d on %s for %.3fs at %d bytes'):format( + stats.pauses, direction, pause_s, stats.bytes + )) + fibers.perform(sleep.sleep_op(pause_s)) + end + + -- Approximate 115200 8N1 UART payload rate, with modest scheduling + -- jitter and short fragments. This tests the release protocol under + -- realistic pacing without injecting corrupt or reordered bytes. + fibers.perform(sleep.sleep_op((#frag / UART_BYTES_PER_SEC) + (math.random(0, 4) / 1000))) + end + end +end + +local function start_noisy_uart_middleware(scope, cm5_port, mcu_port) + local stats = { bytes = 0, fragments = 0, pauses = 0, next_long_pause_at = UART_LONG_PAUSE_EVERY_BYTES } + local ok_a, err_a = scope:spawn(function () + return relay_fragmented_uart('cm5_to_mcu', cm5_port, mcu_port, stats) + end) + assert_true(ok_a, tostring(err_a)) + local ok_b, err_b = scope:spawn(function () + return relay_fragmented_uart('mcu_to_cm5', mcu_port, cm5_port, stats) + end) + assert_true(ok_b, tostring(err_b)) + return stats +end + +local function open_pty_transport_op(slave_name) + return op.guard(function () + local stream, err = pty.open_slave_stream(slave_name) + if not stream then return fibers.always(nil, err or 'pty_slave_open_failed') end + local transport, terr = hal_transport.wrap_transport(stream, { terminator = '\n' }) + if not transport then + if type(stream.terminate) == 'function' then stream:terminate(terr or 'transport_wrap_failed') end + return fibers.always(nil, terr or 'transport_wrap_failed') + end + return fibers.always(transport, nil) + end) +end + +local function cm5_uart_fabric_config() + local cfg = cm5_fabric_config() + cfg.links[1].transport = { + source = 'uart_manager', + class = 'uart', + id = 'uart0', + terminator = '\n', + } + cfg.links[1].session.hello_interval_s = 0.20 + return cfg +end + +local function mcu_pty_fabric_config() + local cfg = mcu_fabric_config() + cfg.links[1].session.hello_interval_s = 0.20 + return cfg +end + +local function start_fabric_pair(parent_scope, cm5_bus, mcu_bus, fake, uart) + local pair_scope = assert(parent_scope:child()) + local stats = start_noisy_uart_middleware(pair_scope, assert(uart and uart.cm5, 'cm5 PTY required'), assert(uart and uart.mcu, 'mcu PTY required')) + + local cm5_conn = cm5_bus:connect({ origin_base = { service = 'fabric-cm5' } }) + local mcu_conn = mcu_bus:connect({ origin_base = { service = 'fabric-mcu' } }) + + -- CM5 uses the real HAL raw-host UART capability path. + start_public_fabric(pair_scope, cm5_conn, cm5_uart_fabric_config(), nil, { name = 'fabric-cm5' }) + + -- The MCU side is still a separate devicecode bus and real Fabric instance, + -- but opens the PTY slave directly to avoid starting a second singleton HAL + -- UART manager in this Lua VM. + start_public_fabric(pair_scope, mcu_conn, mcu_pty_fabric_config(), nil, { + name = 'fabric-mcu', + link_overrides = { + ['link-a'] = { + open_transport_op = function () return open_pty_transport_op(uart.mcu.slave_name) end, + transfer = { + chunk_size = 2048, + timeout_s = REALISTIC_TRANSFER_TIMEOUT_S, + receive_targets = { ['updater/main'] = new_mcu_receive_target(fake) }, + }, + }, + }, + }) + + wait_retained_payload_where(cm5_conn, fabric_topics.transfer_manager_status(), 'cm5 fabric transfer manager available', function (p) + return p and p.available == true and p + end, { timeout = 6.0 }) + + return { + scope = pair_scope, + cm5_conn = cm5_conn, + mcu_conn = mcu_conn, + uart_stats = stats, + } +end + +local function stop_fabric_pair(pair, reason) + if not pair then return end + if pair.scope then + pair.scope:cancel(reason or 'fabric pair stop') + fibers.perform(pair.scope:join_op()) + end + bus_cleanup.disconnect(pair.cm5_conn) + bus_cleanup.disconnect(pair.mcu_conn) +end + +local function start_cm5_instance(parent_scope, roots, http_port, uart_port, opts) + opts = opts or {} + local scope = assert(parent_scope:child()) + local bus = busmod.new() + local conn = bus:connect({ origin_base = { service = 'test-cm5' } }) + + -- DEVICECODE_CONFIG_DIR is process-global test harness state. Register + -- its finaliser before starting any work on this scope; lua-fibers requires + -- scope:finally to be called from inside the target scope once started. + install_config_dir_for_scope(scope, roots.config) + + -- The test UART adapter registers finalisers on the instance scope. + -- Do this before starting services that spawn onto the same scope; after a + -- scope has started, lua-fibers requires finalisers to be added from within + -- that target scope. + if uart_port ~= nil then + start_cm5_uart_manager(scope, bus, uart_port) + end + start_hal(scope, bus, roots) + start_http(scope, bus) + + return { + scope = scope, + bus = bus, + conn = conn, + start_services_after_fabric = function (fabric_client) + start_device(scope, bus, fabric_client, opts.device or opts) + start_update(scope, bus, opts.update or opts) + if http_port then start_ui(scope, bus, http_port, roots) end + end, + } +end + +local function start_mcu_instance(parent_scope, fake) + local scope = assert(parent_scope:child()) + local bus = busmod.new() + local conn = bus:connect({ origin_base = { service = 'test-mcu' } }) + start_fake_mcu(scope, bus, fake) + return { scope = scope, bus = bus, conn = conn } +end + +local function stop_instance(inst, reason) + if inst and inst.scope then + inst.scope:cancel(reason or 'test reboot') + fibers.perform(inst.scope:join_op()) + end +end + +local function parse_curl_output(out) + out = tostring(out or '') + local status = out:match('\n__HTTP_STATUS__:(%d+)%s*$') + local body = out:gsub('\n__HTTP_STATUS__:%d+%s*$', '') + return status, body +end + +local function run_curl(args) + local cmd = exec.command('curl', unpack(args)) + local out, st, code, sig, err = fibers.perform(cmd:combined_output_op()) + if not (st == 'exited' and code == 0) then + error(('curl failed: status=%s code=%s signal=%s err=%s output=%s'):format( + tostring(st), tostring(code), tostring(sig), tostring(err), tostring(out) + ), 0) + end + return parse_curl_output(out) +end + +local function append_headers(args, headers) + for k, v in pairs(headers or {}) do + args[#args + 1] = '--header' + args[#args + 1] = tostring(k) .. ': ' .. tostring(v) + end +end + +local function run_http_upload(_scope, port, body, headers) + local path = ('/tmp/devicecode-mcu-http-uart-upload-%d-%d.bin'):format(os.time(), math.random(100000, 999999)) + write_file(path, body) + local args = { + '--silent', '--show-error', + '--request', 'POST', + '--header', 'content-type: application/octet-stream', + '--data-binary', '@' .. path, + '--write-out', '\n__HTTP_STATUS__:%{http_code}\n', + } + append_headers(args, headers) + args[#args + 1] = ('http://127.0.0.1:%d/api/update/upload'):format(port) + local status, resp_body = run_curl(args) + os.remove(path) + return status, resp_body +end + +local function run_http_json(_scope, port, path, payload, headers) + local args = { + '--silent', '--show-error', + '--request', 'POST', + '--header', 'content-type: application/json', + '--data', assert(cjson.encode(payload or {})), + '--write-out', '\n__HTTP_STATUS__:%{http_code}\n', + } + append_headers(args, headers) + args[#args + 1] = ('http://127.0.0.1:%d%s'):format(port, path) + local status, resp_body = run_curl(args) + return status, resp_body, resp_body and cjson.decode(resp_body) or nil +end + + +function T.ui_http_mcu_update_short_stage_timeout_fails_via_outer_choice() + runfibers.run(function (root_scope) + local roots = temp_roots() + local blob = make_realistic_mcu_blob('mcu-image-new') + local port = 30000 + math.random(0, 20000) + local fake = { old_image_id = 'mcu-image-old' } + local uart = { cm5 = pty.open(root_scope), mcu = pty.open(root_scope) } + + log('short-timeout: booting CM5 instance') + local cm5 = start_cm5_instance(root_scope, roots, port, uart.cm5, { + stage_timeout_s = SHORT_STAGE_TIMEOUT_S, + stage_action_timeout_s = REALISTIC_TRANSFER_TIMEOUT_S, + }) + log('short-timeout: booting fake MCU instance') + local mcu = start_mcu_instance(root_scope, fake) + log('short-timeout: starting Fabric pair over PTY UART middleware') + local pair = start_fabric_pair(root_scope, cm5.bus, mcu.bus, fake, uart) + log('short-timeout: starting CM5 Device/Update/UI services') + cm5.start_services_after_fabric() + + wait_component_software(cm5.conn, 'mcu-image-old', 'mcu-boot-1') + + log('short-timeout: logging in through curl') + local login_status, login_body, login_decoded = run_http_json(root_scope, port, '/api/login', { + username = 'tester', + password = 'test-password', + }) + assert_eq(login_status, '200', login_body) + local sid = assert(login_decoded and login_decoded.session and login_decoded.session.id, login_body) + + log(('short-timeout: sending upload through curl with %.1fs update stage deadline'):format(SHORT_STAGE_TIMEOUT_S)) + local status, body = run_http_upload(root_scope, port, blob, { ['x-session-id'] = sid }) + assert_eq(status, '200', 'upload HTTP status ' .. tostring(status) .. ': ' .. tostring(body)) + + log('short-timeout: waiting for job to fail due to the outer stage deadline') + local failed = wait_job_chatty(cm5.conn, 'job-mcu-http-uart', 'failed', 20.0, function () + return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d'):format( + fake.receive_bytes or 0, + fake.receive_chunks or 0, + pair.uart_stats and pair.uart_stats.bytes or 0, + pair.uart_stats and pair.uart_stats.fragments or 0, + pair.uart_stats and pair.uart_stats.pauses or 0 + ) + end) + assert_contains(failed.error, 'stage_op_timeout', 'job should fail from the active stage deadline') + assert_true((fake.receive_bytes or 0) < #blob, 'short timeout should not stage the full artifact') + assert_true(fake.staged == nil, 'short timeout must not commit a staged artifact') + assert_true(fake.commit_seen ~= true, 'short timeout must not reach the MCU commit RPC') + fibers.perform(sleep.sleep_op(0.25)) + log(('short-timeout: cancellation observed abort_reason=%s abort_count=%d received=%d chunks=%d'):format( + tostring(fake.abort_reason), fake.abort_count or 0, fake.receive_bytes or 0, fake.receive_chunks or 0 + )) + + log('short-timeout: cleanup') + stop_fabric_pair(pair, 'short timeout complete') + stop_instance(cm5, 'short timeout complete') + stop_instance(mcu, 'short timeout complete') + rm_rf(roots.base) + end, { timeout = 60.0 }) +end + +function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() + runfibers.run(function (root_scope) + local roots = temp_roots() + local blob = make_realistic_mcu_blob('mcu-image-new') + local port = 30000 + math.random(0, 20000) + local fake = { old_image_id = 'mcu-image-old' } + + local uart = { cm5 = pty.open(root_scope), mcu = pty.open(root_scope) } + + log('booting initial CM5 instance') + local cm5 = start_cm5_instance(root_scope, roots, port, uart.cm5) + log('booting initial fake MCU instance') + local mcu = start_mcu_instance(root_scope, fake) + log('starting initial Fabric pair over PTY UART middleware') + local pair = start_fabric_pair(root_scope, cm5.bus, mcu.bus, fake, uart) + log('starting CM5 Device/Update/UI services') + cm5.start_services_after_fabric() + + log('waiting for initial canonical MCU software state') + wait_component_software(cm5.conn, 'mcu-image-old', 'mcu-boot-1') + + log('checking unauthenticated upload is rejected before login') + local unauth_status, unauth_body = run_http_upload(root_scope, port, 'not-authorised', nil) + assert_true( + unauth_status == '401' or unauth_status == '403', + 'unauthenticated upload should be rejected, got ' .. tostring(unauth_status) .. ': ' .. tostring(unauth_body) + ) + + log('logging in through curl before upload') + local login_status, login_body, login_decoded = run_http_json(root_scope, port, '/api/login', { + username = 'tester', + password = 'test-password', + }) + assert_eq(login_status, '200', login_body) + assert_not_nil(login_decoded and login_decoded.session, login_body) + local sid = login_decoded.session.id + assert_not_nil(sid, 'login should return a session id') + + log(('sending real HTTP upload through curl (%d byte artifact)'):format(#blob)) + local stage_started = fibers.now() + local status, body = run_http_upload(root_scope, port, blob, { ['x-session-id'] = sid }) + assert_eq(status, '200', 'upload HTTP status ' .. tostring(status) .. ': ' .. tostring(body)) + local decoded = assert(cjson.decode(body), body) + assert_eq(decoded.status, 'ok') + assert_not_nil(decoded.job, 'upload should create an update job') + assert_eq(decoded.job.job_id, 'job-mcu-http-uart') + + log('waiting for job awaiting_commit') + wait_job_chatty(cm5.conn, 'job-mcu-http-uart', 'awaiting_commit', REALISTIC_TRANSFER_TIMEOUT_S, function () + return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d'):format( + fake.receive_bytes or 0, + fake.receive_chunks or 0, + pair.uart_stats and pair.uart_stats.bytes or 0, + pair.uart_stats and pair.uart_stats.fragments or 0, + pair.uart_stats and pair.uart_stats.pauses or 0 + ) + end) + assert_true(probe.wait_until(function () + return fake.staged and fake.staged.bytes == blob + end, { timeout = 10.0 }), 'fake MCU should stage transferred artifact') + local stage_elapsed = fibers.now() - stage_started + local uart_bytes = pair.uart_stats and pair.uart_stats.bytes or 0 + local uart_fragments = pair.uart_stats and pair.uart_stats.fragments or 0 + local uart_pauses = pair.uart_stats and pair.uart_stats.pauses or 0 + log(('artifact staged in %.1fs; middleware relayed %d bytes in %d fragments with %d long pauses'):format( + stage_elapsed, uart_bytes, uart_fragments, uart_pauses + )) + assert_true(uart_bytes >= #blob, 'UART middleware should relay at least the artifact payload size') + assert_true( + uart_fragments >= math.floor(#blob / UART_MAX_FRAGMENT_BYTES), + 'UART middleware should fragment the byte stream heavily' + ) + if #blob >= (2 * UART_LONG_PAUSE_EVERY_BYTES) then + assert_true(uart_pauses >= 2, 'large UART test should inject at least two long scheduling pauses') + end + assert_true( + stage_elapsed >= ((#blob / UART_BYTES_PER_SEC) * 0.75), + 'artifact should take UART-paced time to stage' + ) + + log('committing job through curl HTTP JSON command route') + local commit_status, commit_body, commit_decoded = run_http_json( + root_scope, + port, + '/api/call/cap/update-manager/main/rpc/commit-job', + { job_id = 'job-mcu-http-uart' }, + { ['x-session-id'] = sid } + ) + assert_eq(commit_status, '200', commit_body) + assert_not_nil(commit_decoded and commit_decoded.value, commit_body) + assert_eq(commit_decoded.value.ok, true) + log('waiting for job awaiting_return') + wait_job(cm5.conn, 'job-mcu-http-uart', 'awaiting_return', 6.0) + assert_true(probe.wait_until(function () return fake.commit_seen == true end, { timeout = 2.0 }), 'fake MCU should see commit') + assert_eq(fake.committed_image_id, 'mcu-image-new') + + log('fake reboot: stopping Fabric pair') + stop_fabric_pair(pair, 'fake fabric link reboot') + log('fake reboot: stopping CM5 instance') + stop_instance(cm5, 'fake cm5 reboot') + log('fake reboot: stopping MCU instance') + stop_instance(mcu, 'fake mcu reboot') + + local uart_b = { cm5 = pty.open(root_scope), mcu = pty.open(root_scope) } + log('reboot: starting fresh CM5 instance') + local cm5b = start_cm5_instance(root_scope, roots, nil, uart_b.cm5) + log('reboot: starting fresh MCU instance') + local mcub = start_mcu_instance(root_scope, fake) + log('reboot: starting fresh Fabric pair over PTY UART middleware') + local pair_b = start_fabric_pair(root_scope, cm5b.bus, mcub.bus, fake, uart_b) + log('reboot: starting fresh CM5 Device/Update services') + cm5b.start_services_after_fabric() + + log('reboot: waiting for post-boot canonical MCU software state') + wait_component_software(cm5b.conn, 'mcu-image-new', 'mcu-boot-2') + log('reboot: waiting for job succeeded') + local final_job = wait_job(cm5b.conn, 'job-mcu-http-uart', 'succeeded', 8.0) + assert_eq(final_job.component, 'mcu') + assert_eq(final_job.job_id, 'job-mcu-http-uart') + assert_not_nil(final_job.commit_attempt, 'job should carry commit attempt details') + if final_job.commit_attempt and final_job.commit_attempt.pre_commit then + assert_eq(final_job.commit_attempt.pre_commit.pre_commit_boot_id, 'mcu-boot-1') + end + + log('cleanup: stopping second Fabric pair') + stop_fabric_pair(pair_b, 'test complete') + log('cleanup: stopping second CM5 instance') + stop_instance(cm5b, 'test complete') + log('cleanup: stopping second MCU instance') + stop_instance(mcub, 'test complete') + log('cleanup: removing temporary roots') + rm_rf(roots.base) + end, { timeout = REALISTIC_TEST_TIMEOUT_S }) +end + +return T diff --git a/tests/run.lua b/tests/run.lua index 230bf3a7..f7d075dc 100644 --- a/tests/run.lua +++ b/tests/run.lua @@ -103,6 +103,7 @@ local files = { "integration.devhost.fabric_public_service_path_spec", 'integration.devhost.update_public_seams_spec', 'integration.devhost.mcu_update_full_path_spec', + 'integration.devhost.mcu_update_http_uart_spec', 'unit.support.test_capability_dependencies', 'unit.support.test_dependency_failure', 'unit.support.test_dependency_slot', diff --git a/tests/unit/device/test_phase2.lua b/tests/unit/device/test_phase2.lua index 397df89f..45e0d332 100644 --- a/tests/unit/device/test_phase2.lua +++ b/tests/unit/device/test_phase2.lua @@ -64,6 +64,36 @@ local function sample_config(display_name) } end + +function tests.test_default_fabric_client_disables_bus_timeout_but_passes_transfer_budget() + local seen_topic, seen_payload, seen_opts + + fibers.run(function () + local conn = { + call_op = function (_, topic, payload, opts) + seen_topic = topic + seen_payload = payload + seen_opts = opts + return op.always({ ok = true, result = { accepted = true } }) + end, + } + local client = assert(service.default_fabric_client(conn)) + local result, err = fibers.perform(client:send_blob_op({ + request_id = 'r-fabric-stage', + target = 'updater/main', + chunk_size = 2048, + timeout = 300, + }, {})) + assert_nil(err) + assert_true(result.accepted) + end) + + assert_eq(table.concat(seen_topic, '/'), 'cap/transfer-manager/main/rpc/send-blob') + assert_eq(seen_payload.timeout_s, 300) + assert_eq(seen_payload.target, 'updater/main') + assert_eq(seen_opts.timeout, false, 'Device must not hide a lua-bus timeout inside Fabric staging') +end + function tests.test_default_catalogue_includes_host_and_mcu_components() local cat = catalogue.build(nil) assert_not_nil(cat.components.cm5) diff --git a/tests/unit/ui/test_config.lua b/tests/unit/ui/test_config.lua index 0435bbe9..801e519d 100644 --- a/tests/unit/ui/test_config.lua +++ b/tests/unit/ui/test_config.lua @@ -16,6 +16,15 @@ function M.test_normalise_supplies_http_static_sse_and_upload_defaults() eq(cfg.static.root, 'www') eq(cfg.sse.enabled, true) eq(cfg.uploads.enabled, true) + eq(cfg.uploads.require_auth, false) +end + +function M.test_uploads_can_require_authentication() + local cfg = ok(config.normalise({ + schema = config.SCHEMA, + uploads = { require_auth = true }, + })) + eq(cfg.uploads.require_auth, true) end function M.test_can_disable_http_listener_without_disabling_service() diff --git a/tests/unit/update/test_config.lua b/tests/unit/update/test_config.lua index 711e6399..aadff102 100644 --- a/tests/unit/update/test_config.lua +++ b/tests/unit/update/test_config.lua @@ -22,12 +22,13 @@ end function tests.test_components_array_is_normalised_to_map() local cfg, err = config.normalise({ components = { - { component = 'cm5', backend = 'swupdate' }, + { component = 'cm5', backend = 'swupdate', stage_timeout_s = '12.5' }, { component = 'mcu', backend = 'mcu' }, }, }) assert_not_nil(cfg, err) assert_eq(cfg.components.cm5.component, 'cm5') + assert_eq(cfg.components.cm5.stage_timeout_s, 12.5) assert_eq(cfg.components.mcu.component, 'mcu') assert_eq(cfg.summary.component_count, 2) end @@ -49,6 +50,17 @@ function tests.test_unsupported_schema_is_rejected() assert_not_nil(err) end +function tests.test_component_phase_timeouts_must_be_positive_numbers() + local cfg, err = config.normalise({ + components = { + mcu = { component = 'mcu', stage_timeout_s = 0 }, + }, + }) + if cfg ~= nil then fail('expected zero stage timeout to be rejected') end + assert_not_nil(err) + assert_true(tostring(err):find('stage_timeout_s', 1, true) ~= nil, err) +end + function tests.test_extract_payload_accepts_config_service_shape() local raw, rev = config.extract_payload({ rev = 7, data = { schema = config.SCHEMA, namespace = 'prod' } }) assert_eq(rev, 7) From 8ed145afe7b2df898b68cee35db7f2212ee89c5f Mon Sep 17 00:00:00 2001 From: Rich Thanki Date: Mon, 8 Jun 2026 02:44:30 +0000 Subject: [PATCH 2/3] test working through real HAL --- .../devhost/mcu_update_http_uart_spec.lua | 332 +++++++++++---- .../devhost/support/mcu_http_uart_child.lua | 399 ++++++++++++++++++ 2 files changed, 641 insertions(+), 90 deletions(-) create mode 100644 tests/integration/devhost/support/mcu_http_uart_child.lua diff --git a/tests/integration/devhost/mcu_update_http_uart_spec.lua b/tests/integration/devhost/mcu_update_http_uart_spec.lua index 402e7088..2afb92c3 100644 --- a/tests/integration/devhost/mcu_update_http_uart_spec.lua +++ b/tests/integration/devhost/mcu_update_http_uart_spec.lua @@ -6,9 +6,10 @@ -- -- The middleware deliberately preserves the base Fabric protocol semantics. It -- fragments and paces bytes like a modestly imperfect 115200 bps UART, --- includes occasional non-corrupt stalls and a small fake flash-write delay on --- the MCU side, but it does not inject corrupt bytes, reorder frames, or ask --- the sender for future transfer offsets. +-- includes occasional non-corrupt stalls, a small fake flash-write delay on +-- the MCU side, and a small number of standalone malformed JSONL lines +-- between valid frames. It does not corrupt bytes inside a valid frame, +-- reorder frames, or ask the sender for future transfer offsets. local busmod = require 'bus' local fibers = require 'fibers' @@ -17,6 +18,7 @@ local channel = require 'fibers.channel' local op = require 'fibers.op' local sleep = require 'fibers.sleep' local exec = require 'fibers.io.exec' +local socket = require 'fibers.io.socket' local cjson_ok, cjson = pcall(require, 'cjson.safe') if not cjson_ok then cjson = require 'cjson' end @@ -49,6 +51,7 @@ local update_topics = require 'services.update.topics' local device_topics = require 'services.device.topics' local mcu_schema = require 'services.device.schemas.mcu' local http_topics = require 'services.http.topics' +local xxhash32 = require 'shared.hash.xxhash32' local T = {} local unpack = rawget(table, 'unpack') or _G.unpack @@ -93,6 +96,9 @@ local UART_MAX_FRAGMENT_BYTES = 16 local UART_LONG_PAUSE_EVERY_BYTES = 64 * 1024 local UART_LONG_PAUSE_BASE_S = 0.20 local UART_LONG_PAUSE_JITTER_S = 0.15 +local UART_MALFORMED_LINE_COUNT = 2 +local UART_MALFORMED_LINE_FIRST_AT = 1536 +local UART_MALFORMED_LINE_EVERY_BYTES = 128 * 1024 local MCU_FLASH_WRITE_DELAY_S = 0.010 local function fail(msg) error(msg or 'assertion failed', 2) end @@ -241,24 +247,12 @@ local function wait_retained_payload_where(conn, topic, label, pred, opts) return value end -local function fresh_uart_manager(scope) - -- The UART manager is currently module-singleton state. Keep this test's - -- direct manager instance explicit and restore package.loaded afterwards so - -- a full-suite run is not order-dependent on this spec. - local manager_key = 'services.hal.managers.uart' - local driver_key = 'services.hal.drivers.uart' - local original_manager = package.loaded[manager_key] - local original_driver = package.loaded[driver_key] - - if scope ~= nil then - scope:finally(function () - package.loaded[manager_key] = original_manager - package.loaded[driver_key] = original_driver - end) - end - - package.loaded[manager_key] = nil - package.loaded[driver_key] = nil +local function fresh_uart_manager() + -- The UART manager is currently module-singleton state. The filtered test + -- runner still requires every spec module before it runs the selected test, + -- so make this test's direct manager instance explicit and isolated. + package.loaded['services.hal.managers.uart'] = nil + package.loaded['services.hal.drivers.uart'] = nil return require 'services.hal.managers.uart' end @@ -408,7 +402,7 @@ local function expose_raw_host_uart_open(scope, bus, cap, source) end local function start_cm5_uart_manager(scope, bus, port) - local uart_mgr = fresh_uart_manager(scope) + local uart_mgr = fresh_uart_manager() local dev_ev_ch = channel.new(16) local cap_emit_ch = channel.new(32) @@ -888,23 +882,11 @@ local function hal_config(roots) } end -local function install_config_dir_for_scope(scope, config_dir) - if not (posix_ok and stdlib and stdlib.setenv) then return end - local old_config_dir = os.getenv('DEVICECODE_CONFIG_DIR') - scope:finally(function () - if old_config_dir ~= nil then - stdlib.setenv('DEVICECODE_CONFIG_DIR', old_config_dir, true) - elseif type(stdlib.unsetenv) == 'function' then - stdlib.unsetenv('DEVICECODE_CONFIG_DIR') - else - stdlib.setenv('DEVICECODE_CONFIG_DIR', '', true) - end - end) - stdlib.setenv('DEVICECODE_CONFIG_DIR', config_dir, true) -end - local function start_hal(scope, bus, roots) local conn = bus:connect({ origin_base = { service = 'hal' } }) + if posix_ok and stdlib and stdlib.setenv then + stdlib.setenv('DEVICECODE_CONFIG_DIR', roots.config, true) + end assert_true(scope:spawn(function () hal_service.start(conn, { name = 'hal', env = 'test', heartbeat_s = false }) end)) @@ -1053,6 +1035,45 @@ local function write_all(stream, data) return true, nil end + +local function maybe_inject_malformed_jsonl(direction, output, stats) + if direction ~= 'cm5_to_mcu' then return true, nil end + if (stats.malformed_lines or 0) >= UART_MALFORMED_LINE_COUNT then return true, nil end + local next_at = stats.next_malformed_at or UART_MALFORMED_LINE_FIRST_AT + if (stats.bytes or 0) < next_at then return true, nil end + + stats.malformed_lines = (stats.malformed_lines or 0) + 1 + stats.next_malformed_at = next_at + UART_MALFORMED_LINE_EVERY_BYTES + local line = ('{ malformed json from mcu_http_uart test #%d }\n'):format(stats.malformed_lines) + local ok, err = write_all(output.master, line) + if ok ~= true then return nil, err or 'malformed_jsonl_inject_failed' end + + stats.bytes = (stats.bytes or 0) + #line + stats.fragments = (stats.fragments or 0) + 1 + stats.malformed_bytes = (stats.malformed_bytes or 0) + #line + log(('UART middleware injected standalone malformed JSONL line #%d on %s at %d bytes'):format( + stats.malformed_lines, direction, stats.bytes + )) + return true, nil +end + +local function write_uart_fragment_piece(direction, output, stats, piece) + if piece == '' then return true, nil end + local ok, werr = write_all(output.master, piece) + if ok ~= true then + return nil, werr or 'write_failed' + end + stats.bytes = (stats.bytes or 0) + #piece + stats.fragments = (stats.fragments or 0) + 1 + + if piece:sub(-1) == '\n' then + local mok, merr = maybe_inject_malformed_jsonl(direction, output, stats) + if mok ~= true then return nil, merr end + end + + return true, nil +end + local function relay_fragmented_uart(direction, input, output, stats) while true do local want = math.random(1, UART_MAX_READ_BYTES) @@ -1065,12 +1086,22 @@ local function relay_fragmented_uart(direction, input, output, stats) while off <= #chunk do local frag_len = math.random(1, UART_MAX_FRAGMENT_BYTES) local frag = chunk:sub(off, off + frag_len - 1) - local ok, werr = write_all(output.master, frag) - if ok ~= true then - return { direction = direction, reason = werr or 'write_failed' } + local frag_off = 1 + while frag_off <= #frag do + local nl = frag:find('\n', frag_off, true) + local piece + if nl ~= nil then + piece = frag:sub(frag_off, nl) + frag_off = nl + 1 + else + piece = frag:sub(frag_off) + frag_off = #frag + 1 + end + local ok, werr = write_uart_fragment_piece(direction, output, stats, piece) + if ok ~= true then + return { direction = direction, reason = werr or 'write_failed' } + end end - stats.bytes = (stats.bytes or 0) + #frag - stats.fragments = (stats.fragments or 0) + 1 off = off + #frag if stats.bytes >= (stats.next_long_pause_at or UART_LONG_PAUSE_EVERY_BYTES) then @@ -1092,7 +1123,15 @@ local function relay_fragmented_uart(direction, input, output, stats) end local function start_noisy_uart_middleware(scope, cm5_port, mcu_port) - local stats = { bytes = 0, fragments = 0, pauses = 0, next_long_pause_at = UART_LONG_PAUSE_EVERY_BYTES } + local stats = { + bytes = 0, + fragments = 0, + pauses = 0, + malformed_lines = 0, + malformed_bytes = 0, + next_long_pause_at = UART_LONG_PAUSE_EVERY_BYTES, + next_malformed_at = UART_MALFORMED_LINE_FIRST_AT, + } local ok_a, err_a = scope:spawn(function () return relay_fragmented_uart('cm5_to_mcu', cm5_port, mcu_port, stats) end) @@ -1135,33 +1174,16 @@ local function mcu_pty_fabric_config() return cfg end -local function start_fabric_pair(parent_scope, cm5_bus, mcu_bus, fake, uart) +local function start_fabric_pair(parent_scope, cm5_bus, fake, uart) local pair_scope = assert(parent_scope:child()) local stats = start_noisy_uart_middleware(pair_scope, assert(uart and uart.cm5, 'cm5 PTY required'), assert(uart and uart.mcu, 'mcu PTY required')) local cm5_conn = cm5_bus:connect({ origin_base = { service = 'fabric-cm5' } }) - local mcu_conn = mcu_bus:connect({ origin_base = { service = 'fabric-mcu' } }) - -- CM5 uses the real HAL raw-host UART capability path. + -- CM5 uses the real HAL raw-host UART capability path. The MCU side is a + -- separate Lua process with its own bus, scheduler and HAL UART manager. start_public_fabric(pair_scope, cm5_conn, cm5_uart_fabric_config(), nil, { name = 'fabric-cm5' }) - -- The MCU side is still a separate devicecode bus and real Fabric instance, - -- but opens the PTY slave directly to avoid starting a second singleton HAL - -- UART manager in this Lua VM. - start_public_fabric(pair_scope, mcu_conn, mcu_pty_fabric_config(), nil, { - name = 'fabric-mcu', - link_overrides = { - ['link-a'] = { - open_transport_op = function () return open_pty_transport_op(uart.mcu.slave_name) end, - transfer = { - chunk_size = 2048, - timeout_s = REALISTIC_TRANSFER_TIMEOUT_S, - receive_targets = { ['updater/main'] = new_mcu_receive_target(fake) }, - }, - }, - }, - }) - wait_retained_payload_where(cm5_conn, fabric_topics.transfer_manager_status(), 'cm5 fabric transfer manager available', function (p) return p and p.available == true and p end, { timeout = 6.0 }) @@ -1169,7 +1191,6 @@ local function start_fabric_pair(parent_scope, cm5_bus, mcu_bus, fake, uart) return { scope = pair_scope, cm5_conn = cm5_conn, - mcu_conn = mcu_conn, uart_stats = stats, } end @@ -1180,8 +1201,8 @@ local function stop_fabric_pair(pair, reason) pair.scope:cancel(reason or 'fabric pair stop') fibers.perform(pair.scope:join_op()) end - bus_cleanup.disconnect(pair.cm5_conn) - bus_cleanup.disconnect(pair.mcu_conn) + if pair.cm5_conn then bus_cleanup.disconnect(pair.cm5_conn) end + if pair.mcu_conn then bus_cleanup.disconnect(pair.mcu_conn) end end local function start_cm5_instance(parent_scope, roots, http_port, uart_port, opts) @@ -1190,11 +1211,6 @@ local function start_cm5_instance(parent_scope, roots, http_port, uart_port, opt local bus = busmod.new() local conn = bus:connect({ origin_base = { service = 'test-cm5' } }) - -- DEVICECODE_CONFIG_DIR is process-global test harness state. Register - -- its finaliser before starting any work on this scope; lua-fibers requires - -- scope:finally to be called from inside the target scope once started. - install_config_dir_for_scope(scope, roots.config) - -- The test UART adapter registers finalisers on the instance scope. -- Do this before starting services that spawn onto the same scope; after a -- scope has started, lua-fibers requires finalisers to be added from within @@ -1217,12 +1233,141 @@ local function start_cm5_instance(parent_scope, roots, http_port, uart_port, opt } end -local function start_mcu_instance(parent_scope, fake) +local function update_fake_from_mcu_child(fake, ev) + if type(ev) ~= 'table' then return end + local event = ev.event + if event == 'ready' then + fake.boot_seq = ev.boot_seq or fake.boot_seq + fake.child_ready = true + fake.child_pid = ev.pid + log(('MCU child ready: pid=%s boot_seq=%s image=%s'):format( + tostring(ev.pid), tostring(ev.boot_seq), tostring(ev.image_id) + )) + elseif event == 'receive_opened' then + fake.transfer_begin = ev + fake.receive_started_at = fibers.now() + fake.receive_bytes = 0 + fake.receive_chunks = 0 + log(('MCU receive target opened: target=%s size=%s job_id=%s image_id=%s'):format( + tostring(ev.target), tostring(ev.size), tostring(ev.job_id), tostring(ev.image_id) + )) + elseif event == 'receive_tick' then + fake.receive_bytes = ev.bytes or fake.receive_bytes + fake.receive_chunks = ev.chunks or fake.receive_chunks + elseif event == 'receive_progress' then + fake.receive_bytes = ev.bytes or fake.receive_bytes + fake.receive_chunks = ev.chunks or fake.receive_chunks + log(('MCU received %d bytes in %d chunks after %.1fs'):format( + fake.receive_bytes or 0, fake.receive_chunks or 0, tonumber(ev.elapsed_s) or 0 + )) + elseif event == 'transfer_commit' then + fake.receive_bytes = ev.size or fake.receive_bytes + fake.receive_chunks = ev.chunks or fake.receive_chunks + fake.staged = { + size = ev.size, + digest = ev.digest, + payload_digest = ev.payload_digest, + image_id = ev.image_id, + job_id = ev.job_id, + } + fake.staged_signal = (fake.staged_signal or 0) + 1 + log(('MCU transfer commit: %d bytes in %d chunks; digest=%s payload_digest=%s'):format( + ev.size or 0, ev.chunks or 0, tostring(ev.digest), tostring(ev.payload_digest) + )) + elseif event == 'transfer_abort' then + fake.abort_reason = ev.reason + fake.abort_count = ev.abort_count or ((fake.abort_count or 0) + 1) + fake.receive_bytes = ev.bytes or fake.receive_bytes + fake.receive_chunks = ev.chunks or fake.receive_chunks + log(('MCU transfer abort: reason=%s after %d bytes in %d chunks'):format( + tostring(ev.reason), fake.receive_bytes or 0, fake.receive_chunks or 0 + )) + elseif event == 'prepare' then + fake.prepare_payload = ev.payload + fake.job_id = ev.job_id + elseif event == 'commit' then + fake.commit_payload = ev.payload + fake.commit_seen = true + fake.committed_image_id = ev.committed_image_id + elseif event == 'log' then + log('MCU child: ' .. tostring(ev.message)) + end +end + +local function start_mcu_instance(parent_scope, fake, uart_port) + assert(uart_port and uart_port.slave_name, 'MCU PTY slave required') local scope = assert(parent_scope:child()) - local bus = busmod.new() - local conn = bus:connect({ origin_base = { service = 'test-mcu' } }) - start_fake_mcu(scope, bus, fake) - return { scope = scope, bus = bus, conn = conn } + local ipc_path = ('/tmp/devicecode-mcu-child-%d-%d.sock'):format(os.time(), math.random(100000, 999999)) + os.remove(ipc_path) + + local server, serr = socket.listen_unix(ipc_path, { ephemeral = true }) + assert_not_nil(server, serr or 'listen_unix failed') + local ready_ch = channel.new(1) + + scope:finally(function () + safe.pcall(function () server:close() end) + os.remove(ipc_path) + end) + + assert_true(scope:spawn(function () + local stream, aerr = server:accept() + if not stream then + ready_ch:put({ ok = false, err = aerr or 'mcu child ipc accept failed' }) + return + end + while true do + local line, rerr = fibers.perform(stream:read_line_op()) + if line == nil then + if fake.child_ready ~= true then + ready_ch:put({ ok = false, err = rerr or 'mcu child ipc closed before ready' }) + end + return + end + local ev, derr = cjson.decode(line) + if not ev then + log('MCU child sent invalid IPC JSON: ' .. tostring(derr)) + else + update_fake_from_mcu_child(fake, ev) + if ev.event == 'ready' then + ready_ch:put({ ok = true }) + end + end + end + end)) + + local child_script = './integration/devhost/support/mcu_http_uart_child.lua' + local cmd = exec.command({ + 'lua', child_script, + '--ipc', ipc_path, + '--uart', uart_port.slave_name, + '--old-image', fake.old_image_id or 'mcu-image-old', + '--committed-image', fake.committed_image_id or '', + '--boot-seq', tostring((fake.boot_seq or 0) + 1), + cwd = '.', + stdin = 'null', + stdout = 'inherit', + stderr = 'inherit', + shutdown_grace = 1.0, + env = { + MCU_HTTP_UART_TRANSFER_TIMEOUT_S = tostring(REALISTIC_TRANSFER_TIMEOUT_S), + MCU_HTTP_UART_FLASH_DELAY_S = tostring(MCU_FLASH_WRITE_DELAY_S), + }, + }) + + assert_true(scope:spawn(function () + local status, code, signal, err = fibers.perform(cmd:run_op()) + fake.child_exit = { status = status, code = code, signal = signal, err = err } + if status ~= 'signalled' and not (status == 'exited' and code == 0) then + log(('MCU child exited: status=%s code=%s signal=%s err=%s'):format( + tostring(status), tostring(code), tostring(signal), tostring(err) + )) + end + end)) + + local ready = wait_channel_get(ready_ch, 6.0, 'MCU child ready') + if not ready.ok then error(ready.err or 'MCU child failed to start', 0) end + + return { scope = scope, ipc_path = ipc_path, command = cmd } end local function stop_instance(inst, reason) @@ -1303,9 +1448,9 @@ function T.ui_http_mcu_update_short_stage_timeout_fails_via_outer_choice() stage_action_timeout_s = REALISTIC_TRANSFER_TIMEOUT_S, }) log('short-timeout: booting fake MCU instance') - local mcu = start_mcu_instance(root_scope, fake) + local mcu = start_mcu_instance(root_scope, fake, uart.mcu) log('short-timeout: starting Fabric pair over PTY UART middleware') - local pair = start_fabric_pair(root_scope, cm5.bus, mcu.bus, fake, uart) + local pair = start_fabric_pair(root_scope, cm5.bus, fake, uart) log('short-timeout: starting CM5 Device/Update/UI services') cm5.start_services_after_fabric() @@ -1325,12 +1470,13 @@ function T.ui_http_mcu_update_short_stage_timeout_fails_via_outer_choice() log('short-timeout: waiting for job to fail due to the outer stage deadline') local failed = wait_job_chatty(cm5.conn, 'job-mcu-http-uart', 'failed', 20.0, function () - return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d'):format( + return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d uart_bad_json=%d'):format( fake.receive_bytes or 0, fake.receive_chunks or 0, pair.uart_stats and pair.uart_stats.bytes or 0, pair.uart_stats and pair.uart_stats.fragments or 0, - pair.uart_stats and pair.uart_stats.pauses or 0 + pair.uart_stats and pair.uart_stats.pauses or 0, + pair.uart_stats and pair.uart_stats.malformed_lines or 0 ) end) assert_contains(failed.error, 'stage_op_timeout', 'job should fail from the active stage deadline') @@ -1362,9 +1508,9 @@ function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() log('booting initial CM5 instance') local cm5 = start_cm5_instance(root_scope, roots, port, uart.cm5) log('booting initial fake MCU instance') - local mcu = start_mcu_instance(root_scope, fake) + local mcu = start_mcu_instance(root_scope, fake, uart.mcu) log('starting initial Fabric pair over PTY UART middleware') - local pair = start_fabric_pair(root_scope, cm5.bus, mcu.bus, fake, uart) + local pair = start_fabric_pair(root_scope, cm5.bus, fake, uart) log('starting CM5 Device/Update/UI services') cm5.start_services_after_fabric() @@ -1399,25 +1545,31 @@ function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() log('waiting for job awaiting_commit') wait_job_chatty(cm5.conn, 'job-mcu-http-uart', 'awaiting_commit', REALISTIC_TRANSFER_TIMEOUT_S, function () - return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d'):format( + return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d uart_bad_json=%d'):format( fake.receive_bytes or 0, fake.receive_chunks or 0, pair.uart_stats and pair.uart_stats.bytes or 0, pair.uart_stats and pair.uart_stats.fragments or 0, - pair.uart_stats and pair.uart_stats.pauses or 0 + pair.uart_stats and pair.uart_stats.pauses or 0, + pair.uart_stats and pair.uart_stats.malformed_lines or 0 ) end) + local expected_payload_digest = xxhash32.digest_hex(blob) assert_true(probe.wait_until(function () - return fake.staged and fake.staged.bytes == blob - end, { timeout = 10.0 }), 'fake MCU should stage transferred artifact') + return fake.staged + and fake.staged.size == #blob + and fake.staged.payload_digest == expected_payload_digest + end, { timeout = 10.0 }), 'fake MCU should stage transferred artifact with matching digest') local stage_elapsed = fibers.now() - stage_started local uart_bytes = pair.uart_stats and pair.uart_stats.bytes or 0 local uart_fragments = pair.uart_stats and pair.uart_stats.fragments or 0 local uart_pauses = pair.uart_stats and pair.uart_stats.pauses or 0 - log(('artifact staged in %.1fs; middleware relayed %d bytes in %d fragments with %d long pauses'):format( - stage_elapsed, uart_bytes, uart_fragments, uart_pauses + local uart_malformed_lines = pair.uart_stats and pair.uart_stats.malformed_lines or 0 + log(('artifact staged in %.1fs; middleware relayed %d bytes in %d fragments with %d long pauses and %d malformed JSONL lines'):format( + stage_elapsed, uart_bytes, uart_fragments, uart_pauses, uart_malformed_lines )) assert_true(uart_bytes >= #blob, 'UART middleware should relay at least the artifact payload size') + assert_true(uart_malformed_lines >= 1, 'UART middleware should inject at least one standalone malformed JSONL line') assert_true( uart_fragments >= math.floor(#blob / UART_MAX_FRAGMENT_BYTES), 'UART middleware should fragment the byte stream heavily' @@ -1457,9 +1609,9 @@ function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() log('reboot: starting fresh CM5 instance') local cm5b = start_cm5_instance(root_scope, roots, nil, uart_b.cm5) log('reboot: starting fresh MCU instance') - local mcub = start_mcu_instance(root_scope, fake) + local mcub = start_mcu_instance(root_scope, fake, uart_b.mcu) log('reboot: starting fresh Fabric pair over PTY UART middleware') - local pair_b = start_fabric_pair(root_scope, cm5b.bus, mcub.bus, fake, uart_b) + local pair_b = start_fabric_pair(root_scope, cm5b.bus, fake, uart_b) log('reboot: starting fresh CM5 Device/Update services') cm5b.start_services_after_fabric() diff --git a/tests/integration/devhost/support/mcu_http_uart_child.lua b/tests/integration/devhost/support/mcu_http_uart_child.lua new file mode 100644 index 00000000..37a316da --- /dev/null +++ b/tests/integration/devhost/support/mcu_http_uart_child.lua @@ -0,0 +1,399 @@ +-- tests/integration/devhost/support/mcu_http_uart_child.lua +-- +-- Separate-process fake MCU for the HTTP-over-UART devhost test. This keeps +-- the MCU bus, scheduler and HAL UART manager out of the parent test process +-- while still using the real Fabric and HAL UART open path on the MCU side. + +local function add_path(prefix) + package.path = prefix .. '?.lua;' .. prefix .. '?/init.lua;' .. package.path +end + +package.path = '../src/?.lua;' .. package.path +package.path = '../?.lua;../?/init.lua;./?.lua;./?/init.lua;' .. package.path +add_path('../vendor/lua-fibers/src/') +add_path('../vendor/lua-bus/src/') +add_path('../vendor/lua-trie/src/') +add_path('./') + +local stdlib_ok, stdlib = pcall(require, 'posix.stdlib') +if stdlib_ok and stdlib and stdlib.setenv then + stdlib.setenv('CONFIG_TARGET', 'services', true) +end + +local busmod = require 'bus' +local fibers = require 'fibers' +local channel = require 'fibers.channel' +local op = require 'fibers.op' +local sleep = require 'fibers.sleep' +local socket = require 'fibers.io.socket' +local safe = require 'coxpcall' + +local cjson_ok, cjson = pcall(require, 'cjson.safe') +if not cjson_ok then cjson = require 'cjson' end + +local hal_types = require 'services.hal.types.core' +local cap_args = require 'services.hal.types.capability_args' +local fabric = require 'services.fabric' +local bus_cleanup = require 'devicecode.support.bus_cleanup' +local xxhash32 = require 'shared.hash.xxhash32' + +local UART_BYTES_PER_SEC = 11520 +local REALISTIC_TRANSFER_TIMEOUT_S = tonumber(os.getenv('MCU_HTTP_UART_TRANSFER_TIMEOUT_S')) or 300.0 +local MCU_PROGRESS_LOG_BYTES = 16 * 1024 +local MCU_FLASH_WRITE_DELAY_S = tonumber(os.getenv('MCU_HTTP_UART_FLASH_DELAY_S')) or 0.010 + +local function parse_args(argv) + local out = {} + local i = 1 + while i <= #argv do + local k = argv[i] + if k == '--ipc' then i = i + 1; out.ipc = argv[i] + elseif k == '--uart' then i = i + 1; out.uart = argv[i] + elseif k == '--old-image' then i = i + 1; out.old_image_id = argv[i] + elseif k == '--committed-image' then i = i + 1; out.committed_image_id = argv[i] ~= '' and argv[i] or nil + elseif k == '--boot-seq' then i = i + 1; out.boot_seq = tonumber(argv[i]) or 1 + else error('unknown argument: ' .. tostring(k), 0) end + i = i + 1 + end + assert(out.ipc and out.ipc ~= '', '--ipc required') + assert(out.uart and out.uart ~= '', '--uart required') + out.old_image_id = out.old_image_id or 'mcu-image-old' + return out +end + +local function assert_true(v, msg) if v ~= true then error(msg or ('expected true, got ' .. tostring(v)), 0) end end +local function assert_not_nil(v, msg) if v == nil then error(msg or 'expected non-nil', 0) end end +local function assert_eq(a, b, msg) if a ~= b then error(msg or ('expected ' .. tostring(b) .. ', got ' .. tostring(a)), 0) end end + +local function dummy_logger() + local logger = {} + for _, k in ipairs({ 'debug', 'info', 'warn', 'error' }) do logger[k] = function () end end + function logger:child() return self end + return logger +end + +local function wait_channel_get(ch, timeout_s, what) + local which, a, b = fibers.perform(op.named_choice({ + item = ch:get_op(), + timeout = sleep.sleep_op(timeout_s or 1.0), + })) + if which == 'timeout' then error(('timed out waiting for %s'):format(what or 'channel item'), 0) end + if a == nil then error(('channel closed while waiting for %s: %s'):format(what or 'channel item', tostring(b)), 0) end + return a +end + +local function wait_device_event(dev_ev_ch, event_type, class, id, timeout_s) + local deadline = fibers.now() + (timeout_s or 1.5) + while fibers.now() < deadline do + local ev = wait_channel_get(dev_ev_ch, deadline - fibers.now(), 'UART device event') + if ev.event_type == event_type and ev.class == class and ev.id == id then return ev end + end + error(('timed out waiting for UART device event %s %s/%s'):format(tostring(event_type), tostring(class), tostring(id)), 0) +end + +local function wait_uart_cap(dev_ev_ch) + local added = wait_device_event(dev_ev_ch, 'added', 'uart', 'uart0', 1.5) + assert_true(type(added.capabilities) == 'table' and #added.capabilities == 1, 'UART added event missing capability') + local cap = added.capabilities[1] + assert_eq(cap.class, 'uart') + assert_eq(cap.id, 'uart0') + assert_true(type(cap.control_ch) == 'table', 'UART capability should expose control_ch') + return cap +end + +local function normalise_uart_open_opts(opts) + if opts == nil or getmetatable(opts) ~= cap_args.UARTOpenOpts then + local open_opts, err = cap_args.new.UARTOpenOpts(opts) + assert_not_nil(open_opts, tostring(err)) + return open_opts + end + return opts +end + +local function call_hal_control(cap, verb, opts) + local reply_ch = channel.new(1) + local req, err = hal_types.new.ControlRequest(verb, opts or {}, reply_ch) + assert_not_nil(req, tostring(err)) + fibers.perform(cap.control_ch:put_op(req)) + local reply = wait_channel_get(reply_ch, 1.0, 'HAL UART control reply') + assert_true(type(reply) == 'table', 'HAL control reply must be a table') + return reply +end + +local function expose_raw_host_uart_open(scope, bus, cap, source) + source = source or 'uart_manager' + local conn = bus:connect({ origin_base = { service = 'mcu-child-hal-uart-adapter' } }) + local cap_id = cap.id + local ep = conn:bind({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'rpc', 'open' }, { queue_len = 8 }) + + conn:retain({ 'raw', 'host', source, 'status' }, { state = 'available', available = true, source = source, class = 'uart', id = cap_id }) + conn:retain({ 'raw', 'host', source, 'meta' }, { source = source, class = 'uart', id = cap_id }) + conn:retain({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'status' }, { state = 'available', available = true, source_kind = 'host', source = source }) + conn:retain({ 'raw', 'host', source, 'cap', 'uart', cap_id, 'meta' }, { source_kind = 'host', source = source, offerings = { open = true } }) + + scope:finally(function () + safe.pcall(function () ep:unbind() end) + bus_cleanup.disconnect(conn) + end) + + assert_true(scope:spawn(function () + while true do + local req = ep:recv() + if req == nil then return end + local open_opts = normalise_uart_open_opts(req.payload) + local reply = call_hal_control(cap, 'open', open_opts) + local replied = req:reply(reply) + if not replied and reply.ok == true and type(reply.reason) == 'table' + and type(reply.reason.session) == 'table' + and type(reply.reason.session.terminate) == 'function' + then + reply.reason.session:terminate('fabric request abandoned') + end + end + end)) + + return { source = source, class = 'uart', id = cap_id } +end + +local function fresh_uart_manager() + package.loaded['services.hal.managers.uart'] = nil + package.loaded['services.hal.drivers.uart'] = nil + return require 'services.hal.managers.uart' +end + +local function start_uart_manager(scope, bus, uart_slave) + local uart_mgr = fresh_uart_manager() + local dev_ev_ch = channel.new(16) + local cap_emit_ch = channel.new(32) + local ok_start, start_err = fibers.perform(uart_mgr.start_op(dummy_logger(), dev_ev_ch, cap_emit_ch)) + assert_true(ok_start, tostring(start_err)) + scope:finally(function () safe.pcall(function () fibers.perform(uart_mgr.shutdown_op()) end) end) + + local ok_cfg, cfg_err = fibers.perform(uart_mgr.apply_config_op({ + { id = 'uart0', path = uart_slave, baud = 115200, mode = '8N1' }, + })) + assert_true(ok_cfg, tostring(cfg_err)) + local cap = wait_uart_cap(dev_ev_ch) + return expose_raw_host_uart_open(scope, bus, cap, 'uart_manager') +end + +local function fabric_config(local_node, peer_node, bridge, transfer) + return { + schema = fabric.config.SCHEMA, + local_node = local_node, + links = { + { + id = 'link-a', + peer_id = peer_node, + transport = { source = 'uart_manager', class = 'uart', id = 'uart0', terminator = '\n' }, + session = { hello_interval_s = 0.20, ping_interval_s = 5.0, liveness_timeout_s = 5.0 }, + bridge = bridge or {}, + transfer = transfer or { chunk_size = 2048, timeout_s = REALISTIC_TRANSFER_TIMEOUT_S }, + }, + }, + } +end + +local function mcu_fabric_config() + return fabric_config('mcu', 'cm5', { + exports = { + { id = 'mcu-state-export', ['local'] = { 'state', 'self' }, remote = { 'state', 'self' }, publish = true, retain = true }, + { id = 'mcu-cap-export', ['local'] = { 'cap', 'self' }, remote = { 'cap', 'self' }, publish = true, retain = true }, + }, + rpc = { + inbound = { + { id = 'mcu-prepare-in', ['local'] = { 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }, remote = { 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }, timeout_s = 2.0 }, + { id = 'mcu-commit-in', ['local'] = { 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }, remote = { 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }, timeout_s = 2.0 }, + }, + }, + }, { chunk_size = 2048, timeout_s = REALISTIC_TRANSFER_TIMEOUT_S }) +end + +local function start_public_fabric(scope, conn, cfg, opts) + opts = opts or {} + assert_true(scope:spawn(function () + fabric.start(conn, { + name = opts.name or 'fabric-mcu-child', + env = 'test', + config = cfg, + link_overrides = opts.link_overrides, + }) + end)) +end + +local function publish_mcu_facts(conn, fake) + local image = fake.committed_image_id or fake.old_image_id + local boot = 'mcu-boot-' .. tostring(fake.boot_seq) + conn:retain({ 'state', 'self', 'software' }, { image_id = image, boot_id = boot, version = image }) + conn:retain({ 'state', 'self', 'updater' }, { + state = 'ready', + last_error = nil, + staged_image_id = fake.staged and fake.staged.image_id or nil, + pending_image_id = fake.committed_image_id, + job_id = fake.job_id, + }) + conn:retain({ 'cap', 'self', 'updater', 'main', 'meta' }, { class = 'updater', id = 'main', methods = { 'prepare-update', 'commit-update' } }) + conn:retain({ 'cap', 'self', 'updater', 'main', 'status' }, { available = true, state = 'available' }) +end + +local function new_mcu_receive_target(fake, emit) + local target = {} + function target:open_sink_op(req) + fake.transfer_begin = req + fake.receive_started_at = fibers.now() + fake.receive_bytes = 0 + fake.receive_chunks = 0 + fake.receive_next_log = MCU_PROGRESS_LOG_BYTES + assert_eq(req.target, 'updater/main') + emit({ event = 'receive_opened', target = req.target, size = req.size, job_id = req.meta and req.meta.job_id, image_id = req.meta and (req.meta.image_id or req.meta.expected_image_id) }) + local chunks = {} + local sink = {} + function sink:append_op(chunk) + return fibers.run_scope_op(function () + fibers.perform(sleep.sleep_op(MCU_FLASH_WRITE_DELAY_S)) + chunks[#chunks + 1] = chunk + fake.receive_bytes = (fake.receive_bytes or 0) + #(chunk or '') + fake.receive_chunks = (fake.receive_chunks or 0) + 1 + if fake.receive_bytes >= (fake.receive_next_log or MCU_PROGRESS_LOG_BYTES) then + local elapsed = fibers.now() - (fake.receive_started_at or fibers.now()) + emit({ event = 'receive_progress', bytes = fake.receive_bytes, chunks = fake.receive_chunks or 0, elapsed_s = elapsed }) + fake.receive_next_log = fake.receive_bytes + MCU_PROGRESS_LOG_BYTES + else + emit({ event = 'receive_tick', bytes = fake.receive_bytes, chunks = fake.receive_chunks or 0 }) + end + return true, nil + end):wrap(function (status, _report, ok, err) + if status ~= 'ok' then return nil, err or status end + return ok, err + end) + end + function sink:commit_op(req2) + local bytes = table.concat(chunks) + local payload_digest = xxhash32.digest_hex(bytes) + fake.staged = { size = #bytes, digest = req2.digest, payload_digest = payload_digest, image_id = req.meta and (req.meta.image_id or req.meta.expected_image_id), job_id = req.meta and req.meta.job_id } + fake.staged_signal = (fake.staged_signal or 0) + 1 + emit({ event = 'transfer_commit', size = #bytes, chunks = fake.receive_chunks or 0, digest = req2.digest, payload_digest = payload_digest, image_id = fake.staged.image_id, job_id = fake.staged.job_id }) + return op.always({ staged = true, digest = req2.digest }, nil) + end + function sink:abort(reason) + fake.abort_reason = reason + fake.abort_count = (fake.abort_count or 0) + 1 + emit({ event = 'transfer_abort', reason = tostring(reason), bytes = fake.receive_bytes or 0, chunks = fake.receive_chunks or 0, abort_count = fake.abort_count }) + return true, nil + end + return op.always(sink, nil) + end + return target +end + +local function start_fake_mcu(scope, bus, fake, emit) + local conn = bus:connect({ origin_base = { service = 'fake-mcu-child' } }) + publish_mcu_facts(conn, fake) + + local eps = {} + local function bind(topic) + local ep, err = bus_cleanup.bind(conn, topic, { queue_len = 8 }) + assert_not_nil(ep, err) + eps[#eps + 1] = ep + return ep + end + + local prepare_ep = bind({ 'cap', 'self', 'updater', 'main', 'rpc', 'prepare-update' }) + local commit_ep = bind({ 'cap', 'self', 'updater', 'main', 'rpc', 'commit-update' }) + + scope:finally(function () + for _, ep in ipairs(eps) do bus_cleanup.unbind(conn, ep) end + bus_cleanup.disconnect(conn) + end) + + assert_true(scope:spawn(function () + while true do + local req = fibers.perform(prepare_ep:recv_op()) + if req == nil then return end + fake.prepare_payload = req.payload + assert_eq(type(req.payload) == 'table' and req.payload.target or nil, 'mcu') + fake.job_id = type(req.payload) == 'table' and req.payload.job_id or nil + conn:retain({ 'state', 'self', 'updater' }, { state = 'ready', last_error = nil, job_id = fake.job_id }) + emit({ event = 'prepare', payload = req.payload, job_id = fake.job_id }) + req:reply({ ready = true, target = 'updater/main', max_chunk_size = 2048 }) + end + end)) + + assert_true(scope:spawn(function () + while true do + local req = fibers.perform(commit_ep:recv_op()) + if req == nil then return end + fake.commit_payload = req.payload + fake.commit_seen = true + fake.committed_image_id = (fake.staged and fake.staged.image_id) or (type(req.payload) == 'table' and req.payload.expected_image_id) + conn:retain({ 'state', 'self', 'updater' }, { + state = 'rebooting', + last_error = nil, + pending_image_id = fake.committed_image_id, + staged_image_id = fake.staged and fake.staged.image_id or nil, + job_id = fake.job_id, + }) + emit({ event = 'commit', payload = req.payload, committed_image_id = fake.committed_image_id }) + req:reply({ accepted = true, reboot_required = true }) + end + end)) + + return conn +end + +local unix_ok, unistd = pcall(require, 'posix.unistd') +local child_pid = (unix_ok and unistd and unistd.getpid and tostring(unistd.getpid())) or '' + +local function main(scope) + local args = parse_args(arg or {}) + local ipc, ierr = socket.connect_unix(args.ipc) + assert(ipc, 'IPC connect failed: ' .. tostring(ierr)) + + local events = channel.new(256) + local function emit(ev) + ev = ev or {} + ev.pid = child_pid + local ok, err = fibers.perform(events:put_op(ev)) + return ok, err + end + + scope:spawn(function () + while true do + local ev = fibers.perform(events:get_op()) + if ev == nil then return end + local line = assert(cjson.encode(ev)) .. '\n' + local _, werr = fibers.perform(ipc:write_op(line)) + if werr then return end + end + end) + + local bus = busmod.new() + local fake = { + old_image_id = args.old_image_id, + committed_image_id = args.committed_image_id, + boot_seq = args.boot_seq, + } + + start_uart_manager(scope, bus, args.uart) + start_fake_mcu(scope, bus, fake, emit) + + local conn = bus:connect({ origin_base = { service = 'fabric-mcu-child' } }) + scope:finally(function () bus_cleanup.disconnect(conn) end) + start_public_fabric(scope, conn, mcu_fabric_config(), { + name = 'fabric-mcu-child', + link_overrides = { + ['link-a'] = { + transfer = { + chunk_size = 2048, + timeout_s = REALISTIC_TRANSFER_TIMEOUT_S, + receive_targets = { ['updater/main'] = new_mcu_receive_target(fake, emit) }, + }, + }, + }, + }) + + emit({ event = 'ready', boot_seq = fake.boot_seq, image_id = fake.committed_image_id or fake.old_image_id, uart = args.uart }) + fibers.perform(op.never()) +end + +fibers.run(main) From 7db88f14fc84cd1d679a12a8e353e1389a5593e6 Mon Sep 17 00:00:00 2001 From: Rich Thanki Date: Mon, 8 Jun 2026 02:56:56 +0000 Subject: [PATCH 3/3] adds detailed fabric logging --- .../devhost/mcu_update_http_uart_spec.lua | 158 +++++++++++++++++- .../devhost/support/mcu_http_uart_child.lua | 131 +++++++++++++++ 2 files changed, 283 insertions(+), 6 deletions(-) diff --git a/tests/integration/devhost/mcu_update_http_uart_spec.lua b/tests/integration/devhost/mcu_update_http_uart_spec.lua index 2afb92c3..71c43f4e 100644 --- a/tests/integration/devhost/mcu_update_http_uart_spec.lua +++ b/tests/integration/devhost/mcu_update_http_uart_spec.lua @@ -247,6 +247,119 @@ local function wait_retained_payload_where(conn, topic, label, pred, opts) return value end + +local function topic_string(topic) + if type(topic) ~= 'table' then return tostring(topic) end + local out = {} + for i = 1, #topic do out[i] = tostring(topic[i]) end + return table.concat(out, '/') +end + +local function fabric_payload_snapshot(payload) + if type(payload) ~= 'table' then return nil end + return type(payload.snapshot) == 'table' and payload.snapshot or payload +end + +local function compact_fabric_session(s) + s = type(s) == 'table' and s or {} + return ('phase=%s established=%s local=%s peer_node=%s peer_sid=%s gen=%s wire_errors=%s bad_frames=%s last_wire_error=%s why=%s'):format( + tostring(s.phase), tostring(s.established), tostring(s.local_node), tostring(s.peer_node), + tostring(s.peer_sid), tostring(s.session_generation), tostring(s.wire_errors or 0), + tostring(s.bad_frame_count or 0), tostring(s.last_wire_error), tostring(s.why) + ) +end + +local function compact_fabric_bridge(s) + s = type(s) == 'table' and s or {} + return ('state=%s imported=%s pending=%s inbound=%s frames_sent=%s frames_recv=%s session_peer=%s drop=%s err=%s'):format( + tostring(s.state), tostring(s.imported_topics), tostring(s.pending_calls), tostring(s.inbound_calls), + tostring(s.frames_sent), tostring(s.frames_received), + tostring(type(s.session) == 'table' and s.session.peer_sid or nil), + tostring(s.session_drop_reason), tostring(s.last_err) + ) +end + +local function compact_fabric_transfer(s) + s = type(s) == 'table' and s or {} + local stats = type(s.stats) == 'table' and s.stats or {} + local active = type(s.active) == 'table' and s.active or nil + local last = type(s.last) == 'table' and s.last or nil + return ('active=%s active_status=%s last_status=%s completed=%s failed=%s cancelled=%s stale=%s'):format( + tostring(active ~= nil), tostring(active and active.status), tostring(last and last.status), + tostring(stats.completed), tostring(stats.failed), tostring(stats.cancelled), tostring(stats.stale) + ) +end + +local function compact_fabric_link(s) + s = type(s) == 'table' and s or {} + local comps = {} + if type(s.components) == 'table' then + for name, rec in pairs(s.components) do + comps[#comps + 1] = tostring(name) .. '=' .. tostring(type(rec) == 'table' and rec.status or rec) + end + table.sort(comps) + end + return ('state=%s completed=%s/%s reason=%s components=[%s]'):format( + tostring(s.state), tostring(s.completed), tostring(s.total), tostring(s.reason), table.concat(comps, ',') + ) +end + +local function describe_fabric_status_payload(payload, component) + local s = fabric_payload_snapshot(payload) + if component == 'session' then return compact_fabric_session(s) end + if component == 'rpc_bridge' then return compact_fabric_bridge(s) end + if component == 'transfer_manager' or component == 'transfer' then return compact_fabric_transfer(s) end + return compact_fabric_link(s) +end + +local function retained_payload_now(conn, topic) + local view = conn:retained_view(topic) + local msg = view:get(topic) + view:close() + return msg and msg.payload or nil +end + +local function log_fabric_status(conn, prefix) + prefix = prefix or 'fabric' + local topics = { + { label = 'link', topic = fabric_topics.state_link('link-a') }, + { label = 'session', component = 'session', topic = fabric_topics.state_link_component('link-a', 'session') }, + { label = 'rpc_bridge', component = 'rpc_bridge', topic = fabric_topics.state_link_component('link-a', 'rpc_bridge') }, + { label = 'transfer_manager', component = 'transfer_manager', topic = fabric_topics.state_link_component('link-a', 'transfer_manager') }, + } + for _, item in ipairs(topics) do + local payload = retained_payload_now(conn, item.topic) + log(('%s %s %s -> %s'):format(prefix, item.label, topic_string(item.topic), describe_fabric_status_payload(payload, item.component))) + end +end + +local function wait_fabric_session_established(conn, label, opts) + opts = opts or {} + local topic = fabric_topics.state_link_component('link-a', 'session') + local payload = wait_retained_payload_where(conn, topic, label or 'fabric session established', function (p) + local s = fabric_payload_snapshot(p) + if type(s) == 'table' and s.established == true and type(s.peer_sid) == 'string' and s.peer_sid ~= '' then + return p + end + return nil + end, { timeout = opts.timeout or 6.0 }) + log((label or 'fabric session established') .. ': ' .. describe_fabric_status_payload(payload, 'session')) + return payload +end + + +local function fabric_progress_fragment(conn) + if conn == nil then return '' end + local session = retained_payload_now(conn, fabric_topics.state_link_component('link-a', 'session')) + local bridge = retained_payload_now(conn, fabric_topics.state_link_component('link-a', 'rpc_bridge')) + local transfer = retained_payload_now(conn, fabric_topics.state_link_component('link-a', 'transfer_manager')) + return ('fabric_session=(%s) fabric_bridge=(%s) fabric_transfer=(%s)'):format( + describe_fabric_status_payload(session, 'session'), + describe_fabric_status_payload(bridge, 'rpc_bridge'), + describe_fabric_status_payload(transfer, 'transfer_manager') + ) +end + local function fresh_uart_manager() -- The UART manager is currently module-singleton state. The filtered test -- runner still requires every spec module before it runs the selected test, @@ -279,6 +392,15 @@ local function wait_channel_get(ch, timeout_s, what) return a end +local function wait_until_test(label, pred, timeout_s, interval_s) + local deadline = fibers.now() + (timeout_s or 2.0) + while fibers.now() < deadline do + if pred() then return true end + fibers.perform(sleep.sleep_op(interval_s or 0.05)) + end + error('timed out waiting for ' .. tostring(label), 0) +end + local function wait_device_event(dev_ev_ch, event_type, class, id, timeout_s) local deadline = fibers.now() + (timeout_s or 1.5) while fibers.now() < deadline do @@ -1188,6 +1310,15 @@ local function start_fabric_pair(parent_scope, cm5_bus, fake, uart) return p and p.available == true and p end, { timeout = 6.0 }) + local cm5_session = wait_fabric_session_established(cm5_conn, 'CM5 fabric hello/hello_ack session established', { timeout = 6.0 }) + local cm5_snapshot = fabric_payload_snapshot(cm5_session) or {} + assert_eq(cm5_snapshot.peer_node, 'mcu', 'CM5 fabric session should identify MCU peer') + wait_until_test('MCU child fabric hello/hello_ack session established', function () + return fake and fake.mcu_fabric_session_seen == true + end, 6.0, 0.05) + assert_eq(fake.mcu_fabric_session and fake.mcu_fabric_session.peer_node, 'cm5', 'MCU fabric session should identify CM5 peer') + log_fabric_status(cm5_conn, 'CM5 fabric established status') + return { scope = pair_scope, cm5_conn = cm5_conn, @@ -1289,6 +1420,15 @@ local function update_fake_from_mcu_child(fake, ev) fake.commit_payload = ev.payload fake.commit_seen = true fake.committed_image_id = ev.committed_image_id + elseif event == 'fabric_session' then + fake.mcu_fabric_session = ev + fake.mcu_fabric_session_seen = true + log(('MCU child fabric session: phase=%s established=%s peer_node=%s peer_sid=%s gen=%s wire_errors=%s bad_frames=%s'):format( + tostring(ev.phase), tostring(ev.established), tostring(ev.peer_node), tostring(ev.peer_sid), + tostring(ev.session_generation), tostring(ev.wire_errors or 0), tostring(ev.bad_frame_count or 0) + )) + elseif event == 'fabric_status' then + log(('MCU child fabric %s: %s'):format(tostring(ev.component or ev.label or 'status'), tostring(ev.summary))) elseif event == 'log' then log('MCU child: ' .. tostring(ev.message)) end @@ -1296,6 +1436,8 @@ end local function start_mcu_instance(parent_scope, fake, uart_port) assert(uart_port and uart_port.slave_name, 'MCU PTY slave required') + fake.mcu_fabric_session_seen = false + fake.mcu_fabric_session = nil local scope = assert(parent_scope:child()) local ipc_path = ('/tmp/devicecode-mcu-child-%d-%d.sock'):format(os.time(), math.random(100000, 999999)) os.remove(ipc_path) @@ -1470,14 +1612,15 @@ function T.ui_http_mcu_update_short_stage_timeout_fails_via_outer_choice() log('short-timeout: waiting for job to fail due to the outer stage deadline') local failed = wait_job_chatty(cm5.conn, 'job-mcu-http-uart', 'failed', 20.0, function () - return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d uart_bad_json=%d'):format( + return (('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d uart_bad_json=%d; %s'):format( fake.receive_bytes or 0, fake.receive_chunks or 0, pair.uart_stats and pair.uart_stats.bytes or 0, pair.uart_stats and pair.uart_stats.fragments or 0, pair.uart_stats and pair.uart_stats.pauses or 0, - pair.uart_stats and pair.uart_stats.malformed_lines or 0 - ) + pair.uart_stats and pair.uart_stats.malformed_lines or 0, + fabric_progress_fragment(pair.cm5_conn) + )) end) assert_contains(failed.error, 'stage_op_timeout', 'job should fail from the active stage deadline') assert_true((fake.receive_bytes or 0) < #blob, 'short timeout should not stage the full artifact') @@ -1545,14 +1688,15 @@ function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() log('waiting for job awaiting_commit') wait_job_chatty(cm5.conn, 'job-mcu-http-uart', 'awaiting_commit', REALISTIC_TRANSFER_TIMEOUT_S, function () - return ('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d uart_bad_json=%d'):format( + return (('mcu_received=%d chunks=%d uart_bytes=%d uart_fragments=%d uart_pauses=%d uart_bad_json=%d; %s'):format( fake.receive_bytes or 0, fake.receive_chunks or 0, pair.uart_stats and pair.uart_stats.bytes or 0, pair.uart_stats and pair.uart_stats.fragments or 0, pair.uart_stats and pair.uart_stats.pauses or 0, - pair.uart_stats and pair.uart_stats.malformed_lines or 0 - ) + pair.uart_stats and pair.uart_stats.malformed_lines or 0, + fabric_progress_fragment(pair.cm5_conn) + )) end) local expected_payload_digest = xxhash32.digest_hex(blob) assert_true(probe.wait_until(function () @@ -1581,6 +1725,7 @@ function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() stage_elapsed >= ((#blob / UART_BYTES_PER_SEC) * 0.75), 'artifact should take UART-paced time to stage' ) + log_fabric_status(cm5.conn, 'CM5 fabric post-stage status') log('committing job through curl HTTP JSON command route') local commit_status, commit_body, commit_decoded = run_http_json( @@ -1612,6 +1757,7 @@ function T.ui_http_mcu_update_survives_fake_reboot_and_reconciles() local mcub = start_mcu_instance(root_scope, fake, uart_b.mcu) log('reboot: starting fresh Fabric pair over PTY UART middleware') local pair_b = start_fabric_pair(root_scope, cm5b.bus, fake, uart_b) + log_fabric_status(cm5b.conn, 'CM5 fabric reboot-established status') log('reboot: starting fresh CM5 Device/Update services') cm5b.start_services_after_fabric() diff --git a/tests/integration/devhost/support/mcu_http_uart_child.lua b/tests/integration/devhost/support/mcu_http_uart_child.lua index 37a316da..8ed1df7b 100644 --- a/tests/integration/devhost/support/mcu_http_uart_child.lua +++ b/tests/integration/devhost/support/mcu_http_uart_child.lua @@ -34,6 +34,7 @@ if not cjson_ok then cjson = require 'cjson' end local hal_types = require 'services.hal.types.core' local cap_args = require 'services.hal.types.capability_args' local fabric = require 'services.fabric' +local fabric_topics = require 'services.fabric.topics' local bus_cleanup = require 'devicecode.support.bus_cleanup' local xxhash32 = require 'shared.hash.xxhash32' @@ -177,6 +178,135 @@ local function start_uart_manager(scope, bus, uart_slave) return expose_raw_host_uart_open(scope, bus, cap, 'uart_manager') end + +local function fabric_payload_snapshot(payload) + if type(payload) ~= 'table' then return nil end + return type(payload.snapshot) == 'table' and payload.snapshot or payload +end + +local function compact_fabric_session(s) + s = type(s) == 'table' and s or {} + return ('phase=%s established=%s local=%s peer_node=%s peer_sid=%s gen=%s wire_errors=%s bad_frames=%s last_wire_error=%s why=%s'):format( + tostring(s.phase), tostring(s.established), tostring(s.local_node), tostring(s.peer_node), + tostring(s.peer_sid), tostring(s.session_generation), tostring(s.wire_errors or 0), + tostring(s.bad_frame_count or 0), tostring(s.last_wire_error), tostring(s.why) + ) +end + +local function compact_fabric_bridge(s) + s = type(s) == 'table' and s or {} + return ('state=%s imported=%s pending=%s inbound=%s frames_sent=%s frames_recv=%s session_peer=%s drop=%s err=%s'):format( + tostring(s.state), tostring(s.imported_topics), tostring(s.pending_calls), tostring(s.inbound_calls), + tostring(s.frames_sent), tostring(s.frames_received), + tostring(type(s.session) == 'table' and s.session.peer_sid or nil), + tostring(s.session_drop_reason), tostring(s.last_err) + ) +end + +local function compact_fabric_transfer(s) + s = type(s) == 'table' and s or {} + local stats = type(s.stats) == 'table' and s.stats or {} + local active = type(s.active) == 'table' and s.active or nil + local last = type(s.last) == 'table' and s.last or nil + return ('active=%s active_status=%s last_status=%s completed=%s failed=%s cancelled=%s stale=%s'):format( + tostring(active ~= nil), tostring(active and active.status), tostring(last and last.status), + tostring(stats.completed), tostring(stats.failed), tostring(stats.cancelled), tostring(stats.stale) + ) +end + +local function compact_fabric_link(s) + s = type(s) == 'table' and s or {} + local comps = {} + if type(s.components) == 'table' then + for name, rec in pairs(s.components) do + comps[#comps + 1] = tostring(name) .. '=' .. tostring(type(rec) == 'table' and rec.status or rec) + end + table.sort(comps) + end + return ('state=%s completed=%s/%s reason=%s components=[%s]'):format( + tostring(s.state), tostring(s.completed), tostring(s.total), tostring(s.reason), table.concat(comps, ',') + ) +end + +local function describe_fabric_status_payload(payload, component) + local s = fabric_payload_snapshot(payload) + if component == 'session' then return compact_fabric_session(s) end + if component == 'rpc_bridge' then return compact_fabric_bridge(s) end + if component == 'transfer_manager' or component == 'transfer' then return compact_fabric_transfer(s) end + return compact_fabric_link(s) +end + +local function retained_payload_now(conn, topic) + local view = conn:retained_view(topic) + local msg = view:get(topic) + view:close() + return msg and msg.payload or nil +end + +local function wait_retained_payload_where(conn, topic, label, pred, timeout_s) + local view = conn:retained_view(topic) + local deadline = fibers.now() + (timeout_s or 6.0) + local seen = view:version() + while true do + local msg = view:get(topic) + local payload = msg and msg.payload or nil + local out = pred(payload) + if out then view:close(); return out end + local remaining = deadline - fibers.now() + if remaining <= 0 then view:close(); error('timed out waiting for ' .. tostring(label), 0) end + local which, version, reason = fibers.perform(op.named_choice({ + changed = view:changed_op(seen), + timeout = sleep.sleep_op(math.min(0.50, remaining)), + })) + if which == 'changed' then + if version == nil then view:close(); error(tostring(label) .. ' closed: ' .. tostring(reason or 'closed'), 0) end + seen = version + end + end +end + +local function start_fabric_status_reporter(scope, conn, emit) + assert_true(scope:spawn(function () + local payload = wait_retained_payload_where( + conn, + fabric_topics.state_link_component('link-a', 'session'), + 'MCU fabric session established', + function (p) + local s = fabric_payload_snapshot(p) + if type(s) == 'table' and s.established == true and type(s.peer_sid) == 'string' and s.peer_sid ~= '' then + return p + end + return nil + end, + 6.0 + ) + local s = fabric_payload_snapshot(payload) or {} + emit({ + event = 'fabric_session', + phase = s.phase, + established = s.established, + local_node = s.local_node, + peer_node = s.peer_node, + peer_sid = s.peer_sid, + session_generation = s.session_generation, + wire_errors = s.wire_errors or 0, + bad_frame_count = s.bad_frame_count or 0, + last_wire_error = s.last_wire_error, + summary = describe_fabric_status_payload(payload, 'session'), + }) + local items = { + { label = 'link', topic = fabric_topics.state_link('link-a') }, + { label = 'session', component = 'session', topic = fabric_topics.state_link_component('link-a', 'session') }, + { label = 'rpc_bridge', component = 'rpc_bridge', topic = fabric_topics.state_link_component('link-a', 'rpc_bridge') }, + { label = 'transfer_manager', component = 'transfer_manager', topic = fabric_topics.state_link_component('link-a', 'transfer_manager') }, + } + for _, item in ipairs(items) do + local p = retained_payload_now(conn, item.topic) + emit({ event = 'fabric_status', component = item.label, summary = describe_fabric_status_payload(p, item.component) }) + end + end)) +end + local function fabric_config(local_node, peer_node, bridge, transfer) return { schema = fabric.config.SCHEMA, @@ -391,6 +521,7 @@ local function main(scope) }, }, }) + start_fabric_status_reporter(scope, conn, emit) emit({ event = 'ready', boot_seq = fake.boot_seq, image_id = fake.committed_image_id or fake.old_image_id, uart = args.uart }) fibers.perform(op.never())