Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/configs/bigbox-v1-cm-2.json
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,8 @@
"kind": "fabric_stage",
"target": "updater/main",
"chunk_size": 2048,
"artifact_store": "main"
"artifact_store": "main",
"timeout_s": 300
},
"commit-update": {
"kind": "rpc",
Expand Down Expand Up @@ -1125,13 +1126,10 @@
"peer_id": "mcu",
"transport": {
"kind": "uart",
"source": "uart",
"source": "uart_manager",
"class": "uart",
"id": "uart0",
"terminator": "\n",
"open_opts": {
"baud": 115200
}
"terminator": "\n"
},
"session": {
"identity_claim": {
Expand Down Expand Up @@ -1265,6 +1263,9 @@
"cap_id": "main",
"host": "0.0.0.0",
"port": 8080
},
"uploads": {
"require_auth": true
}
}
},
Expand All @@ -1278,7 +1279,8 @@
},
"mcu": {
"component": "mcu",
"backend": "component"
"backend": "component",
"stage_timeout_s": 300
},
"switch-main": {
"component": "switch-main"
Expand Down
43 changes: 42 additions & 1 deletion src/services/device/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ local cap_deps_mod = require 'devicecode.support.capability_dependencies'
local dep_failure = require 'devicecode.support.dependency_failure'
local backpressure = require 'services.device.backpressure'
local dependency_mod = require 'services.device.dependencies'
local fabric_topics = require 'services.fabric.topics'
local tablex = require 'shared.table'

local M = {}
Expand All @@ -37,6 +38,45 @@ local function new_service_id()
return ('device-%d-%d'):format(os.time(), math.random(1, 1000000))
end

-- Device action timeout is owned by the action worker scope. The Fabric
-- transfer-manager request may legitimately stay open for that whole action, so
-- do not add lua-bus' default one-second call timeout here. The transfer budget
-- is passed as payload policy, while caller abandonment still aborts this Op.
local FABRIC_SEND_BLOB_CALL_OPTS = { timeout = false }

--- Build the default Fabric client used when the caller supplies none.
--
-- The returned client exposes one method, `send_blob_op`, which forwards a
-- blob transfer request to the Fabric transfer-manager `send-blob` RPC over
-- `conn`.
--
-- @param conn bus connection; must be a table exposing a `call_op` function
-- @return client table, or nil when `conn` cannot issue calls
local function default_fabric_client(conn)
  if type(conn) ~= 'table' or type(conn.call_op) ~= 'function' then return nil end

  return {
    --- Create an Op that asks the transfer manager to send a blob.
    --
    -- @param params table describing the transfer (link_id, request_id or
    --   job_id, xfer_id, target, source_owner, size, digest_alg, digest,
    --   chunk_size, meta, and an optional timeout)
    -- @param opts optional table; `opts.timeout` overrides the params timeout
    -- @return the wrapped call Op, or `nil, err` when `call_op` yields no Op.
    --   Performing the Op yields `result, nil` on success or `nil, err`.
    send_blob_op = function (_, params, opts)
      params = params or {}
      opts = opts or {}

      -- The payload field is named `timeout_s`, so accept that spelling from
      -- callers as well. The original `timeout` keys keep precedence, which
      -- keeps every previously working call unchanged.
      local timeout_s = opts.timeout or params.timeout
        or opts.timeout_s or params.timeout_s

      local payload = {
        link_id = params.link_id,
        request_id = params.request_id or params.job_id,
        xfer_id = params.xfer_id,
        target = params.target,
        source_owner = params.source_owner,
        size = params.size,
        digest_alg = params.digest_alg,
        digest = params.digest,
        chunk_size = params.chunk_size,
        meta = params.meta,
        timeout_s = timeout_s,
      }

      local ev, err = conn:call_op(
        fabric_topics.transfer_manager_rpc('send-blob'),
        payload,
        FABRIC_SEND_BLOB_CALL_OPTS
      )
      if not ev then return nil, err end

      return ev:wrap(function (reply, call_err)
        -- No reply at all: surface whatever the call reported.
        if reply == nil then return nil, call_err end

        local is_table = type(reply) == 'table'

        -- Explicit failure envelope: pick the first populated reason field.
        if is_table and reply.ok == false then
          return nil, reply.err or reply.error or reply.reason or call_err or 'fabric_transfer_failed'
        end

        -- Unwrap `result` / `transfer` when present, otherwise hand back the
        -- reply itself.
        return (is_table and (reply.result or reply.transfer)) or reply, nil
      end)
    end,
  }
end

local function request_publication(state)
if state.auto_publish == false then return end
if state.publication_requested then return end
Expand Down Expand Up @@ -837,7 +877,7 @@ local function build_state(scope, params)
enable_observers = params.enable_observers,
auto_publish = params.auto_publish,
emit_events = params.emit_events,
fabric_client = params.fabric_client,
fabric_client = params.fabric_client or default_fabric_client(params.conn),
open_source = params.open_source,
open_source_op = params.open_source_op,
terminate_source = params.terminate_source,
Expand Down Expand Up @@ -974,5 +1014,6 @@ M.start_generation = start_generation
M.cancel_active_generation = cancel_active_generation
M.flush_publication = flush_publication
M.cleanup_publication_now = cleanup_publication_now
M.default_fabric_client = default_fabric_client

return M
6 changes: 5 additions & 1 deletion src/services/ui/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ local DEFAULTS = {
uploads = {
enabled = true,
max_bytes = 64 * 1024 * 1024,
require_auth = false,
},
sessions = {
prune_interval = 60,
Expand Down Expand Up @@ -56,7 +57,7 @@ local HTTP_KEYS = {

local STATIC_KEYS = { root = true, index = true, chunk_size = true }
local SSE_KEYS = { enabled = true, queue_len = true, max_replay = true, replay = true, pattern = true }
local UPLOAD_KEYS = { enabled = true, max_bytes = true }
local UPLOAD_KEYS = { enabled = true, max_bytes = true, require_auth = true }
local SESSION_KEYS = { prune_interval = true }

local function fail(msg) return nil, msg end
Expand Down Expand Up @@ -218,6 +219,9 @@ local function normalise_uploads(raw)
v, err = non_negative_int_or_nil(raw.max_bytes, 'uploads.max_bytes')
if err then return nil, err end
if v ~= nil then out.max_bytes = v end
v, err = bool_or_nil(raw.require_auth, 'uploads.require_auth')
if err then return nil, err end
if v ~= nil then out.require_auth = v end
return out, nil
end

Expand Down
10 changes: 9 additions & 1 deletion src/services/ui/http/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,15 @@ function M.run(scope, ctx, deps)
elseif route.kind == 'command' then
return handle_command(scope, owner, ctx, route, deps)
elseif route.kind == 'upload' then
return upload.run(scope, owner, ctx, deps.update or deps)
local update_deps = deps.update or deps
if update_deps.require_auth == true then
local principal = principal_from(ctx, deps)
if principal == nil then
perform_response(owner:reply_error_op(401, 'unauthenticated'))
return { status = 'unauthenticated' }
end
end
return upload.run(scope, owner, ctx, update_deps)
elseif route.kind == 'sse' then
return sse.run(scope, owner, route, deps)
elseif route.kind == 'static' then
Expand Down
1 change: 1 addition & 0 deletions src/services/ui/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ local function build_listener_opts(state, cfg, listener_generation, listener_id)
local update_opts = shallow_copy(params.update or {})
update_opts.max_bytes = uploads.max_bytes
update_opts.enabled = uploads.enabled
update_opts.require_auth = uploads.require_auth
update_opts.connect = update_opts.connect or params.connect
update_opts.bus = update_opts.bus or params.bus

Expand Down
40 changes: 39 additions & 1 deletion src/services/update/active_job.lua
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,48 @@ local function stage_context(params)
return backend, job, ctx
end

--- Perform a backend Op, optionally bounded by an absolute deadline.
--
-- @param backend  backend object providing the named method
-- @param name     backend method name; also the prefix of the generated
--                 `<name>_timeout` / `<name>_failed` error strings
-- @param required forwarded to `backend_method`
-- @param deadline absolute time compared against `fibers.now()`, or nil for
--                 an unbounded wait
-- @param ...      extra arguments forwarded to the backend method
-- @return `result, nil` on success; `nil, err` on failure or timeout;
--         `nil, nil` when `backend_method` yields no function
local function perform_backend_op_until(backend, name, required, deadline, ...)
  local fn = backend_method(backend, name, required)
  if not fn then
    return nil, nil
  end

  -- An already exhausted budget is reported before the backend Op is built,
  -- so nothing is constructed that would never be performed.
  if deadline ~= nil and fibers.now() >= deadline then
    return nil, name .. '_timeout'
  end

  -- A backend method may report failure as `nil, err` instead of an Op.
  -- Surface that as an ordinary error rather than performing a nil Op.
  local backend_op, build_err = fn(backend, ...)
  if backend_op == nil then
    return nil, build_err or (name .. '_failed')
  end

  local result, err

  if deadline ~= nil then
    -- The update phase owns this deadline. The backend Op, including any
    -- bus request it opens, has no hidden call timeout. When timeout wins the
    -- losing backend Op is aborted, allowing lua-bus to abandon the request and
    -- Device to observe caller cancellation.
    local which, a, b = fibers.perform(fibers.named_choice {
      backend = backend_op,
      timeout = sleep.sleep_until_op(deadline),
    })
    if which == 'timeout' then
      return nil, name .. '_timeout'
    end
    result, err = a, b
  else
    result, err = fibers.perform(backend_op)
  end

  if result == nil then
    return nil, err or (name .. '_failed')
  end

  return result, nil
end

function M.stage(_scope, params)
params = params or {}
local backend, job, ctx = stage_context(params)
local staged = perform_backend_op(backend, 'stage_op', true, job, ctx)
local staged, err = perform_backend_op_until(backend, 'stage_op', true, params.deadline, job, ctx)
if staged == nil then error(err or 'stage_op_failed', 0) end

return {
tag = 'staged',
Expand Down
29 changes: 29 additions & 0 deletions src/services/update/active_runtime.lua
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,28 @@ local function active_intent_for(job)
return job and (job.active_intent or job.active) or nil
end

--- Coerce a value to a strictly positive number.
--
-- @param v any value; numbers and numeric strings are accepted via `tonumber`
-- @return the numeric value when it is greater than zero, otherwise nil
local function positive_number(v)
  local n = tonumber(v)
  -- `not (n > 0)` also rejects NaN, which `n <= 0` would let through and
  -- which would otherwise turn into a NaN deadline further down the line.
  if n == nil or not (n > 0) then return nil end
  return n
end

--- Look up the configured timeout, in seconds, for a component's phase.
--
-- Reads `self._config.components[component]` and prefers the phase-specific
-- `<phase>_timeout_s` entry, falling back to the record's generic `timeout_s`.
-- `phase` defaults to 'stage'.
--
-- @return a positive number, or nil when nothing usable is configured
local function configured_phase_timeout_s(self, component, phase)
  local cfg = self and self._config
  if not cfg then return nil end

  local components = cfg.components
  if type(components) ~= 'table' or not component then return nil end

  local rec = components[component]
  if type(rec) ~= 'table' then return nil end

  local phase_name = phase or 'stage'
  local specific = positive_number(rec[phase_name .. '_timeout_s'])
  if specific then return specific end

  return positive_number(rec.timeout_s)
end

--- Compute the absolute deadline for a job phase.
--
-- @return `fibers.now()` plus the configured phase timeout, or nil when no
--         timeout is configured for the job's component
local function phase_deadline(self, job, phase)
  local component = job and job.component
  local budget_s = configured_phase_timeout_s(self, component, phase)
  if budget_s ~= nil then
    return fibers.now() + budget_s
  end
  return nil
end

local function reconcile_token_for(job, generation)
return table.concat({
tostring(generation or 0),
Expand Down Expand Up @@ -589,6 +611,7 @@ function Component:_launch_active_intent(job)
}, job, {
backend = self._backend,
phase = intent.phase,
deadline = phase_deadline(self, job, intent.phase),
})

if not handle then
Expand All @@ -610,6 +633,11 @@ function Component:update_adoption(adoption)
return true, nil
end

--- Replace the component's stored configuration with a copy of `config`.
--
-- A missing `config` is treated as an empty table.
-- @return true, nil
function Component:update_config(config)
  local snapshot = copy(config or {})
  self._config = snapshot
  return true, nil
end

function Component:consider_jobs()
if not self._jobs then return false, 'not_ready' end
if self._state.active ~= nil then return false, 'slot_busy' end
Expand Down Expand Up @@ -843,6 +871,7 @@ function M.start_component(scope, params)
_jobs = params.jobs,
_backend = params.backend,
_observer = params.observer,
_config = copy(params.config or {}),
_adoption = params.adoption or {},
_active_applies = {},
_active_launched = {},
Expand Down
9 changes: 8 additions & 1 deletion src/services/update/backends/component.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,18 @@ local function transfer_from(job, ctx)
}
end

local NO_BUS_TIMEOUT = { timeout = false }

local function call_component_op(self, component, method, payload, opts)
if type(self._conn) ~= 'table' or type(self._conn.call_op) ~= 'function' then
return op.always(nil, 'component_backend_connection_required')
end
return self._conn:call_op(topics.component_rpc(component, method), payload, opts or self._call_opts):wrap(function (reply, err)
-- Component update calls may legitimately take as long as the update phase
-- owns them for. Do not use lua-bus' default one-second call timeout here;
-- callers that own a budget must compose this Op with a sleep/deadline Op.
-- If that outer choice loses, lua-bus observes the abort and abandons the
-- request through the Request owner path.
return self._conn:call_op(topics.component_rpc(component, method), payload, opts or self._call_opts or NO_BUS_TIMEOUT):wrap(function (reply, err)
if reply == false then return nil, err or 'component_call_failed' end
return reply, err
end)
Expand Down
23 changes: 23 additions & 0 deletions src/services/update/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ local function sorted_count(t)
return n
end

--- Validate and normalise an optional positive-number field in place.
--
-- When `t[key]` is present it is coerced with `tonumber` and written back as
-- a number; an absent key is accepted untouched.
--
-- @param t     table holding the field
-- @param key   field name
-- @param where path prefix used in the error message
-- @return `true, nil` on success; `false, message` when the value is not a
--         positive number
local function normalise_optional_positive_number(t, key, where)
  local v = t[key]
  if v == nil then return true, nil end

  local n = tonumber(v)
  -- `not (n > 0)` also rejects NaN, which `n <= 0` would accept and store as
  -- if it were a valid timeout.
  if n == nil or not (n > 0) then
    return false, where .. '.' .. key .. ' must be a positive number'
  end

  t[key] = n
  return true, nil
end

--- Normalise every timeout field of a component record in place.
--
-- Checks `timeout_s`, `stage_timeout_s`, `commit_timeout_s` and
-- `reconcile_timeout_s`, in that order, stopping at the first invalid one.
--
-- @param c     component record
-- @param where path prefix used in error messages
-- @return `c, nil` on success; `nil, message` for the first invalid field
local function normalise_component_timeouts(c, where)
  local timeout_keys = { 'timeout_s', 'stage_timeout_s', 'commit_timeout_s', 'reconcile_timeout_s' }

  for idx = 1, #timeout_keys do
    local valid, reason = normalise_optional_positive_number(c, timeout_keys[idx], where)
    if not valid then
      return nil, reason
    end
  end

  return c, nil
end

local function normalise_components(raw)
local out = {}
local list = raw or {}
Expand All @@ -47,6 +66,8 @@ local function normalise_components(raw)
end
local c = copy(item)
c.component = id
local ok, terr = normalise_component_timeouts(c, 'components[' .. tostring(i) .. ']')
if not ok then return nil, terr end
out[id] = c
end
else
Expand All @@ -65,6 +86,8 @@ local function normalise_components(raw)
end
local c = copy(item)
c.component = id
local ok, terr = normalise_component_timeouts(c, 'components.' .. id)
if not ok then return nil, terr end
out[id] = c
end
end
Expand Down
4 changes: 4 additions & 0 deletions src/services/update/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,9 @@ local function ensure_active_runtime(self, reason)
if self._active_component and type(self._active_component.update_adoption) == 'function' then
self._active_component:update_adoption(adoption)
end
if self._active_component and type(self._active_component.update_config) == 'function' then
self._active_component:update_config(self._config)
end

if self._active_component ~= nil then return true, nil end

Expand All @@ -1103,6 +1106,7 @@ local function ensure_active_runtime(self, reason)
jobs = self._jobs,
backend = self._backend,
observer = self._component_observer,
config = self._config,
adoption = adoption,
})
if not active_component then
Expand Down
Loading