Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/services/http/cap_surface.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ function M.retain_static(conn, id, status, stats)
control_plane_only = true,
compat = { response_parsers = { strict = true, ['legacy-http1-close'] = true } },
state = { stats = topics.state(id, 'stats') },
observability = { stats = topics.obs_metric(id, 'stats') },
observability = { status = topics.obs_metric(id, 'status') },
})
conn:retain(topics.status(id), status or { state = 'starting', available = false })
conn:retain(topics.state(id, 'stats'), stats or {})
conn:retain(topics.obs_metric(id, 'stats'), stats or {})
conn:retain(topics.obs_metric(id, 'status'), stats or {})
end

function M.unretain_static(conn, id)
conn:unretain(topics.meta(id))
conn:unretain(topics.status(id))
conn:unretain(topics.state(id, 'stats'))
conn:unretain(topics.obs_metric(id, 'status'))
conn:unretain(topics.obs_metric(id, 'stats'))
end

Expand Down
47 changes: 47 additions & 0 deletions src/services/http/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,27 @@ local DEFAULTS = {
max_response_body = 16 * 1024 * 1024,
legacy_http1_close_max_response_bytes = 1024 * 1024,
},
observability = {
status_interval_s = 30,
request_trace = false,
success_events = false,
failure_rate_limit_s = 60,
},
}

local ROOT_KEYS = {
schema = true,
enabled = true,
id = true,
policy = true,
observability = true,
}

local OBSERVABILITY_KEYS = {
status_interval_s = true,
request_trace = true,
success_events = true,
failure_rate_limit_s = true,
}

local POLICY_KEYS = {
Expand Down Expand Up @@ -128,6 +142,35 @@ local function normalise_policy(raw)
return out, nil
end


local function normalise_observability(raw)
if raw == nil then raw = {} end
if type(raw) ~= 'table' then return fail('observability must be a table') end
local ok, err = allowed(raw, OBSERVABILITY_KEYS, 'observability')
if not ok then return nil, err end

local out = copy_plain(DEFAULTS.observability)
local v

v, err = positive_number_or_nil(raw.status_interval_s, 'observability.status_interval_s')
if err then return nil, err end
if v ~= nil then out.status_interval_s = v end

v, err = bool_or_nil(raw.request_trace, 'observability.request_trace')
if err then return nil, err end
if v ~= nil then out.request_trace = v end

v, err = bool_or_nil(raw.success_events, 'observability.success_events')
if err then return nil, err end
if v ~= nil then out.success_events = v end

v, err = positive_number_or_nil(raw.failure_rate_limit_s, 'observability.failure_rate_limit_s')
if err then return nil, err end
if v ~= nil then out.failure_rate_limit_s = v end

return out, nil
end

function M.normalise(raw)
if raw == nil then raw = {} end
if type(raw) ~= 'table' then return fail('http config must be a table') end
Expand All @@ -147,11 +190,15 @@ function M.normalise(raw)
local policy, perr = normalise_policy(raw.policy)
if not policy then return nil, perr end

local observability, oerr = normalise_observability(raw.observability)
if not observability then return nil, oerr end

return {
schema = M.SCHEMA,
enabled = enabled ~= false,
id = id or DEFAULTS.id,
policy = policy,
observability = observability,
}, nil
end

Expand Down
177 changes: 170 additions & 7 deletions src/services/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,50 @@ local function count_where(t, pred)
return n
end

local function now()
return fibers.now and fibers.now() or os.clock()
end

local function obs_config(self)
local cfg = self._state and self._state.config and self._state.config.observability
return cfg or config_mod.DEFAULTS.observability
end

local function obs_status_key(snap)
return table.concat({
tostring(snap.state),
tostring(snap.backend),
tostring(snap.ready),
tostring(snap.active_listeners),
tostring(snap.active_websockets),
tostring(snap.last_error),
tostring(snap.policy_generation),
}, '|')
end

local function uri_summary(uri)
if type(uri) ~= 'string' then return {} end
local scheme, rest = uri:match('^([%w%+%-%.]+)://(.+)$')
if not scheme then return { uri = uri } end
local authority, path_query = rest:match('^([^/%?#]+)(.*)$')
path_query = path_query or ''
local host = authority or ''
host = host:gsub('^.-@', '')
local path, query = path_query:match('^([^%?]*)(%?.*)$')
if path == nil then path = path_query end
if path == '' then path = '/' end
local cmd = query and query:match('[%?&]cmd=([^&]+)') or nil
return { scheme = scheme, host = host, path = path, query = query, cmd = cmd }
end

local function request_summary(req)
local p = req and req.payload or {}
local u = uri_summary(p.uri)
u.method = p.method or 'GET'
u.response_parser = p.response_parser
return u
end

function HttpService:_derive_snapshot()
local st = self._state
return {
Expand All @@ -101,7 +145,100 @@ function HttpService:_derive_snapshot()
}
end

function HttpService:_publish_model()

function HttpService:_publish_obs_log(level, payload)
payload = payload or {}
payload.service = 'http'
payload.http_id = self._id
payload.ts = now()
self._conn:publish(topics.obs_log(self._id, level), payload)
return true
end

function HttpService:_should_publish_obs_status(snap, ev)
local cfg = obs_config(self)
local key = obs_status_key(snap)
local obs = self._obs or {}
if key ~= obs.last_status_key then return true, key end
local interval = cfg.status_interval_s or 30
if interval > 0 and ((now() - (obs.last_status_emit_at or 0)) >= interval) then return true, key end
if ev and ev.kind == 'http_operation_done' and ev.status == 'ok' and cfg.success_events == true then return true, key end
return false, key
end

function HttpService:_publish_obs_status(snap, ev)
local publish, key = self:_should_publish_obs_status(snap, ev)
if not publish then return true end
self._conn:retain(topics.obs_metric(self._id, 'status'), snap)
self._obs.last_status_key = key
self._obs.last_status_emit_at = now()
return true
end

function HttpService:_record_http_failure(rec, ev)
local cfg = obs_config(self)
local limit_s = cfg.failure_rate_limit_s or 60
local req = self._state.requests[rec.request_id] or {}
local target = req.target or {}
local reason = ev.primary or ev.status or 'operation_failed'
self._obs.had_failure = true
local key = table.concat({
tostring(rec.operation),
tostring(reason),
tostring(target.method),
tostring(target.host),
tostring(target.path),
tostring(target.cmd or target.query),
}, '|')
local windows = self._obs.failure_windows
local win = windows[key]
local t = now()
if not win then
windows[key] = { first_at = t, last_at = t, suppressed = 0 }
return self:_publish_obs_log('warn', {
what = 'request_failed',
reason = reason,
operation = rec.operation,
method = target.method,
host = target.host,
path = target.path,
cmd = target.cmd,
response_parser = target.response_parser,
})
end
win.last_at = t
win.suppressed = (win.suppressed or 0) + 1
if limit_s <= 0 or (t - (win.first_at or t)) >= limit_s then
local suppressed = win.suppressed or 0
win.first_at = t
win.suppressed = 0
return self:_publish_obs_log('warn', {
what = 'request_failed_suppressed',
reason = reason,
operation = rec.operation,
method = target.method,
host = target.host,
path = target.path,
cmd = target.cmd,
response_parser = target.response_parser,
suppressed = suppressed,
window_s = limit_s,
})
end
return true
end

function HttpService:_record_http_recovery(rec)
if not self._obs.had_failure then return true end
self._obs.had_failure = false
self._obs.failure_windows = {}
return self:_publish_obs_log('info', {
what = 'request_recovered',
operation = rec and rec.operation,
})
end

function HttpService:_publish_model(ev)
if self._closed and not self._publishing_after_close then return end
local snap = self._model:snapshot()
self._conn:retain(topics.status(self._id), {
Expand All @@ -112,12 +249,12 @@ function HttpService:_publish_model()
last_error = snap.last_error,
})
self._conn:retain(topics.state(self._id, 'stats'), snap)
self._conn:retain(topics.obs_metric(self._id, 'stats'), snap)
self:_publish_obs_status(snap, ev)
end

function HttpService:_refresh_model()
function HttpService:_refresh_model(ev)
local changed = self._model:set_snapshot(self:_derive_snapshot())
if changed ~= nil then self:_publish_model() end
if changed ~= nil then self:_publish_model(ev) end
return true
end

Expand Down Expand Up @@ -442,6 +579,7 @@ function HttpService:_handle_cap_request(ev)
generation = ev.generation or self._generation,
verb = verb,
owner = owner,
target = request_summary(req),
state = 'received',
}
end
Expand Down Expand Up @@ -483,34 +621,53 @@ function HttpService:_handle_operation_done(ev)
rec.report = ev.report
rec.result = ev.result
rec.primary = ev.primary
if ev.status ~= 'ok' then self._state.last_error = ev.primary or ev.status end
if ev.status ~= 'ok' then self._state.last_error = ev.primary or ev.status; self._obs.had_failure = true end

local owner = self._owned_requests[rec.request_id]
local reqrec = self._state.requests[rec.request_id]
local operation_success = false
if ev.status == 'ok' then
local reply = ev.result or {}
local ok, rerr = true, nil
if owner and not owner:done() then
ok, rerr = owner:reply_once(reply)
end
if ok == true then
operation_success = true
if reply.handle_id then self._registry:mark_transferred(reply.handle_id) end
if reqrec then reqrec.state = 'resolved' end
else
if reply.handle_id then self:_terminate_handle(reply.handle_id, rerr or 'reply_failed') end
if reqrec then reqrec.state = 'failed'; reqrec.reason = rerr or 'reply_failed' end
self._state.last_error = rerr or 'reply_failed'
self._obs.had_failure = true
self:_record_http_failure(rec, { status = 'failed', primary = rerr or 'reply_failed' })
end
else
if owner and not owner:done() then owner:fail_once(ev.primary or ev.status or 'operation_failed') end
if reqrec then reqrec.state = ev.status or 'failed'; reqrec.reason = ev.primary end
self:_record_http_failure(rec, ev)
end
if obs_config(self).request_trace == true then
local target = reqrec and reqrec.target or {}
self:_publish_obs_log('debug', {
what = 'request_completed',
operation = rec.operation,
status = ev.status,
reason = ev.primary,
method = target.method,
host = target.host,
path = target.path,
cmd = target.cmd,
response_parser = target.response_parser,
})
end
if operation_success == true then self:_record_http_recovery(rec) end
self._owned_requests[rec.request_id] = nil
self:_log_event(ev)
return true
end


function HttpService:_handle_config_changed(ev)
local cfg, err = config_mod.normalise(ev.raw or {})
if not cfg then
Expand Down Expand Up @@ -608,7 +765,7 @@ function HttpService:_handle_event(ev)
local log_now = ev.kind ~= 'http_operation_done'
if log_now then self:_log_event(ev) end
local ok, err = self:_reduce_event(ev)
self:_refresh_model()
self:_refresh_model(ev)
return ok, err
end

Expand Down Expand Up @@ -723,6 +880,12 @@ function M.open_handle(conn, opts)
_events = {},
_event_seq = 0,
_owned_requests = {},
_obs = {
last_status_key = nil,
last_status_emit_at = nil,
failure_windows = {},
had_failure = false,
},
_state = {
service_state = 'starting',
backend = 'starting',
Expand Down
1 change: 1 addition & 0 deletions src/services/http/topics.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function M.status(id) return topic.append(cap(id), 'status') end
function M.state(id, key) return topic.append(cap(id), 'state', key) end
function M.event(id, name) return topic.append(cap(id), 'event', name) end
function M.obs_metric(id, name) return { 'obs', 'v1', 'http', 'metric', id or 'main', name } end
function M.obs_log(id, level) return { 'obs', 'v1', 'http', 'log', id or 'main', level or 'info' } end
function M.rpc(id, verb) return topic.append(cap(id), 'rpc', verb) end

return M
Loading