diff --git a/docs/switch.md b/docs/switch.md index 41ef79fc..d4a34a93 100644 --- a/docs/switch.md +++ b/docs/switch.md @@ -81,7 +81,11 @@ The only supported switch HTTP provider configuration spellings on this path are username = "$SWITCH_USERNAME", password = "$SWITCH_PASSWORD", timeout_s = 0.8, - poll_interval_s = 1.0, + poll = { + fast = { interval_s = 1.0, groups = { "panel", "poe", "counters" } }, + medium = { interval_s = 5.0, groups = { "vlan", "lldp" } }, + slow = { interval_s = 30.0, groups = { "identity", "runtime" } }, + }, http = { capability = "main", response_parser = "legacy-http1-close", @@ -185,7 +189,33 @@ lldp_local lldp_neighbor ``` -The driver captures these into normalised provider observations: +The provider has narrow read groups for grouped polling. Surface-bearing groups include `home_main` so that rows can be attached to the canonical switch surface names (`GE1` ... `GE10`): + +```text +panel path: home_main, panel_info +identity path: sys_sysinfo +vlan path: home_main, vlan_create, vlan_conf, vlan_port, vlan_membership +poe path: home_main, poe_poe +lldp path: lldp_local, lldp_neighbor +runtime path: sys_cpumem +counters path: home_main, rmon_statistics +``` + +The poll plan is based on timings measured against the fixed RTL8380M switch on 192.168.1.1 using a retained admin session: + +```text +panel avg 0.303 s, max 0.344 s +vlan avg 0.363 s, max 0.389 s +poe avg 0.077 s, max 0.084 s +lldp avg 0.160 s, max 0.183 s +counters avg 0.203 s, max 0.257 s +runtime avg 2.085 s, max 2.089 s +full read avg 3.522 s, max 3.554 s, with observed timeout +``` + +A concurrent probe over `panel,poe,counters,runtime` improved wall-clock time only modestly, from 2.688 s sequential average to 2.309 s concurrent average, so the production poller remains grouped and sequential. + +The driver captures the full snapshot into normalised provider observations: ```text raw/host/wired/provider/switch-main/status @@ -198,7 +228,21 @@ raw/host/wired/provider/switch-main/state/topology If `include_raw = true` is set in a test, the snapshot also keeps the source command payloads for parser debugging. Full raw CGI bodies should not be promoted to public retained state by default. -The HAL wired manager owns scheduling. For the RTL8380M provider it takes an immediate snapshot when configured and then polls at `poll_interval_s`, which is `1.0` seconds in the Big Box configuration. Polls are non-overlapping: if a read is slow or fails, the next poll is not queued behind it. Poll failures update only the provider status and leave the last good identity/runtime/power/surfaces/topology retained facts in place. +The HAL wired manager owns scheduling. For the RTL8380M provider, manager apply admits the provider and starts one owned provider runner; switch observation is not part of configuration admission. The Big Box poll plan is grouped: + +```text +fast, 1 Hz: panel, poe, counters +medium, 5 s: vlan, lldp +slow, 30 s: identity, runtime +``` + +There is one runner per provider, not one fibre per poll group. The runner lives in `services/hal/managers/wired/provider_runner.lua` and owns the backend object, request mailbox, switch session, observation cache and due-time schedule. Capability snapshot/control requests are sent to the runner mailbox, so the RTL8380M backend is touched only by the runner fibre. This gives serialisation by ownership rather than a lock or semaphore. + +Each runner cycle coalesces all due poll groups, calls the mandatory backend `observe_groups_op` once, and lets the backend de-duplicate shared CGI commands such as `home_main`. A saturated cycle schedules the next attempt from the finish time rather than trying to catch up, and applies a short minimum idle interval before another due cycle. Slow runtime reads can therefore degrade runtime status without creating overlapping switch sessions or a busy catch-up loop. + +Successful groups merge into the retained raw observation cache, so `state/surfaces` carries last-known link, PoE, counter and VLAN facts together. Group failures update provider status but leave the last good identity/runtime/power/surfaces/topology retained facts in place. + +The HAL wired manager emits raw provider facts on a changed-retained basis. A successful `panel` group can update `state/surfaces` without re-emitting unchanged identity/runtime/power/topology facts, and repeated identical provider statuses are suppressed. This keeps switch visibility in the provider status and semantic `state/wired/...` surfaces rather than turning the monitor into a per-request trace. Canonical observation names are deliberately strict. CPU and memory are published as `runtime.cpu` and `runtime.memory`; PoE device-level power and temperature are published as `power.poe`; port counters are published under each surface as `counters`. The switch path must not publish `telemetry.cpu`, `telemetry.mem`, `telemetry.poe`, or any compatibility topic for `state/telemetry`. diff --git a/src/configs/bigbox-v1-cm-2.json b/src/configs/bigbox-v1-cm-2.json index 87454588..53e1a591 100644 --- a/src/configs/bigbox-v1-cm-2.json +++ b/src/configs/bigbox-v1-cm-2.json @@ -31,6 +31,14 @@ "cm5-local-wired": { "provider": "static", "mode": "read_only", + "poll": { + "static": { + "interval_s": 30.0, + "groups": [ + "snapshot" + ] + } + }, "surfaces": { "eth0": { "provider_surface_id": "eth0", @@ -62,7 +70,30 @@ "username": "$SWITCH_USERNAME", "password": "$SWITCH_PASSWORD", "timeout_s": 0.8, - "poll_interval_s": 1.0, + "poll": { + "fast": { + "interval_s": 1.0, + "groups": [ + "panel", + "poe", + "counters" + ] + }, + "medium": { + "interval_s": 5.0, + "groups": [ + "vlan", + "lldp" + ] + }, + "slow": { + "interval_s": 30.0, + "groups": [ + "identity", + "runtime" + ] + } + }, "http": { "response_parser": "legacy-http1-close", "capability": "main", diff --git a/src/services/hal/backends/wired/provider.lua b/src/services/hal/backends/wired/provider.lua index bf9b9a39..4990f469 100644 --- a/src/services/hal/backends/wired/provider.lua +++ b/src/services/hal/backends/wired/provider.lua @@ -17,7 +17,9 @@ function M.new(config, opts) local ok, mod = pcall(require, modname) if not ok then return nil, ('wired provider %s not available: %s'):format(name, tostring(mod)) end if type(mod) ~= 'table' or type(mod.new) ~= 'function' then return nil, 'wired provider module must export new(config, opts)' end - return mod.new(config, opts or {}) + local backend, err = mod.new(config, opts or {}) + if not backend then return nil, err end + return backend, nil, name end return M diff --git a/src/services/hal/backends/wired/providers/rtl8380m_http.lua b/src/services/hal/backends/wired/providers/rtl8380m_http.lua index 07edd546..cef1b53c 100644 --- a/src/services/hal/backends/wired/providers/rtl8380m_http.lua +++ b/src/services/hal/backends/wired/providers/rtl8380m_http.lua @@ -39,6 +39,16 @@ local READ_COMMANDS = { 'rmon_statistics', } +local COMMAND_GROUPS = { + panel = { 'home_main', 'panel_info' }, + identity = { 'sys_sysinfo' }, + vlan = { 'home_main', 'vlan_create', 'vlan_conf', 'vlan_port', 'vlan_membership' }, + poe = { 'home_main', 'poe_poe' }, + lldp = { 'lldp_local', 'lldp_neighbor' }, + runtime = { 'sys_cpumem' }, + counters = { 'home_main', 'rmon_statistics' }, +} + local VLAN_MODE = { [0] = 'hybrid', [1] = 'access', @@ -60,6 +70,39 @@ local VLAN_MEMBERSHIP = { local function copy(v) return tablex.deep_copy(v) end +local function merge_table(dst, src) + dst = dst or {} + if type(src) ~= 'table' then return dst end + for k, v in pairs(src) do + if v ~= nil then + if type(v) == 'table' and type(dst[k]) == 'table' then + merge_table(dst[k], v) + else + dst[k] = copy(v) + end + end + end + return dst +end + +local function append_unique(out, seen, value) + value = tostring(value or '') + if value ~= '' and not seen[value] then + seen[value] = true + out[#out + 1] = value + end +end + +local function commands_for_groups(groups) + local out, seen = {}, {} + for _, group in ipairs(groups or {}) do + local commands = COMMAND_GROUPS[tostring(group or '')] + if not commands then return nil, 'unknown command group: ' .. tostring(group) end + for _, cmd in ipairs(commands) do append_unique(out, seen, cmd) end + end + return out, nil +end + local function trim(s) return (tostring(s or ''):gsub('^%s+', ''):gsub('%s+$', '')) end @@ -162,6 +205,18 @@ local function header_values(headers, name) return values end +local function reset_session(self) + self.jar = CookieJar.new({ cookie_language = 'defLang_en' }) + self.logged_in = false +end + +local function auth_invalid_body(body) + local s = tostring(body or ''):lower() + return s:find('login.html', 1, true) ~= nil + or s:find('home_login', 1, true) ~= nil + or s:find('loginstatus', 1, true) ~= nil +end + local function request(self, method, url, body, extra_headers) if not self.http_ref or type(self.http_ref.exchange_op) ~= 'function' then return nil, 'http capability ref not configured' @@ -317,19 +372,21 @@ local function response_status_ok(body) return tostring(body or ''):find('"status"%s*:%s*"ok"') ~= nil end -local function login(self) - if self.disable_login or self.logged_in then return true, nil end +local function login(self, opts) + opts = opts or {} + if self.disable_login then return true, nil end + if self.logged_in and not opts.force then return true, nil end if not self.username or not self.password then return false, 'switch username/password not configured' end - self.jar:set('cookie_language', 'defLang_en') + reset_session(self) local info_url = cgi_url(self.base_url, 'get', 'home_login') local info, err = request(self, 'GET', info_url) - if not info or info.status ~= 200 then return false, err or ('home_login HTTP ' .. tostring(info and info.status)) end + if not info or info.status ~= 200 then reset_session(self); return false, err or ('home_login HTTP ' .. tostring(info and info.status)) end local info_json, jerr = json_decode(info.body) - if not info_json then return false, 'home_login invalid JSON: ' .. tostring(jerr) end + if not info_json then reset_session(self); return false, 'home_login invalid JSON: ' .. tostring(jerr) end local modulus = info_json.data and info_json.data.modulus - if not modulus then return false, 'home_login response missing RSA modulus' end + if not modulus then reset_session(self); return false, 'home_login response missing RSA modulus' end local encrypted, enc_err = openssl_encrypt_b64(self, modulus, self.password) - if not encrypted then return false, enc_err end + if not encrypted then reset_session(self); return false, enc_err end local form_data = '_ds=1&' .. form_encode({ username = self.username, password = encrypted }) .. '&_de=1' local auth_url = cgi_url(self.base_url, 'set', 'home_loginAuth', os.time()) local origin = parse_origin(self.base_url) @@ -353,6 +410,7 @@ local function login(self) if status and response_status_ok(status.body) then self.logged_in = true; return true, nil end end end + reset_session(self) return false, 'RTL8380 RSA login was not confirmed' end @@ -360,12 +418,26 @@ local function get_cmd(self, cmd) local url = cgi_url(self.base_url, 'get', cmd) if cmd == 'rmon_statistics' then url = url .. '&time=0' end local r, err = request(self, 'GET', url) - if not r then return nil, err end - if r.status ~= 200 then return nil, ('%s HTTP %s'):format(cmd, tostring(r.status)) end + if not r then return nil, err, 'transport' end + if r.status == 401 or r.status == 403 then return nil, ('%s HTTP %s'):format(cmd, tostring(r.status)), 'auth_invalid' end + if r.status ~= 200 then return nil, ('%s HTTP %s'):format(cmd, tostring(r.status)), 'http' end local parsed, perr = json_decode(r.body) - if not parsed then return nil, cmd .. ' invalid JSON: ' .. tostring(perr) end - if parsed.logout then return nil, cmd .. ' returned logout=' .. tostring(parsed.logout) .. ' reason=' .. tostring(parsed.reason) end - return parsed.data or parsed, nil + if not parsed then + if auth_invalid_body(r.body) then return nil, cmd .. ' returned login page', 'auth_invalid' end + return nil, cmd .. ' invalid JSON: ' .. tostring(perr), 'parse' + end + if parsed.logout then return nil, cmd .. ' returned logout=' .. tostring(parsed.logout) .. ' reason=' .. tostring(parsed.reason), 'auth_invalid' end + return parsed.data or parsed, nil, nil +end + +local function read_commands(self, commands) + local data = {} + for _, cmd in ipairs(commands or {}) do + local d, err, code = get_cmd(self, cmd) + if not d then return nil, err or ('failed to read ' .. cmd), code end + data[cmd] = d + end + return data, nil, nil end local function parse_speed_mbps(v) @@ -565,11 +637,11 @@ local function build_surfaces(data) return surfaces end -local function build_snapshot(self, data) +local function build_identity(data) + data = data or {} local sys = data.sys_sysinfo or {} local home = data.home_main or {} - local poe = data.poe_poe or {} - local identity = { + return { model = home.model or home.title, hostname = sys.hostname, mac = sys.sysMac, @@ -581,20 +653,28 @@ local function build_snapshot(self, data) management_ipv4 = sys.currIpv4, management_ipv6 = sys.currIpv6, } +end + +local function base_status(self) + return { + state = 'available', + available = true, + mode = self.mode, + driver = DRIVER, + base_url = self.base_url, + login = self.logged_in and 'confirmed' or (self.disable_login and 'disabled' or 'attempted'), + } +end + +local function build_snapshot(self, data) + local poe = data.poe_poe or {} return { ok = true, provider_id = self.id, mode = self.mode, writable = false, - status = { - state = 'available', - available = true, - mode = self.mode, - driver = DRIVER, - base_url = self.base_url, - login = self.logged_in and 'confirmed' or (self.disable_login and 'disabled' or 'attempted'), - }, - identity = identity, + status = base_status(self), + identity = build_identity(data), surfaces = build_surfaces(data), topology = { lldp_local = data.lldp_local, @@ -606,6 +686,61 @@ local function build_snapshot(self, data) } end +local function build_group_observation(self, group, data) + group = tostring(group or '') + local out = { + ok = true, + provider_id = self.id, + group = group, + status = base_status(self), + } + + if group == 'identity' then + out.identity = build_identity(data) + elseif group == 'runtime' then + out.runtime = parse_runtime(data.sys_cpumem) + elseif group == 'poe' then + out.power = parse_power(data.poe_poe) + out.surfaces = build_surfaces(data) + elseif group == 'lldp' then + out.topology = { + lldp_local = data.lldp_local, + lldp_neighbor = data.lldp_neighbor, + } + elseif group == 'panel' or group == 'vlan' or group == 'counters' then + out.surfaces = build_surfaces(data) + else + return { ok = false, provider_id = self.id, group = group, status = { state = 'unavailable', available = false, driver = DRIVER, err = 'unknown command group: ' .. group } } + end + + if self.include_raw then out.raw = data end + return out +end + +local function build_groups_observation(self, groups, data) + local out = { + ok = true, + provider_id = self.id, + groups = copy(groups or {}), + status = base_status(self), + } + for _, group in ipairs(groups or {}) do + local partial = build_group_observation(self, group, data) + if not partial or partial.ok ~= true then return partial end + for _, key in ipairs({ 'identity', 'runtime', 'power', 'topology' }) do + if type(partial[key]) == 'table' then out[key] = merge_table(out[key] or {}, partial[key]) end + end + if type(partial.surfaces) == 'table' then + out.surfaces = out.surfaces or {} + for surface_id, surface in pairs(partial.surfaces) do + out.surfaces[surface_id] = merge_table(out.surfaces[surface_id] or {}, surface) + end + end + end + if self.include_raw then out.raw = data end + return out +end + local function require_http_config(config) local http = config and config.http or nil if type(http) ~= 'table' then return nil, 'http table is required' end @@ -624,7 +759,6 @@ local CONFIG_FIELDS = { username = true, password = true, timeout_s = true, - poll_interval_s = true, http = true, openssl_bin = true, disable_login = true, @@ -692,18 +826,94 @@ function Provider:fetch_snapshot() local ok_login, lerr = login(self) if not ok_login then - return { ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, driver = DRIVER, err = lerr or 'login failed' } } + return { ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, driver = DRIVER, login = 'failed', err = lerr or 'login failed' } } end - local data = {} - for _, cmd in ipairs(READ_COMMANDS) do - local d, err = get_cmd(self, cmd) - if not d then - return { ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, driver = DRIVER, err = err or ('failed to read ' .. cmd) } } + local data, err, code = read_commands(self, READ_COMMANDS) + if data then return build_snapshot(self, data) end + + if code == 'auth_invalid' then + reset_session(self) + local ok_relogin, relogin_err = login(self, { force = true }) + if not ok_relogin then + return { ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, driver = DRIVER, login = 'failed', err = relogin_err or 're-login failed' } } end - data[cmd] = d + data, err, code = read_commands(self, READ_COMMANDS) + if data then return build_snapshot(self, data) end end - return build_snapshot(self, data) + + return { ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, driver = DRIVER, login = self.logged_in and 'confirmed' or 'failed', err = err or 'switch snapshot failed' } } +end + +function Provider:fetch_command_group(group_name) + local group = tostring(group_name or '') + local commands = COMMAND_GROUPS[group] + if not commands then + return { + ok = false, + provider_id = self.id, + status = { state = 'unavailable', available = false, driver = DRIVER, err = 'unknown command group: ' .. group }, + } + end + + local ok_login, lerr = login(self) + if not ok_login then + return { ok = false, provider_id = self.id, group = group, commands = commands, status = { state = 'unavailable', available = false, driver = DRIVER, login = 'failed', err = lerr or 'login failed' } } + end + + local data, err, code = read_commands(self, commands) + if code == 'auth_invalid' then + reset_session(self) + local ok_relogin, relogin_err = login(self, { force = true }) + if not ok_relogin then + return { ok = false, provider_id = self.id, group = group, commands = commands, status = { state = 'unavailable', available = false, driver = DRIVER, login = 'failed', err = relogin_err or 're-login failed' } } + end + data, err, code = read_commands(self, commands) + end + + if not data then + return { ok = false, provider_id = self.id, group = group, commands = commands, status = { state = 'unavailable', available = false, driver = DRIVER, login = self.logged_in and 'confirmed' or 'failed', err = err or ('switch command group failed: ' .. group) } } + end + + return { + ok = true, + provider_id = self.id, + group = group, + commands = commands, + status = { state = 'available', available = true, driver = DRIVER, login = self.logged_in and 'confirmed' or 'disabled' }, + raw = data, + } +end + +function Provider:fetch_command_groups(groups) + groups = groups or {} + local commands, cerr = commands_for_groups(groups) + if not commands then + return { ok = false, provider_id = self.id, groups = copy(groups), status = { state = 'unavailable', available = false, driver = DRIVER, err = cerr } } + end + + local ok_login, lerr = login(self) + if not ok_login then + return { ok = false, provider_id = self.id, groups = copy(groups), commands = commands, status = { state = 'unavailable', available = false, driver = DRIVER, login = 'failed', err = lerr or 'login failed' } } + end + + local data, err, code = read_commands(self, commands) + if code == 'auth_invalid' then + reset_session(self) + local ok_relogin, relogin_err = login(self, { force = true }) + if not ok_relogin then + return { ok = false, provider_id = self.id, groups = copy(groups), commands = commands, status = { state = 'unavailable', available = false, driver = DRIVER, login = 'failed', err = relogin_err or 're-login failed' } } + end + data, err, code = read_commands(self, commands) + end + + if not data then + return { ok = false, provider_id = self.id, groups = copy(groups), commands = commands, status = { state = 'unavailable', available = false, driver = DRIVER, login = self.logged_in and 'confirmed' or 'failed', err = err or 'switch command groups failed' } } + end + + local out = build_groups_observation(self, groups, data) + if out and out.ok == true then out.commands = commands end + return out end function Provider:fetch_snapshot_op(_req) @@ -728,10 +938,37 @@ end function Provider:snapshot_op(req) return self:fetch_snapshot_op(req) end function Provider:watch_op(req) return self:fetch_snapshot_op(req) end + +function Provider:observe_groups_op(req) + req = req or {} + local groups = req.groups or {} + if type(groups) ~= 'table' then + return op.always({ ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, driver = DRIVER, err = 'groups must be an array' } }) + end + return op.guard(function () + return fibers.run_scope_op(function () + return self:fetch_command_groups(groups) + end):wrap(function (status, _report, result_or_primary, err) + if status == 'ok' then return result_or_primary, err end + return { + ok = false, + provider_id = self.id, + groups = copy(groups), + status = { + state = 'unavailable', + available = false, + driver = DRIVER, + err = tostring(result_or_primary or status or 'group observation failed'), + }, + }, nil + end) + end) +end + function Provider:apply_attachments_op(_req) return op.always(contract.read_only('apply_attachments')) end function Provider:set_poe_op(_req) return op.always(contract.read_only('set_poe')) end function Provider:bounce_op(_req) return op.always(contract.read_only('bounce')) end -function Provider:terminate(_reason) return true end +function Provider:terminate(_reason) reset_session(self); return true end M._test = { VLAN_MODE = VLAN_MODE, @@ -743,6 +980,11 @@ M._test = { parse_runtime = parse_runtime, parse_power = parse_power, parse_surface_counters = parse_surface_counters, + build_group_observation = function(provider_like, group, data) return build_group_observation(provider_like or { id = 'switch-main', mode = 'read_only' }, group, data or {}) end, + build_groups_observation = function(provider_like, groups, data) return build_groups_observation(provider_like or { id = 'switch-main', mode = 'read_only' }, groups or {}, data or {}) end, + commands_for_groups = commands_for_groups, + auth_invalid_body = auth_invalid_body, + COMMAND_GROUPS = COMMAND_GROUPS, } return M diff --git a/src/services/hal/backends/wired/providers/static.lua b/src/services/hal/backends/wired/providers/static.lua index 4fe3d36f..975e1682 100644 --- a/src/services/hal/backends/wired/providers/static.lua +++ b/src/services/hal/backends/wired/providers/static.lua @@ -17,7 +17,6 @@ local CONFIG_FIELDS = { surfaces = true, topology = true, meta = true, - poll_interval_s = true, } local function check_allowed_config(config) @@ -57,6 +56,19 @@ function Provider:snapshot_op(_req) end function Provider:watch_op(req) return self:snapshot_op(req) end + +function Provider:observe_groups_op(req) + req = req or {} + local groups = req.groups or {} + if type(groups) ~= 'table' then return op.always({ ok = false, provider_id = self.id, status = { state = 'unavailable', available = false, err = 'groups must be an array' } }) end + for i = 1, #groups do + if groups[i] ~= 'snapshot' then + return op.always({ ok = false, provider_id = self.id, group = groups[i], status = { state = 'unavailable', available = false, err = 'unsupported static poll group: ' .. tostring(groups[i]) } }) + end + end + return self:snapshot_op(req) +end + function Provider:apply_attachments_op(_req) return op.always(contract.read_only('apply_attachments')) end function Provider:set_poe_op(_req) return op.always(contract.read_only('set_poe')) end function Provider:bounce_op(_req) return op.always(contract.read_only('bounce')) end diff --git a/src/services/hal/drivers/wired.lua b/src/services/hal/drivers/wired.lua index 41c5e4df..4e2b3faf 100644 --- a/src/services/hal/drivers/wired.lua +++ b/src/services/hal/drivers/wired.lua @@ -7,20 +7,49 @@ local M = {} local Driver = {} Driver.__index = Driver +local REQUIRED_BACKEND_OPS = { + 'snapshot_op', + 'watch_op', + 'observe_groups_op', + 'apply_attachments_op', + 'set_poe_op', + 'bounce_op', +} + +local function validate_backend(backend, provider_name) + for _, opname in ipairs(REQUIRED_BACKEND_OPS) do + if type(backend[opname]) ~= 'function' then + return nil, ('wired provider %s missing required %s'):format(tostring(provider_name), opname) + end + end + return true, nil +end + function M.new(config, opts) - local provider, err = provider_loader.new(config or {}, opts or {}) - if not provider then return nil, err end - return setmetatable({ provider = provider }, Driver), nil + opts = opts or {} + local backend, err, provider_name = provider_loader.new(config or {}, opts) + if not backend then return nil, err end + local ok, verr = validate_backend(backend, provider_name) + if not ok then + if type(backend.terminate) == 'function' then backend:terminate('invalid backend') end + return nil, verr + end + return setmetatable({ + backend = backend, + provider_name = provider_name, + provider_id = opts.provider_id, + }, Driver), nil end -function Driver:snapshot_op(req) return self.provider:snapshot_op(req) end -function Driver:watch_op(req) return self.provider:watch_op(req) end -function Driver:apply_attachments_op(req) return self.provider:apply_attachments_op(req) end -function Driver:set_poe_op(req) return self.provider:set_poe_op(req) end -function Driver:bounce_op(req) return self.provider:bounce_op(req) end +function Driver:snapshot_op(req) return self.backend:snapshot_op(req) end +function Driver:watch_op(req) return self.backend:watch_op(req) end +function Driver:observe_groups_op(req) return self.backend:observe_groups_op(req) end +function Driver:apply_attachments_op(req) return self.backend:apply_attachments_op(req) end +function Driver:set_poe_op(req) return self.backend:set_poe_op(req) end +function Driver:bounce_op(req) return self.backend:bounce_op(req) end function Driver:terminate(reason) - if self.provider and type(self.provider.terminate) == 'function' then return self.provider:terminate(reason) end + if self.backend and type(self.backend.terminate) == 'function' then return self.backend:terminate(reason) end return true, nil end diff --git a/src/services/hal/managers/wired.lua b/src/services/hal/managers/wired.lua index b6074430..7dd6d12c 100644 --- a/src/services/hal/managers/wired.lua +++ b/src/services/hal/managers/wired.lua @@ -6,15 +6,16 @@ -- Wired service combines them with Device assembly into public state/wired/... surfaces. local fibers = require 'fibers' +local safe = require 'coxpcall' local op = require 'fibers.op' local channel = require 'fibers.channel' -local sleep = require 'fibers.sleep' -local runtime = require 'fibers.runtime' +local cond = require 'fibers.cond' local strict = require 'services.hal.support.strict_manager' local hal_types = require 'services.hal.types.core' local cap_types = require 'services.hal.types.capabilities' -local driver_mod = require 'services.hal.drivers.wired' +local backend_mod = require 'services.hal.drivers.wired' +local provider_runner = require 'services.hal.managers.wired.provider_runner' local M = strict.api_table() @@ -25,10 +26,9 @@ local state = { dev_ev_ch = nil, cap_emit_ch = nil, http_client_for = nil, - drivers = {}, + runners = {}, -- provider_id -> provider runner handle controls = {}, provider_ids = {}, - pollers = {}, device_registered = false, } @@ -58,103 +58,77 @@ local function shallow_copy(t) return out end -local function max(a, b) if a > b then return a end return b end - local function list_signature(list) return table.concat(list or {}, '\0') end +local function normalise_groups(groups, path) + if type(groups) ~= 'table' then return nil, path .. '.groups must be a non-empty array' end + local out = {} + for i = 1, #groups do + local group = groups[i] + if type(group) ~= 'string' or group == '' then return nil, path .. '.groups[' .. tostring(i) .. '] must be a non-empty string' end + out[#out + 1] = group + end + if #out == 0 then return nil, path .. '.groups must be a non-empty array' end + return out, nil +end + +local function provider_poll_plan(config) + config = config or {} + if config.poll_interval_s ~= nil then return nil, 'use poll, not poll_interval_s, for grouped wired polling' end + if config.poll == nil then return nil, 'poll is required' end + if type(config.poll) ~= 'table' then return nil, 'poll must be a table' end + local out = {} + for _, name in ipairs(sorted_keys(config.poll)) do + local rec = config.poll[name] + local path = 'poll.' .. tostring(name) + if type(rec) ~= 'table' then return nil, path .. ' must be a table' end + local interval_s = tonumber(rec.interval_s) + if interval_s == nil or interval_s <= 0 then return nil, path .. '.interval_s must be a positive number' end + local groups, gerr = normalise_groups(rec.groups, path) + if not groups then return nil, gerr end + out[#out + 1] = { name = tostring(name), interval_s = interval_s, groups = groups } + end + if #out == 0 then return nil, 'poll must contain at least one poll group' end + return out, nil +end + local function emit_state(class, id, key, payload) local ev = assert(hal_types.new.Emit(class, id, 'state', key, payload)) return state.cap_emit_ch:put_op(ev):wrap(function () return true, nil end) end -local function emit_status_now(provider_id, status) - local ok, err = fibers.perform(emit_state('wired-provider', provider_id, 'status', status or { state = 'available', available = true })) - if ok == false or ok == nil then return nil, err end - return true, nil -end - -local function emit_snapshot_now(provider_id, snapshot) - local ok, err = emit_status_now(provider_id, snapshot.status or { state = 'available', available = snapshot.ok == true }) - if ok ~= true then return nil, err end - ok, err = fibers.perform(emit_state('wired-provider', provider_id, 'identity', snapshot.identity or {})) - if ok == false or ok == nil then return nil, err end - ok, err = fibers.perform(emit_state('wired-provider', provider_id, 'runtime', snapshot.runtime or {})) - if ok == false or ok == nil then return nil, err end - ok, err = fibers.perform(emit_state('wired-provider', provider_id, 'power', snapshot.power or {})) - if ok == false or ok == nil then return nil, err end - ok, err = fibers.perform(emit_state('wired-provider', provider_id, 'surfaces', { surfaces = snapshot.surfaces or {} })) - if ok == false or ok == nil then return nil, err end - ok, err = fibers.perform(emit_state('wired-provider', provider_id, 'topology', snapshot.topology or {})) +local function emit_provider_state(provider_id, key, payload) + local ok, err = fibers.perform(emit_state('wired-provider', provider_id, key, payload or {})) if ok == false or ok == nil then return nil, err end return true, nil end -local function provider_poll_interval_s(config) - local n = tonumber(config and config.poll_interval_s) - if n == nil then return 1.0 end - if n <= 0 then return nil, 'poll_interval_s must be a positive number' end - return n, nil -end - - -local function driver_result(provider_id, method, opts) - local driver = state.drivers[provider_id] - if not driver then return { ok = false, err = 'wired provider not configured', code = 'not_configured' } end +local function runner_result(provider_id, method, opts) + local runner = state.runners[provider_id] + if not runner then return { ok = false, err = 'wired provider not configured', code = 'not_configured' } end local opname = tostring(method) .. '_op' - local fn = driver[opname] - if type(fn) ~= 'function' then return { ok = false, err = 'wired driver missing ' .. opname } end - local ok, driver_op = pcall(function () return fn(driver, opts or {}) end) - if not ok then return { ok = false, err = tostring(driver_op) } end - if type(driver_op) ~= 'table' then return { ok = false, err = opname .. ' did not return an Op' } end - local ok2, result = pcall(function () return fibers.perform(driver_op) end) + local fn = runner[opname] + if type(fn) ~= 'function' then return { ok = false, err = 'wired runner missing ' .. opname } end + local ok, runner_op = safe.pcall(function () return fn(runner, opts or {}) end) + if not ok then return { ok = false, err = tostring(runner_op) } end + if type(runner_op) ~= 'table' then return { ok = false, err = opname .. ' did not return an Op' } end + local ok2, result = safe.pcall(function () return fibers.perform(runner_op) end) if not ok2 then return { ok = false, err = tostring(result) } end if type(result) == 'table' then return result end return { ok = result == true, result = result } end -local function poll_loop(provider_id, driver, interval_s) - fibers.perform(sleep.sleep_op(interval_s)) - while state.drivers[provider_id] == driver do - local started = runtime.now() - local result = driver_result(provider_id, 'snapshot', {}) - if state.drivers[provider_id] ~= driver then return end - if result and result.ok == true then - local ok, err = emit_snapshot_now(provider_id, result) - if ok ~= true then log('error', { what = 'wired_provider_poll_emit_failed', provider = provider_id, err = err }) end - else - local status = { - state = 'unavailable', - available = false, - err = result and result.err or 'switch snapshot failed', - polling = true, - } - local ok, err = emit_status_now(provider_id, status) - if ok ~= true then log('error', { what = 'wired_provider_poll_status_emit_failed', provider = provider_id, err = err }) end - end - local elapsed = runtime.now() - started - fibers.perform(sleep.sleep_op(max(0, interval_s - elapsed))) - end -end - -local function spawn_poll_loop(provider_id, driver, interval_s) - state.pollers[provider_id] = true - state.scope:spawn(function () poll_loop(provider_id, driver, interval_s) end) -end - local function handle_request(provider_id, req) local verb = req and req.verb local opts = req and req.opts or {} local result if verb == 'snapshot' or verb == 'watch' or verb == 'apply_attachments' or verb == 'set_poe' or verb == 'bounce' then - result = driver_result(provider_id, verb, opts) + result = runner_result(provider_id, verb, opts) else result = { ok = false, err = 'unsupported wired-provider verb: ' .. tostring(verb) } end - if result and result.ok == true and (verb == 'snapshot' or verb == 'watch') then - emit_snapshot_now(provider_id, result) - end reply(req, result and result.ok == true, result) end @@ -178,12 +152,12 @@ local function make_caps(provider_ids) return caps end -local function device_event_op(event_type, caps) +local function device_event_op(event_type, caps, ready_cond) local ev = assert(hal_types.new.DeviceEvent(event_type, 'wired', 'main', { source = 'host', source_id = 'wired', manager = 'wired', - }, caps or {})) + }, caps or {}, ready_cond)) return state.dev_ev_ch:put_op(ev):wrap(function () return true, nil end) end @@ -192,11 +166,11 @@ local function close_control_channels() state.controls = {} end -local function stop_drivers(reason) - for _, driver in pairs(state.drivers or {}) do - if driver and type(driver.terminate) == 'function' then driver:terminate(reason or 'reconfigured') end +local function stop_runners(reason) + for _, runner in pairs(state.runners or {}) do + if runner and type(runner.terminate) == 'function' then runner:terminate(reason or 'reconfigured') end end - state.drivers = {} + state.runners = {} end local function spawn_control_loops(provider_ids) @@ -232,10 +206,10 @@ local function configured_provider(config, provider_id) return shallow_copy(rec) end -local function reconcile_device_caps(provider_ids) +local function reconcile_device_caps(provider_ids, ready_cond) local new_sig = list_signature(provider_ids) local old_sig = list_signature(state.provider_ids) - if new_sig == old_sig then return true, nil end + if new_sig == old_sig then return true, nil, ready_cond, true end if state.device_registered then local ok, err = fibers.perform(device_event_op('removed', {})) @@ -246,11 +220,11 @@ local function reconcile_device_caps(provider_ids) close_control_channels() state.provider_ids = {} - if #provider_ids == 0 then return true, nil end + if #provider_ids == 0 then return true, nil, ready_cond, true end local caps = make_caps(provider_ids) spawn_control_loops(provider_ids) - local ok, err = fibers.perform(device_event_op('added', caps)) + local ok, err = fibers.perform(device_event_op('added', caps, ready_cond)) if ok == false or ok == nil then close_control_channels() return nil, err or 'wired device add event failed' @@ -258,7 +232,7 @@ local function reconcile_device_caps(provider_ids) state.provider_ids = provider_ids state.device_registered = true - return true, nil + return true, nil, ready_cond, false end function M.start_op(logger, dev_ev_ch, cap_emit_ch, opts) @@ -274,9 +248,8 @@ function M.start_op(logger, dev_ev_ch, cap_emit_ch, opts) state.cap_emit_ch = cap_emit_ch state.http_client_for = opts and opts.http_client_for or nil state.controls = {} - state.drivers = {} + state.runners = {} state.provider_ids = {} - state.pollers = {} state.device_registered = false child:finally(function (_, status, primary) M.terminate(primary or status or 'wired manager closed') end) @@ -286,6 +259,52 @@ function M.start_op(logger, dev_ev_ch, cap_emit_ch, opts) end) end +local function terminate_prepared(prepared, reason) + for _, rec in pairs(prepared or {}) do + local backend_handle = rec and rec.backend_handle + if backend_handle and not rec.owned_by_runner and type(backend_handle.terminate) == 'function' then + backend_handle:terminate(reason or 'discarded') + end + end +end + +local function terminate_runners(runners, reason) + for _, runner in pairs(runners or {}) do + if runner and type(runner.terminate) == 'function' then runner:terminate(reason or 'discarded') end + end +end + +local function prepare_providers(config, provider_ids) + local prepared = {} + for i = 1, #provider_ids do + local id = provider_ids[i] + local pcfg = configured_provider(config or {}, id) + if not pcfg then + terminate_prepared(prepared, 'prepare failed') + return nil, ('wired provider %s missing configuration'):format(id) + end + + local driver_config = {} + for k, v in pairs(pcfg) do driver_config[k] = v end + local poll_plan, poll_err = provider_poll_plan(driver_config) + if not poll_plan then + terminate_prepared(prepared, 'prepare failed') + return nil, ('wired provider %s poll config failed: %s'):format(id, tostring(poll_err)) + end + driver_config.poll = nil + + local driver_opts = { logger = state.logger, cap_emit_ch = state.cap_emit_ch, provider_id = id } + if driver_config.provider == 'rtl8380m_http' then driver_opts.http_client_for = state.http_client_for end + local backend_handle, err = backend_mod.new(driver_config, driver_opts) + if not backend_handle then + terminate_prepared(prepared, 'prepare failed') + return nil, ('wired provider %s create failed: %s'):format(id, tostring(err)) + end + prepared[id] = { backend_handle = backend_handle, provider_name = backend_handle.provider_name, poll_plan = poll_plan } + end + return prepared, nil +end + function M.apply_config_op(config) return op.guard(function () if not state.started then return op.always(false, 'wired manager not started') end @@ -293,37 +312,56 @@ function M.apply_config_op(config) local provider_ids, perr = normalise_provider_ids(config or {}) if not provider_ids then return false, perr end - stop_drivers('reconfigured') - local ok, cerr = reconcile_device_caps(provider_ids) - if ok ~= true then return false, cerr end + local prepared, prep_err = prepare_providers(config or {}, provider_ids) + if not prepared then return false, prep_err end + + local caps_ready_cond = cond.new() + local runners = {} + for i = 1, #provider_ids do + local id = provider_ids[i] + local rec = prepared[id] + local runner, rerr = provider_runner.new({ + provider_id = id, + provider_name = rec.provider_name, + backend = rec.backend_handle, + poll_plan = rec.poll_plan, + ready_cond = caps_ready_cond, + parent_scope = state.scope, + emit_state = emit_provider_state, + log = log, + }) + if not runner then + terminate_runners(runners, 'runner create failed') + terminate_prepared(prepared, 'runner create failed') + return false, ('wired provider %s runner failed: %s'):format(id, tostring(rerr)) + end + rec.owned_by_runner = true + runners[id] = runner + end for i = 1, #provider_ids do local id = provider_ids[i] - local pcfg = configured_provider(config or {}, id) - if not pcfg then - local eok, eerr = emit_snapshot_now(id, { status = { state = 'not_configured', available = false }, surfaces = {}, topology = {} }) - if eok ~= true then return false, eerr or 'wired provider status emit failed' end - else - local driver_config = {} - for k, v in pairs(pcfg) do driver_config[k] = v end - local driver_opts = { logger = state.logger, cap_emit_ch = state.cap_emit_ch, provider_id = id } - if driver_config.provider == 'rtl8380m_http' then driver_opts.http_client_for = state.http_client_for end - local poll_interval_s, poll_err = provider_poll_interval_s(driver_config) - if not poll_interval_s then return false, poll_err end - local driver, err = driver_mod.new(driver_config, driver_opts) - if not driver then return false, ('wired provider %s create failed: %s'):format(id, tostring(err)) end - state.drivers[id] = driver - local result = driver_result(id, 'snapshot', {}) - if result.ok == true then - local eok, eerr = emit_snapshot_now(id, result) - if eok ~= true then return false, eerr or 'wired provider emit failed' end - else - local eok, eerr = emit_status_now(id, { state = 'unavailable', available = false, err = result.err }) - if eok ~= true then return false, eerr or 'wired provider status emit failed' end - end - spawn_poll_loop(id, driver, poll_interval_s) + local spawned, spawn_err = runners[id]:start() + if spawned ~= true then + terminate_runners(runners, 'runner spawn failed') + terminate_prepared(prepared, 'runner spawn failed') + return false, ('wired provider %s runner failed: %s'):format(id, tostring(spawn_err)) end end + + stop_runners('reconfigured') + local ok, cerr, _, ready_now = reconcile_device_caps(provider_ids, caps_ready_cond) + if ok ~= true then + terminate_runners(runners, 'capability reconcile failed') + return false, cerr + end + + state.runners = {} + for i = 1, #provider_ids do + local id = provider_ids[i] + state.runners[id] = runners[id] + end + if ready_now and caps_ready_cond then caps_ready_cond:signal() end log('info', { what = 'wired_manager_configured', providers = provider_ids }) return true, nil end):wrap(function (status, report, ok_or_primary, err) @@ -341,10 +379,9 @@ function M.shutdown_op(_timeout_s) end function M.terminate(reason) - stop_drivers(reason or 'terminated') + stop_runners(reason or 'terminated') close_control_channels() state.provider_ids = {} - state.pollers = {} state.device_registered = false if state.scope then local scope = state.scope; state.scope = nil; scope:cancel(reason or 'terminated') end state.started = false @@ -361,7 +398,8 @@ end M._test = { normalise_provider_ids = normalise_provider_ids, - provider_poll_interval_s = provider_poll_interval_s, + provider_poll_plan = provider_poll_plan, + groups_for_plans = provider_runner._test.groups_for_plans, } return M diff --git a/src/services/hal/managers/wired/provider_runner.lua b/src/services/hal/managers/wired/provider_runner.lua new file mode 100644 index 00000000..87015ada --- /dev/null +++ b/src/services/hal/managers/wired/provider_runner.lua @@ -0,0 +1,359 @@ +-- services/hal/managers/wired/provider_runner.lua +-- +-- Owned runner for one HAL wired-provider backend. The runner is the sole +-- owner of the backend/session object: polling, snapshots and future controls +-- all pass through this mailbox, giving CML-style serialisation without locks. + +local fibers = require 'fibers' +local safe = require 'coxpcall' +local op = require 'fibers.op' +local channel = require 'fibers.channel' +local sleep = require 'fibers.sleep' +local runtime = require 'fibers.runtime' +local tablex = require 'shared.table' + +local M = {} +local Runner = {} +Runner.__index = Runner + +local function max(a, b) if a > b then return a end return b end + +local function copy(v) return tablex.deep_copy(v) end + +local function stable_signature(v) + local tv = type(v) + if tv == 'nil' or tv == 'boolean' or tv == 'number' or tv == 'string' then + return tv .. ':' .. tostring(v) + end + if tv ~= 'table' then return tv .. ':' .. tostring(v) end + local keys = {} + for k in pairs(v) do keys[#keys + 1] = k end + table.sort(keys, function(a, b) return tostring(a) < tostring(b) end) + local out = { 'table{' } + for i = 1, #keys do + local k = keys[i] + out[#out + 1] = stable_signature(k) + out[#out + 1] = '=' + out[#out + 1] = stable_signature(v[k]) + out[#out + 1] = ';' + end + out[#out + 1] = '}' + return table.concat(out) +end + +local function merge_table(dst, src) + dst = dst or {} + if type(src) ~= 'table' then return dst end + for k, v in pairs(src) do + if v ~= nil then + if type(v) == 'table' and type(dst[k]) == 'table' then + merge_table(dst[k], v) + else + dst[k] = copy(v) + end + end + end + return dst +end + +local function merge_observation(cache, snapshot) + cache = cache or { + status = {}, + identity = {}, + runtime = {}, + power = {}, + surfaces = {}, + topology = {}, + } + snapshot = snapshot or {} + if type(snapshot.status) == 'table' then merge_table(cache.status, snapshot.status) end + for _, key in ipairs({ 'identity', 'runtime', 'power', 'topology' }) do + if type(snapshot[key]) == 'table' then merge_table(cache[key], snapshot[key]) end + end + if type(snapshot.surfaces) == 'table' then + for surface_id, surface in pairs(snapshot.surfaces) do + local id = tostring(surface_id or '') + if id ~= '' and type(surface) == 'table' then + cache.surfaces[id] = merge_table(cache.surfaces[id] or {}, surface) + end + end + end + return cache +end + +local function emit_state_changed(self, key, payload) + local sig = stable_signature(payload or {}) + if self.emitted[key] == sig then return true, nil, false end + self.emitted[key] = sig + local ok, err = self.emit_state(self.provider_id, key, payload or {}) + if ok == false or ok == nil then + self.emitted[key] = nil + return nil, err, false + end + return true, nil, true +end + +local function emit_snapshot(self, snapshot, present) + snapshot = snapshot or {} + present = present or snapshot + local ok, err + if present.status ~= nil then + ok, err = emit_state_changed(self, 'status', snapshot.status or { state = 'available', available = snapshot.ok == true }) + if ok ~= true then return nil, err end + end + if present.identity ~= nil then + ok, err = emit_state_changed(self, 'identity', snapshot.identity or {}) + if ok == false or ok == nil then return nil, err end + end + if present.runtime ~= nil then + ok, err = emit_state_changed(self, 'runtime', snapshot.runtime or {}) + if ok == false or ok == nil then return nil, err end + end + if present.power ~= nil then + ok, err = emit_state_changed(self, 'power', snapshot.power or {}) + if ok == false or ok == nil then return nil, err end + end + if present.surfaces ~= nil then + ok, err = emit_state_changed(self, 'surfaces', { surfaces = snapshot.surfaces or {} }) + if ok == false or ok == nil then return nil, err end + end + if present.topology ~= nil then + ok, err = emit_state_changed(self, 'topology', snapshot.topology or {}) + if ok == false or ok == nil then return nil, err end + end + return true, nil +end + +local function publish_observation(self, snapshot) + self.observation = merge_observation(self.observation, snapshot) + return emit_snapshot(self, self.observation, snapshot or {}) +end + +local function emit_status(self, status) + local ok, err = emit_state_changed(self, 'status', status or { state = 'available', available = true }) + if ok == false or ok == nil then return nil, err end + return true, nil +end + +local function copy_list(list) + local out = {} + for i = 1, #(list or {}) do out[i] = list[i] end + return out +end + +local function append_unique(out, seen, value) + value = tostring(value or '') + if value ~= '' and not seen[value] then + seen[value] = true + out[#out + 1] = value + end +end + +local function perform_backend_method(backend, method, opts) + local opname = tostring(method) .. '_op' + local fn = backend and backend[opname] + if type(fn) ~= 'function' then return { ok = false, err = 'wired backend missing ' .. opname } end + local ok, backend_op = safe.pcall(function () return fn(backend, opts or {}) end) + if not ok then return { ok = false, err = tostring(backend_op) } end + if type(backend_op) ~= 'table' then return { ok = false, err = opname .. ' did not return an Op' } end + local ok2, result = safe.pcall(function () return fibers.perform(backend_op) end) + if not ok2 then return { ok = false, err = tostring(result) } end + if type(result) == 'table' then return result end + return { ok = result == true, result = result } +end + +local function failure_status(provider_id, groups, result) + local gs = groups or {} + local err = result and result.err or (result and result.status and result.status.err) or 'wired provider observation failed' + local unavailable = false + for _, group in ipairs(gs) do + if group == 'panel' or group == 'snapshot' then unavailable = true end + end + if #gs == 0 then unavailable = true end + return { + state = unavailable and 'unavailable' or 'degraded', + available = not unavailable, + err = err, + groups = copy_list(gs), + provider_id = provider_id, + polling = true, + } +end + +local function groups_for_plans(plans) + local groups, seen = {}, {} + for _, plan in ipairs(plans or {}) do + for _, group in ipairs(plan.groups or {}) do append_unique(groups, seen, group) end + end + return groups +end + +local function initialise_due_times(self) + local now = runtime.now() + for i, plan in ipairs(self.poll_plan or {}) do + self.next_due[plan.name] = now + math.min(1.0, (i - 1) * 0.25) + end +end + +local function next_due_time(self) + local due = nil + for _, plan in ipairs(self.poll_plan or {}) do + local t = self.next_due[plan.name] + if t ~= nil and (due == nil or t < due) then due = t end + end + if due == nil then due = runtime.now() + 3600 end + return max(due, self.idle_until or 0) +end + +local function due_plans(self, now) + local plans = {} + for _, plan in ipairs(self.poll_plan or {}) do + local t = self.next_due[plan.name] + if t ~= nil and t <= now then plans[#plans + 1] = plan end + end + return plans +end + +local function mark_plans_attempted(self, plans) + local now = runtime.now() + for _, plan in ipairs(plans or {}) do self.next_due[plan.name] = now + plan.interval_s end + self.idle_until = now + self.min_idle_s +end + +local function runner_request_op(self, verb, opts) + return op.guard(function () + if self.closed then return op.always({ ok = false, err = 'wired provider runner closed', code = 'closed' }) end + local reply_ch = channel.new(1) + local msg = { kind = 'request', verb = verb, opts = opts or {}, reply_ch = reply_ch } + return fibers.run_scope_op(function () + fibers.perform(self.request_ch:put_op(msg)) + return fibers.perform(reply_ch:get_op()) + end):wrap(function (status, _report, result_or_primary, err) + if status == 'ok' then return result_or_primary, err end + return { ok = false, err = tostring(result_or_primary or status or 'wired provider request failed') }, nil + end) + end) +end + +function Runner:snapshot_op(req) return runner_request_op(self, 'snapshot', req) end +function Runner:watch_op(req) return runner_request_op(self, 'snapshot', req) end +function Runner:observe_groups_op(req) return runner_request_op(self, 'observe_groups', req) end +function Runner:apply_attachments_op(req) return runner_request_op(self, 'apply_attachments', req) end +function Runner:set_poe_op(req) return runner_request_op(self, 'set_poe', req) end +function Runner:bounce_op(req) return runner_request_op(self, 'bounce', req) end + +function Runner:terminate(reason) + self.closed = true + if self.scope then local scope = self.scope; self.scope = nil; scope:cancel(reason or 'wired provider runner terminated') end + if self.backend and type(self.backend.terminate) == 'function' then self.backend:terminate(reason or 'wired provider runner terminated') end + return true, nil +end + +function Runner:emit_observing() + if self.observing_emitted then return true, nil end + local ok, err = emit_status(self, { + state = 'observing', + available = false, + driver = self.provider_name, + polling = true, + }) + if ok == true then self.observing_emitted = true end + return ok, err +end + +function Runner:poll_due() + local now = runtime.now() + local plans = due_plans(self, now) + if #plans == 0 then return true, nil end + local groups = groups_for_plans(plans) + local result = perform_backend_method(self.backend, 'observe_groups', { groups = groups }) + mark_plans_attempted(self, plans) + if result and result.ok == true then return publish_observation(self, result) end + return emit_status(self, failure_status(self.provider_id, groups, result)) +end + +function Runner:handle_request(msg) + if type(msg) ~= 'table' then return end + local verb = msg.verb + local opts = msg.opts or {} + local result + if verb == 'snapshot' or verb == 'watch' then + result = perform_backend_method(self.backend, 'snapshot', opts) + if result and result.ok == true then publish_observation(self, result) end + elseif verb == 'observe_groups' then + result = perform_backend_method(self.backend, 'observe_groups', opts) + if result and result.ok == true then publish_observation(self, result) end + elseif verb == 'apply_attachments' or verb == 'set_poe' or verb == 'bounce' then + result = perform_backend_method(self.backend, verb, opts) + else + result = { ok = false, err = 'unsupported wired-provider verb: ' .. tostring(verb) } + end + if msg.reply_ch then fibers.perform(msg.reply_ch:put_op(result)) end +end + +function Runner:run() + if self.ready_cond ~= nil then fibers.perform(self.ready_cond:wait_op()) end + if self.closed then return end + local ok, err = self:emit_observing() + if ok ~= true then self.log('error', { what = 'wired_provider_initial_status_emit_failed', provider = self.provider_id, err = err }) end + initialise_due_times(self) + while not self.closed do + local which, msg = fibers.perform(op.named_choice({ + request = self.request_ch:get_op(), + due = sleep.sleep_until_op(next_due_time(self)), + })) + if which == 'request' then + self:handle_request(msg) + elseif which == 'due' then + local pok, perr = self:poll_due() + if pok ~= true then self.log('error', { what = 'wired_provider_poll_emit_failed', provider = self.provider_id, err = perr }) end + end + end +end + +function Runner:start() + local ok, err = self.scope:spawn(function () self:run() end) + if not ok then + self:terminate(tostring(err or 'wired provider runner spawn failed')) + return nil, err or 'wired provider runner spawn failed' + end + return true, nil +end + +function M.new(opts) + opts = opts or {} + if type(opts.provider_id) ~= 'string' or opts.provider_id == '' then return nil, 'provider_id is required' end + if type(opts.provider_name) ~= 'string' or opts.provider_name == '' then return nil, 'provider_name is required' end + if type(opts.backend) ~= 'table' then return nil, 'backend is required' end + if type(opts.poll_plan) ~= 'table' then return nil, 'poll_plan is required' end + if type(opts.parent_scope) ~= 'table' then return nil, 'parent_scope is required' end + if type(opts.emit_state) ~= 'function' then return nil, 'emit_state callback is required' end + + local child, err = opts.parent_scope:child() + if not child then return nil, err or 'wired provider runner scope create failed' end + local self = setmetatable({ + provider_id = opts.provider_id, + provider_name = opts.provider_name, + backend = opts.backend, + poll_plan = opts.poll_plan, + ready_cond = opts.ready_cond, + emit_state = opts.emit_state, + observation = nil, + emitted = {}, + log = opts.log or function () end, + request_ch = channel.new(opts.request_queue_size or 32), + scope = child, + closed = false, + next_due = {}, + idle_until = 0, + min_idle_s = tonumber(opts.min_idle_s) or 0.05, + observing_emitted = false, + }, Runner) + return self, nil +end + +M._test = { + groups_for_plans = groups_for_plans, +} + +return M diff --git a/src/services/wired/service.lua b/src/services/wired/service.lua index 5f1dd01f..0360bc15 100644 --- a/src/services/wired/service.lua +++ b/src/services/wired/service.lua @@ -399,7 +399,6 @@ local function apply_config(state, ev) snap.generation = (ev and ev.generation) or (snap.generation + 1) snap.config = { rev = intent.rev, schema = intent.schema, config_schema = intent.config_schema, version = intent.version } snap.config_intent = intent - snap.dependencies = {} snap.stats.config_updates = (snap.stats.config_updates or 0) + 1 return rebuild_derived(snap) end) diff --git a/tests/integration/devhost/rtl8380m_switch_spec.lua b/tests/integration/devhost/rtl8380m_switch_spec.lua index 009ce329..d20e5766 100644 --- a/tests/integration/devhost/rtl8380m_switch_spec.lua +++ b/tests/integration/devhost/rtl8380m_switch_spec.lua @@ -16,11 +16,13 @@ local busmod = require 'bus' local fibers = require 'fibers' +local channel = require 'fibers.channel' local runfibers = require 'tests.support.run_fibers' local probe = require 'tests.support.bus_probe' local http_service = require 'services.http.service' +local wired_manager = require 'services.hal.managers.wired' local wired_service = require 'services.wired.service' local wired_config = require 'services.wired.config' local wired_topics = require 'services.wired.topics' @@ -170,6 +172,22 @@ local function require_successful_snapshot(provider) return snap end +local function require_successful_observe_groups(provider, groups) + local result = fibers.perform(provider:observe_groups_op({ groups = groups })) + assert_not_nil(result, 'observe_groups should return a table') + if result.ok ~= true then + local status = result.status or {} + error('switch observe_groups failed: ' .. tostring(status.err or result.err or 'unknown error'), 2) + end + return result +end + +local function assert_command_once(commands, command) + local n = 0 + for _, cmd in ipairs(commands or {}) do if cmd == command then n = n + 1 end end + assert_eq(n, 1, 'expected command ' .. tostring(command) .. ' exactly once') +end + local function retain_switch_raw(conn, snap) conn:retain({ 'raw', 'host', 'wired', 'provider', 'switch-main', 'status' }, snap.status or {}) conn:retain({ 'raw', 'host', 'wired', 'provider', 'switch-main', 'state', 'identity' }, snap.identity or {}) @@ -227,6 +245,165 @@ local function retain_wired_config(conn) }) end +local function raw_provider_topic(id, suffix) + local topic = { 'raw', 'host', 'wired', 'provider', id } + for i = 1, #(suffix or {}) do topic[#topic + 1] = suffix[i] end + return topic +end + +local function start_wired_manager_hal_harness(scope, bus, dev_ev_ch, cap_emit_ch) + local child = assert(scope:child()) + local writer = bus:connect({ origin_base = { kind = 'local', component = 'wired-manager-hal-harness' } }) + + assert(child:spawn(function () + while true do + local ev = fibers.perform(dev_ev_ch:get_op()) + if ev == nil then return end + if ev.class == 'wired' and ev.id == 'main' then + if ev.event_type == 'added' then + for _, cap in ipairs(ev.capabilities or {}) do + if cap.class == 'wired-provider' then + writer:retain(raw_provider_topic(cap.id, { 'status' }), { + state = 'available', + available = true, + source_kind = 'host', + source = 'wired', + }) + writer:retain(raw_provider_topic(cap.id, { 'meta' }), { + offerings = cap.offerings or {}, + source_kind = 'host', + source = 'wired', + }) + end + end + if ev.ready_cond then ev.ready_cond:signal() end + elseif ev.event_type == 'removed' then + for _, cap in ipairs(ev.capabilities or {}) do + if cap.class == 'wired-provider' then writer:unretain(raw_provider_topic(cap.id, { 'status' })) end + end + if ev.ready_cond then ev.ready_cond:signal() end + end + end + end + end)) + + assert(child:spawn(function () + while true do + local emit = fibers.perform(cap_emit_ch:get_op()) + if emit == nil then return end + if emit.class == 'wired-provider' and emit.mode == 'state' then + if emit.key == 'status' then + writer:retain(raw_provider_topic(emit.id, { 'status' }), emit.data or {}) + else + writer:retain(raw_provider_topic(emit.id, { 'state', emit.key }), emit.data or {}) + end + end + end + end)) + + return child +end + +local function switch_manager_config(env) + return { + providers = { + ['switch-main'] = { + provider = 'rtl8380m_http', + base_url = env.base_url, + username = env.username, + password = env.password, + timeout_s = env.timeout_s, + openssl_bin = env.openssl_bin, + include_raw = true, + http = { capability = 'main', response_parser = 'legacy-http1-close' }, + poll = { + fast = { interval_s = 1.0, groups = { 'panel', 'poe', 'counters' } }, + }, + }, + }, + } +end + +function T.rtl8380m_real_switch_observe_groups_via_http_capability() + local env, err = required_env() + if not env then return skip(err) end + + runfibers.run(function () + local b = busmod.new() + local http = start_http_capability(b, env) + wait_http_available(b) + local provider = new_real_switch_provider(b, env) + local obs = require_successful_observe_groups(provider, { 'panel', 'poe', 'counters' }) + + assert_eq(obs.provider_id, 'switch-main') + assert_eq(obs.status.driver, 'rtl8380m_http') + assert_true(obs.status.available, 'observe_groups status should be available') + assert_not_nil(obs.surfaces, 'observe_groups should include surfaces') + assert_not_nil(obs.surfaces.GE8, 'observe_groups should include GE8') + assert_not_nil(obs.surfaces.GE9, 'observe_groups should include GE9') + assert_eq(obs.surfaces.GE9.link.media, 'fiber') + assert_not_nil(obs.power, 'poe group should include power') + assert_not_nil(obs.power.poe, 'poe group should include power.poe') + assert_not_nil(obs.raw, 'include_raw=true should preserve grouped source payloads') + assert_not_nil(obs.raw.home_main, 'grouped observation should capture home_main') + assert_not_nil(obs.raw.panel_info, 'grouped observation should capture panel_info') + assert_not_nil(obs.raw.poe_poe, 'grouped observation should capture poe_poe') + assert_not_nil(obs.raw.rmon_statistics, 'grouped observation should capture rmon_statistics') + assert_command_once(obs.commands, 'home_main') + assert_command_once(obs.commands, 'panel_info') + assert_command_once(obs.commands, 'poe_poe') + assert_command_once(obs.commands, 'rmon_statistics') + + provider:terminate('test complete') + http:terminate('test complete') + end, { timeout = env.run_timeout_s }) +end + +function T.rtl8380m_real_switch_runner_publishes_raw_observations() + local env, err = required_env() + if not env then return skip(err) end + + runfibers.run(function (scope) + local b = busmod.new() + local http = start_http_capability(b, env) + wait_http_available(b) + + local manager_conn = b:connect({ origin_base = { kind = 'local', component = 'wired-manager-real-switch-test' } }) + local resolver = assert(hal_deps.resolver(manager_conn)) + local dev_ev_ch = channel.new(16) + local cap_emit_ch = channel.new(32) + local harness = start_wired_manager_hal_harness(scope, b, dev_ev_ch, cap_emit_ch) + local reader = b:connect({ origin_base = { kind = 'local', component = 'wired-runner-test-reader' } }) + + wired_manager.terminate('test reset') + local ok_start, start_err = fibers.perform(wired_manager.start_op(nil, dev_ev_ch, cap_emit_ch, { + http_client_for = resolver:factory('http_client'), + })) + assert_true(ok_start, tostring(start_err)) + + local ok_apply, apply_err = fibers.perform(wired_manager.apply_config_op(switch_manager_config(env))) + assert_true(ok_apply, tostring(apply_err)) + + local status = probe.wait_retained_payload(reader, raw_provider_topic('switch-main', { 'status' }), { timeout = 3.0 }) + assert_not_nil(status, 'runner should publish raw provider status') + assert_true(status.available == true or status.state == 'observing', 'raw provider status should be observing or available') + + local surfaces_payload = probe.wait_retained_payload(reader, raw_provider_topic('switch-main', { 'state', 'surfaces' }), { timeout = env.run_timeout_s }) + local surfaces = assert_not_nil(surfaces_payload.surfaces, 'runner should publish raw surfaces') + assert_not_nil(surfaces.GE8, 'runner surfaces should include GE8') + assert_not_nil(surfaces.GE9, 'runner surfaces should include GE9') + assert_eq(surfaces.GE9.link.media, 'fiber') + + local power = probe.wait_retained_payload(reader, raw_provider_topic('switch-main', { 'state', 'power' }), { timeout = env.run_timeout_s }) + assert_not_nil(power.poe, 'runner should publish PoE power') + + wired_manager.terminate('test complete') + harness:cancel('test complete') + fibers.perform(harness:join_op()) + http:terminate('test complete') + end, { timeout = env.run_timeout_s + 5 }) +end + function T.rtl8380m_real_switch_snapshot_via_http_capability() local env, err = required_env() if not env then return skip(err) end @@ -348,4 +525,6 @@ function T.rtl8380m_real_switch_raw_observations_project_to_state_wired() end, { timeout = env.run_timeout_s }) end + + return T diff --git a/tests/unit/hal/wired_provider_spec.lua b/tests/unit/hal/wired_provider_spec.lua index 8bf37a7d..45ae4115 100644 --- a/tests/unit/hal/wired_provider_spec.lua +++ b/tests/unit/hal/wired_provider_spec.lua @@ -214,15 +214,76 @@ function tests.test_rtl8380m_http_accepts_narrow_http_client_factory() end -function tests.test_wired_manager_poll_interval_is_positive_and_defaults_to_one_second() + +function tests.test_wired_driver_separates_backend_provider_name_and_provider_id() + local driver_mod = require 'services.hal.drivers.wired' + local d, err = driver_mod.new({ + provider = 'static', + mode = 'read_only', + surfaces = { eth0 = { provider_surface_id = 'eth0' } }, + }, { provider_id = 'cm5-local-wired' }) + assert_not_nil(d, err) + assert_not_nil(d.backend) + assert_eq(d.provider, nil) + assert_eq(d.provider_name, 'static') + assert_eq(d.provider_id, 'cm5-local-wired') + assert_not_nil(d.snapshot_op) +end + +function tests.test_wired_driver_requires_observe_groups_backend_contract() + package.loaded['services.hal.backends.wired.providers.invalid_missing_observe_groups'] = { + new = function () + return { + snapshot_op = function () end, + watch_op = function () end, + apply_attachments_op = function () end, + set_poe_op = function () end, + bounce_op = function () end, + terminate = function () end, + } + end, + } + local driver_mod = require 'services.hal.drivers.wired' + local d, err = driver_mod.new({ provider = 'invalid_missing_observe_groups' }, { provider_id = 'bad' }) + assert_eq(d, nil) + assert_true(type(err) == 'string' and err:find('observe_groups_op', 1, true) ~= nil, tostring(err)) + package.loaded['services.hal.backends.wired.providers.invalid_missing_observe_groups'] = nil +end + +function tests.test_rtl8380m_observe_groups_deduplicates_shared_commands() + local commands, err = provider._test.commands_for_groups({ 'panel', 'poe', 'counters' }) + assert_not_nil(commands, err) + local seen = {} + for _, cmd in ipairs(commands) do + assert_eq(seen[cmd], nil, 'duplicate command ' .. tostring(cmd)) + seen[cmd] = true + end + assert_true(seen.home_main == true, 'home_main should be included once') + assert_true(seen.panel_info == true, 'panel_info should be included') + assert_true(seen.poe_poe == true, 'poe_poe should be included') + assert_true(seen.rmon_statistics == true, 'rmon_statistics should be included') +end + +function tests.test_wired_manager_requires_canonical_poll_table() local manager = require 'services.hal.managers.wired' - local n, err = manager._test.provider_poll_interval_s({}) - assert_eq(n, 1.0, err) - n, err = manager._test.provider_poll_interval_s({ poll_interval_s = 0.5 }) - assert_eq(n, 0.5, err) - n, err = manager._test.provider_poll_interval_s({ poll_interval_s = 0 }) - assert_eq(n, nil) - assert_true(type(err) == 'string' and err:find('positive', 1, true) ~= nil, tostring(err)) + local plan, err = manager._test.provider_poll_plan({}) + assert_eq(plan, nil) + assert_true(type(err) == 'string' and err:find('poll is required', 1, true) ~= nil, tostring(err)) + + plan, err = manager._test.provider_poll_plan({ poll_interval_s = 0.5 }) + assert_eq(plan, nil) + assert_true(type(err) == 'string' and err:find('poll_interval_s', 1, true) ~= nil, tostring(err)) + + plan, err = manager._test.provider_poll_plan({ + poll = { + static = { interval_s = 30.0, groups = { 'snapshot' } }, + }, + }) + assert_not_nil(plan, err) + assert_eq(#plan, 1) + assert_eq(plan[1].name, 'static') + assert_eq(plan[1].interval_s, 30.0) + assert_eq(plan[1].groups[1], 'snapshot') end return tests