Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ Raw imported member truth:
* `raw/member/mcu/meta`
* `raw/member/mcu/status`
* `raw/member/mcu/state/...`
* `raw/member/mcu/cap/updater/main/...`
* `cap/self/updater/main/rpc/...`

Fabric domain summaries:

Expand Down
8 changes: 3 additions & 5 deletions src/configs/bigbox-v1-cm-2.json
Original file line number Diff line number Diff line change
Expand Up @@ -1125,13 +1125,11 @@
"peer_id": "mcu",
"transport": {
"kind": "uart",
"source": "uart",
"source": "uart_uart0",
"class": "uart",
"id": "uart0",
"terminator": "\n",
"open_opts": {
"baud": 115200
}
"cap_wait_timeout_s": 60,
"terminator": "\n"
},
"session": {
"identity_claim": {
Expand Down
89 changes: 80 additions & 9 deletions src/devicecode/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,66 @@ local function cleanup_child_scope(child, reason)
child:cancel(reason or 'cleanup')
end

local function shallow_copy(t)
local out = {}
for k, v in pairs(t or {}) do
out[k] = v
end
return out
end

local function env_value(getenv, name)
local v = getenv(name)
if v == nil or v == '' then return nil end
return v
end

local function build_service_opts(base_opts, getenv)
getenv = getenv or os.getenv
local out = shallow_copy(base_opts)

local ui_password = env_value(getenv, 'DEVICECODE_UI_ADMIN_PASSWORD')
if ui_password == nil then
return out
end

local ui_opts = shallow_copy(out.ui)
if ui_opts.auth == nil and ui_opts.auth_opts == nil then
local username = env_value(getenv, 'DEVICECODE_UI_ADMIN_USERNAME') or 'admin'
ui_opts.auth_opts = {
users = {
[username] = {
password = ui_password,
principal = authz.user_principal(username, { roles = { 'admin' } }),
},
},
}
end
out.ui = ui_opts

return out
end

local function service_start_opts(name, env, connect, extra_opts)
local out = shallow_copy(extra_opts)
out.name = name
out.env = env
out.connect = connect
return out
end

local function auth_user_summary(opts)
local users = opts and opts.auth_opts and opts.auth_opts.users
if type(users) ~= 'table' then return nil, nil end
local count = 0
local first = nil
for username in pairs(users) do
count = count + 1
first = first or tostring(username)
end
return count, first
end

local function spawn_service(child, bus, name, mod, env, extra_opts)
return child:spawn(function()
local conn = bus:connect({
Expand All @@ -71,14 +131,7 @@ local function spawn_service(child, bus, name, mod, env, extra_opts)
})
end

mod.start(conn, {
name = name,
env = env,
connect = connect_as,
services = extra_opts and extra_opts.services or nil,
run_http = extra_opts and extra_opts.run_http or nil,
verify_login = extra_opts and extra_opts.verify_login or nil,
})
mod.start(conn, service_start_opts(name, env, connect_as, extra_opts))

error(('service returned unexpectedly: %s'):format(tostring(name)), 0)
end)
Expand Down Expand Up @@ -186,7 +239,7 @@ function M.run(scope, params)
local service_loader = params.service_loader or function(name)
return require('services.' .. name)
end
local service_opts = params.service_opts or {}
local service_opts = build_service_opts(params.service_opts)

local main_conn = bus:connect({
principal = authz.service_principal('main'),
Expand Down Expand Up @@ -215,6 +268,18 @@ function M.run(scope, params)
fail_boot(main_conn, name, 'load_failed', lerr)
end

if name == 'ui' then
local user_count, first_user = auth_user_summary(service_opts[name])
if user_count then
main_conn:publish({ 'obs', 'log', 'main', 'info' }, {
what = 'ui_auth_configured',
source = 'service_opts',
users = user_count,
first_user = first_user,
})
end
end

local child, cerr = scope:child()
if not child then
fail_boot(main_conn, name, 'child_scope_failed', cerr)
Expand Down Expand Up @@ -305,4 +370,10 @@ function M.run(scope, params)
end
end

M._test = {
build_service_opts = build_service_opts,
service_start_opts = service_start_opts,
auth_user_summary = auth_user_summary,
}

return M
1 change: 1 addition & 0 deletions src/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ else
add_path('../vendor/lua-fibers/src/')
add_path('../vendor/lua-bus/src/')
add_path('../vendor/lua-trie/src/')
add_path('./')
end

local fibers = require 'fibers'
Expand Down
134 changes: 129 additions & 5 deletions src/services/device/action_worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ local scope_mod = require 'fibers.scope'
local request_owner = require 'devicecode.support.request_owner'
local resource = require 'devicecode.support.resource'
local fabric_stage = require 'services.device.fabric_stage'
local store_bus = require 'services.update.artifacts.store_bus'

local M = {}

Expand All @@ -19,6 +20,13 @@ local function request_payload(req)
return req.payload
end

local function shallow_copy(t)
if type(t) ~= 'table' then return t end
local out = {}
for k, v in pairs(t) do out[k] = v end
return out
end

local function default_call_op(conn, topic, payload, opts)
if type(conn) == 'table' and type(conn.call_op) == 'function' then
return conn:call_op(topic, payload, opts)
Expand All @@ -27,6 +35,49 @@ local function default_call_op(conn, topic, payload, opts)
return nil, 'connection does not support call_op'
end

local function topic_string(topic)
if type(topic) ~= 'table' then return nil end
local out = {}
for i = 1, #topic do out[i] = tostring(topic[i]) end
return table.concat(out, '/')
end

local function prepare_action(ctx)
return ctx and ctx.action == 'prepare-update'
end

local function scalar_payload_field(payload, key)
if type(payload) ~= 'table' then return nil end
local v = payload[key]
local tv = type(v)
if tv == 'string' or tv == 'number' or tv == 'boolean' then
return tostring(v)
end
return nil
end

local function print_action_rpc_diag(ctx, event, payload, fields)
if not prepare_action(ctx) then return end
fields = fields or {}
local action = ctx.action_spec or {}
local parts = { '[device-action-rpc]', 'ev', event }
local function add(key, value)
if value == nil or value == '' then return end
parts[#parts + 1] = key
parts[#parts + 1] = tostring(value)
end
add('component', ctx.component_id)
add('action', ctx.action)
add('request_id', ctx.request_id)
add('topic', topic_string(action.call_topic))
add('job_id', scalar_payload_field(payload, 'job_id'))
add('expected_image_id', scalar_payload_field(payload, 'expected_image_id'))
add('duration_ms', fields.duration_ms)
add('status', fields.status)
add('err', fields.err)
print(table.concat(parts, ' '))
end

local function normalise_reply(reply)
if type(reply) == 'table' and reply.ok == false then
return nil, reply.reason or reply.err or 'action failed'
Expand Down Expand Up @@ -78,6 +129,9 @@ local function public_reply_payload(result)
return sanitise_public_payload(result)
end

local open_source_op
local source_terminator

local function finalise_owner(owner, status, primary)
if owner:done() then return end
if status == 'cancelled' then
Expand All @@ -89,9 +143,10 @@ local function finalise_owner(owner, status, primary)
end
end

local function run_rpc_op(ctx, owner)
local function rpc_call_op(ctx, owner, payload)
local action = ctx.action_spec
local payload = request_payload(ctx.request)
local started = fibers.now()
print_action_rpc_diag(ctx, 'component_action_rpc_start', payload)
local call_ev, cerr = default_call_op(ctx.conn, action.call_topic, payload, {
timeout = action.timeout or ctx.timeout,
deadline = ctx.deadline,
Expand All @@ -100,32 +155,95 @@ local function run_rpc_op(ctx, owner)
if not call_ev then
local reason = cerr or 'rpc_unavailable'
owner:fail_once(reason)
print_action_rpc_diag(ctx, 'component_action_rpc_done', payload, {
duration_ms = math.floor(((fibers.now() - started) * 1000) + 0.5),
status = 'unavailable',
err = reason,
})
return op.always(public_result('unavailable', { err = reason }))
end

return call_ev:wrap(function (reply, err)
if reply == nil then
local reason = err or 'rpc_failed'
owner:fail_once(reason)
print_action_rpc_diag(ctx, 'component_action_rpc_done', payload, {
duration_ms = math.floor(((fibers.now() - started) * 1000) + 0.5),
status = 'remote_failed',
err = reason,
})
return public_result('remote_failed', { err = reason })
end

local value, rerr = normalise_reply(reply)
if value == nil and rerr ~= nil then
owner:fail_once(rerr)
print_action_rpc_diag(ctx, 'component_action_rpc_done', payload, {
duration_ms = math.floor(((fibers.now() - started) * 1000) + 0.5),
status = 'remote_failed',
err = rerr,
})
return public_result('remote_failed', { err = rerr })
end

owner:reply_once(value)
print_action_rpc_diag(ctx, 'component_action_rpc_done', payload, {
duration_ms = math.floor(((fibers.now() - started) * 1000) + 0.5),
status = 'succeeded',
})
return public_result('succeeded', { value = value, reply_payload = value })
end)
end

local function rpc_stage_needs_source(ctx, payload)
if ctx.action ~= 'stage-update' or type(payload) ~= 'table' then return false end
if payload.source ~= nil or payload.artifact ~= nil then return false end
return payload.artifact_ref ~= nil or payload.ref ~= nil
end

local function run_rpc_op(ctx, owner)
local payload = request_payload(ctx.request)
if not rpc_stage_needs_source(ctx, payload) then
return rpc_call_op(ctx, owner, payload)
end

return fibers.run_scope_op(function (source_scope)
local source, err = fibers.perform(open_source_op(ctx))
if not source then
owner:fail_once(err or 'source_required')
return public_result('rejected', { err = err or 'source_required' })
end

local owned = resource.owned(source, {
terminate = source_terminator(ctx),
label = 'device rpc-stage source termination',
})
source_scope:finally(function (_, status, primary)
owned:terminate_checked(
primary or status or 'rpc_stage_terminated',
'device rpc-stage source termination failed'
)
end)

local call_payload = shallow_copy(payload)
call_payload.source = owned:value()
return fibers.perform(rpc_call_op(ctx, owner, call_payload))
end):wrap(function (st, _rep, result_or_primary)
if st == 'ok' then
return result_or_primary
end

local reason = result_or_primary or st or 'rpc_stage_failed'
owner:fail_once(reason)
return public_result(st == 'cancelled' and 'cancelled' or 'failed', { err = reason })
end)
end

local function is_op(v)
return type(v) == 'table' and getmetatable(v) == op.Op
end

local function open_source_op(ctx)
function open_source_op(ctx)
local payload = request_payload(ctx.request) or {}
local opener = ctx.open_source_op

Expand All @@ -145,11 +263,17 @@ local function open_source_op(ctx)

if payload.source ~= nil then return op.always(payload.source, nil) end
if payload.artifact ~= nil then return op.always(payload.artifact, nil) end
if payload.artifact_ref ~= nil or payload.ref ~= nil then
local action = ctx.action_spec or {}
local store_id = payload.artifact_store or action.artifact_store or 'main'
local store = store_bus.new(ctx.conn, { id = store_id })
return store:open_source_op(payload.artifact_ref or payload.ref)
end
return op.always(nil, 'source_required')
end


local function source_terminator(ctx)
function source_terminator(ctx)
local terminate = ctx.terminate_source
if terminate ~= nil and type(terminate) ~= 'function' then
error('terminate_source must be a function', 0)
Expand Down Expand Up @@ -286,7 +410,7 @@ function M.run(scope, ctx)
error('device action worker must run in its owning scope', 0)
end
local req = assert(ctx.request, 'device action worker requires request')
local action_spec = assert(ctx.action_spec, 'device action worker requires action_spec')
assert(ctx.action_spec, 'device action worker requires action_spec')
local owner = ctx.request_owner or request_owner.new(req, ctx.request_owner_opts)

-- When action work is launched through action_manager, the request owner is
Expand Down
Loading