From cec300b09c369326da60ac85e275a94c41e824fc Mon Sep 17 00:00:00 2001 From: Parag More Date: Fri, 29 May 2026 20:44:07 +0530 Subject: [PATCH 1/4] feat: add miner log download REST endpoints and unit tests --- .../unit/handlers/minerLogs.handlers.test.js | 335 +++++++++++++ tests/unit/lib/log-downloader.test.js | 455 ++++++++++++++++++ workers/http.node.wrk.js | 9 + workers/lib/constants.js | 6 + workers/lib/log-downloader.js | 113 +++++ .../lib/server/handlers/actions.handlers.js | 64 ++- .../lib/server/handlers/minerLogs.handlers.js | 160 ++++++ workers/lib/server/index.js | 2 + workers/lib/server/routes/actions.routes.js | 20 +- workers/lib/server/routes/minerLogs.routes.js | 59 +++ 10 files changed, 1220 insertions(+), 3 deletions(-) create mode 100644 tests/unit/handlers/minerLogs.handlers.test.js create mode 100644 tests/unit/lib/log-downloader.test.js create mode 100644 workers/lib/log-downloader.js create mode 100644 workers/lib/server/handlers/minerLogs.handlers.js create mode 100644 workers/lib/server/routes/minerLogs.routes.js diff --git a/tests/unit/handlers/minerLogs.handlers.test.js b/tests/unit/handlers/minerLogs.handlers.test.js new file mode 100644 index 0000000..8bea9a4 --- /dev/null +++ b/tests/unit/handlers/minerLogs.handlers.test.js @@ -0,0 +1,335 @@ +'use strict' + +const test = require('brittle') +const { + startMinerLogDownload, + getMinerLogDownloadStatus +} = require('../../../workers/lib/server/handlers/minerLogs.handlers') + +// ───────────────────────────────────────────────────────────────────────────── +// Helpers +// ───────────────────────────────────────────────────────────────────────────── + +function makeMockReply () { + let _code = 200 + let _body = null + const reply = { + get statusCode () { return _code }, + get body () { return _body }, + code (statusCode) { + _code = statusCode + return reply + }, + send (body) { + _body = body + return body + } + } + return reply +} + +function makeMockReq (minerId = 'miner-001', jobId = null, token = 'test-token') { + const req = { + params: { minerId }, + _info: { + authToken: token, + user: { metadata: { email: 'ops@example.com' } } + } + } + if (jobId !== null) req.params.jobId = jobId + return req +} + +function makeMockCtx ({ write = true, permissions = ['admin'], requestDataResult = null } = {}) { + return { + authLib: { + getTokenPerms: async () => ({ write, permissions }) + }, + dataProxy: { + requestData: async (method, payload, callback) => { + if (requestDataResult === null) return [] + if (typeof callback === 'function') { + const arr = [] + const items = Array.isArray(requestDataResult) ? requestDataResult : [requestDataResult] + for (const item of items) callback(item, arr) + return arr + } + return Array.isArray(requestDataResult) ? requestDataResult : [requestDataResult] + } + } + } +} + +function makeActionResult (overrides = {}) { + return { + targets: { + 'rack-001': { + calls: [ + { + result: { + success: true, + data: { + coreKey: 'a'.repeat(64), + byteLength: 1024, + expiresAt: Date.now() + 3600000, + minerId: 'miner-001', + ...overrides.data + } + } + } + ] + } + }, + ...overrides + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// startMinerLogDownload +// ───────────────────────────────────────────────────────────────────────────── + +test('startMinerLogDownload - returns 202 with jobId on success', async (t) => { + const ctx = makeMockCtx({ requestDataResult: { id: '12345' } }) + const req = makeMockReq('miner-001') + const reply = makeMockReply() + + await startMinerLogDownload(ctx, req, reply) + + t.is(reply.statusCode, 202, 'should return 202 Accepted') + t.is(reply.body.jobId, '12345', 'should include jobId') + t.ok(reply.body.statusUrl.includes('/status'), 'should include statusUrl') + t.ok(reply.body.fileUrl.includes('/file'), 'should include fileUrl') + t.ok(reply.body.statusUrl.includes('miner-001'), 'statusUrl should include minerId') + t.ok(reply.body.fileUrl.includes('miner-001'), 'fileUrl should include minerId') + t.pass() +}) + +test('startMinerLogDownload - returns 403 when token has no write permission', async (t) => { + const ctx = makeMockCtx({ write: false }) + const req = makeMockReq('miner-001') + const reply = makeMockReply() + + await startMinerLogDownload(ctx, req, reply) + + t.is(reply.statusCode, 403, 'should return 403 Forbidden') + t.is(reply.body.error, 'ERR_WRITE_PERM_REQUIRED', 'should return ERR_WRITE_PERM_REQUIRED') + t.pass() +}) + +test('startMinerLogDownload - returns 400 when action submit fails', async (t) => { + const ctx = makeMockCtx({ requestDataResult: { id: null, errors: ['ERR_MINER_NOT_FOUND'] } }) + const req = makeMockReq('miner-001') + const reply = makeMockReply() + + await startMinerLogDownload(ctx, req, reply) + + t.is(reply.statusCode, 400, 'should return 400 when action has no valid id') + t.ok(reply.body.error, 'should include error message') + t.pass() +}) + +test('startMinerLogDownload - returns 400 with error from result when available', async (t) => { + const ctx = makeMockCtx({ requestDataResult: { id: null, errors: ['ERR_SPECIFIC_FAILURE'] } }) + const req = makeMockReq('miner-001') + const reply = makeMockReply() + + await startMinerLogDownload(ctx, req, reply) + + t.is(reply.statusCode, 400, 'should return 400') + t.is(reply.body.error, 'ERR_SPECIFIC_FAILURE', 'should propagate the action error message') + t.pass() +}) + +test('startMinerLogDownload - returns 500 on unexpected dataProxy error', async (t) => { + const ctx = { + authLib: { getTokenPerms: async () => ({ write: true, permissions: [] }) }, + dataProxy: { + requestData: async () => { throw new Error('connection refused') } + } + } + const req = makeMockReq('miner-001') + const reply = makeMockReply() + + await startMinerLogDownload(ctx, req, reply) + + t.is(reply.statusCode, 500, 'should return 500 on unexpected error') + t.ok(reply.body.error, 'should include error message') + t.pass() +}) + +test('startMinerLogDownload - uses minerId from route params', async (t) => { + let capturedPayload = null + const ctx = { + authLib: { getTokenPerms: async () => ({ write: true, permissions: ['admin'] }) }, + dataProxy: { + requestData: async (method, payload, callback) => { + capturedPayload = payload + const arr = [] + callback({ id: '99' }, arr) + return arr + } + } + } + const req = makeMockReq('specific-miner-id') + const reply = makeMockReply() + + await startMinerLogDownload(ctx, req, reply) + + t.is(capturedPayload.query.id, 'specific-miner-id', 'should use minerId from route params') + t.is(capturedPayload.action, 'downloadLogs', 'should submit downloadLogs action') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// getMinerLogDownloadStatus +// ───────────────────────────────────────────────────────────────────────────── + +test('getMinerLogDownloadStatus - returns pending when action not in done bucket', async (t) => { + const ctx = makeMockCtx({ requestDataResult: [] }) + const req = makeMockReq('miner-001', '42') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.statusCode, 200, 'should return 200') + t.is(reply.body.status, 'pending', 'should return pending status') + t.is(reply.body.jobId, '42', 'should echo jobId') + t.pass() +}) + +test('getMinerLogDownloadStatus - returns ready with metadata when log is available', async (t) => { + const expiresAt = Date.now() + 3600000 + const ctx = { + authLib: { getTokenPerms: async () => ({}) }, + dataProxy: { + requestData: async () => [makeActionResult({ data: { expiresAt, byteLength: 2048 } })] + } + } + const req = makeMockReq('miner-001', '42') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.statusCode, 200, 'should return 200') + t.is(reply.body.status, 'ready', 'should return ready status') + t.is(reply.body.jobId, '42', 'should echo jobId') + t.is(reply.body.byteLength, 2048, 'should include byteLength') + t.is(reply.body.expiresAt, expiresAt, 'should include expiresAt') + t.ok(reply.body.fileUrl.includes('/file'), 'should include fileUrl') + t.pass() +}) + +test('getMinerLogDownloadStatus - returns failed when no coreKey in targets', async (t) => { + const action = { + targets: { + 'rack-001': { + calls: [ + { result: { success: false, error_msg: 'ERR_MINER_UNREACHABLE' } } + ] + } + } + } + const ctx = { + authLib: { getTokenPerms: async () => ({}) }, + dataProxy: { requestData: async () => [action] } + } + const req = makeMockReq('miner-001', '42') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.statusCode, 200, 'should return 200') + t.is(reply.body.status, 'failed', 'should return failed status') + t.is(reply.body.error, 'ERR_MINER_UNREACHABLE', 'should propagate error message from action result') + t.pass() +}) + +test('getMinerLogDownloadStatus - returns failed with generic error when no error_msg', async (t) => { + const action = { + targets: { + 'rack-001': { + calls: [{ result: { success: false } }] + } + } + } + const ctx = { + authLib: { getTokenPerms: async () => ({}) }, + dataProxy: { requestData: async () => [action] } + } + const req = makeMockReq('miner-001', '42') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.body.status, 'failed', 'should return failed status') + t.is(reply.body.error, 'ERR_LOG_NOT_AVAILABLE', 'should use fallback error code') + t.pass() +}) + +test('getMinerLogDownloadStatus - returns expired when TTL has passed', async (t) => { + const ctx = { + authLib: { getTokenPerms: async () => ({}) }, + dataProxy: { + requestData: async () => [makeActionResult({ data: { expiresAt: Date.now() - 1000 } })] + } + } + const req = makeMockReq('miner-001', '42') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.statusCode, 200, 'should return 200') + t.is(reply.body.status, 'expired', 'should return expired status') + t.is(reply.body.error, 'ERR_LOG_EXPIRED', 'should return ERR_LOG_EXPIRED') + t.pass() +}) + +test('getMinerLogDownloadStatus - returns 500 on unexpected dataProxy error', async (t) => { + const ctx = { + authLib: { getTokenPerms: async () => ({}) }, + dataProxy: { + requestData: async () => { throw new Error('redis timeout') } + } + } + const req = makeMockReq('miner-001', '42') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.statusCode, 500, 'should return 500 on unexpected error') + t.ok(reply.body.error, 'should include error message') + t.pass() +}) + +test('getMinerLogDownloadStatus - finds successful result across multiple racks', async (t) => { + const expiresAt = Date.now() + 3600000 + const action = { + targets: { + 'rack-001': { + calls: [{ result: { success: false, error_msg: 'offline' } }] + }, + 'rack-002': { + calls: [ + { + result: { + success: true, + data: { coreKey: 'b'.repeat(64), byteLength: 512, expiresAt, minerId: 'miner-002' } + } + } + ] + } + } + } + const ctx = { + authLib: { getTokenPerms: async () => ({}) }, + dataProxy: { requestData: async () => [action] } + } + const req = makeMockReq('miner-002', '77') + const reply = makeMockReply() + + await getMinerLogDownloadStatus(ctx, req, reply) + + t.is(reply.body.status, 'ready', 'should return ready when at least one rack has a result') + t.is(reply.body.byteLength, 512, 'should return correct byteLength from second rack') + t.pass() +}) diff --git a/tests/unit/lib/log-downloader.test.js b/tests/unit/lib/log-downloader.test.js new file mode 100644 index 0000000..b77d41b --- /dev/null +++ b/tests/unit/lib/log-downloader.test.js @@ -0,0 +1,455 @@ +'use strict' + +const test = require('brittle') +const { Readable } = require('node:stream') +const { randomBytes } = require('node:crypto') +const LogDownloader = require('../../../workers/lib/log-downloader') + +// ───────────────────────────────────────────────────────────────────────────── +// Fake builders +// ───────────────────────────────────────────────────────────────────────────── + +function makeDiscovery () { + let destroyed = false + return { + destroy: async () => { destroyed = true }, + get _destroyed () { return destroyed } + } +} + +function makeReadStream () { + return new Readable({ read () {} }) +} + +function makeCore (opts = {}) { + const key = opts.key || randomBytes(32) + const discoveryKey = opts.discoveryKey || randomBytes(32) + const rs = opts.readStream || makeReadStream() + let _length = opts.length !== undefined ? opts.length : 1 + let cleared = false + let closed = false + + return { + key, + discoveryKey, + get length () { return _length }, + set length (v) { _length = v }, + ready: async () => {}, + findingPeers: () => () => {}, + update: opts.update || (async () => ({ changed: true })), + createByteStream: () => rs, + replicate: () => {}, + clear: async () => { cleared = true }, + close: async () => { closed = true }, + _rs: rs, + get _cleared () { return cleared }, + get _closed () { return closed } + } +} + +function makeSwarm () { + const _listeners = {} + const joined = [] + + return { + joined, + on (event, fn) { + _listeners[event] = _listeners[event] || [] + _listeners[event].push(fn) + }, + join (discoveryKey, opts) { + joined.push({ discoveryKey, opts }) + return makeDiscovery() + }, + flush: async () => {}, + emit (event, ...args) { + for (const fn of (_listeners[event] || [])) fn(...args) + }, + listenerCount (event) { + return (_listeners[event] || []).length + } + } +} + +function makeNetFac (existingSwarm = null) { + let _swarm = existingSwarm + return { + get swarm () { return _swarm }, + startSwarm: async () => { _swarm = makeSwarm() } + } +} + +function makeStoreFac (coreFactory = null) { + return { + getCore (opts) { + return coreFactory ? coreFactory(opts) : makeCore({ key: opts.key }) + } + } +} + +const TEST_KEY = Buffer.alloc(32).fill(0x01) +const TEST_KEY_HEX = TEST_KEY.toString('hex') + +// ───────────────────────────────────────────────────────────────────────────── +// Constructor +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - constructor stores facilities', (t) => { + const netFac = makeNetFac() + const storeFac = makeStoreFac() + + const dl = new LogDownloader({ netFac, storeFac }) + + t.ok(dl._netFac === netFac, 'should store netFac reference') + t.ok(dl._storeFac === storeFac, 'should store storeFac reference') + t.pass() +}) + +test('LogDownloader - constructor defaults peerTimeoutMs to 30s', (t) => { + const dl = new LogDownloader({ netFac: makeNetFac(), storeFac: makeStoreFac() }) + t.is(dl._peerTimeoutMs, 30000, 'should default to 30000ms') + t.pass() +}) + +test('LogDownloader - constructor accepts custom peerTimeoutMs', (t) => { + const dl = new LogDownloader({ netFac: makeNetFac(), storeFac: makeStoreFac(), peerTimeoutMs: 5000 }) + t.is(dl._peerTimeoutMs, 5000, 'should use provided peerTimeoutMs') + t.pass() +}) + +test('LogDownloader - constructor initialises empty downloads map', (t) => { + const dl = new LogDownloader({ netFac: makeNetFac(), storeFac: makeStoreFac() }) + t.ok(dl._downloads instanceof Map, 'should be a Map') + t.is(dl._downloads.size, 0, 'should start empty') + t.is(dl._swarmReady, false, 'swarmReady should start false') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// _ensureSwarm +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - _ensureSwarm calls startSwarm when swarm is null', async (t) => { + let startCalls = 0 + const swarm = makeSwarm() + const netFac = { + get swarm () { return startCalls > 0 ? swarm : null }, + startSwarm: async () => { startCalls++ } + } + + const dl = new LogDownloader({ netFac, storeFac: makeStoreFac() }) + await dl._ensureSwarm() + + t.is(startCalls, 1, 'should call startSwarm exactly once') + t.ok(dl._swarmReady, 'should set swarmReady to true') + t.pass() +}) + +test('LogDownloader - _ensureSwarm skips startSwarm when swarm already exists', async (t) => { + let startCalls = 0 + const swarm = makeSwarm() + const netFac = { + get swarm () { return swarm }, + startSwarm: async () => { startCalls++ } + } + + const dl = new LogDownloader({ netFac, storeFac: makeStoreFac() }) + await dl._ensureSwarm() + + t.is(startCalls, 0, 'should not call startSwarm when swarm already exists') + t.pass() +}) + +test('LogDownloader - _ensureSwarm registers connection handler exactly once', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const dl = new LogDownloader({ netFac, storeFac: makeStoreFac() }) + await dl._ensureSwarm() + await dl._ensureSwarm() + await dl._ensureSwarm() + + t.is(swarm.listenerCount('connection'), 1, 'should register connection listener only once') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// stream — happy path +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - stream returns a Readable', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const dl = new LogDownloader({ netFac, storeFac: makeStoreFac() }) + const rs = await dl.stream(TEST_KEY_HEX, 100) + + t.ok(rs instanceof Readable, 'should return a Readable stream') + t.pass() +}) + +test('LogDownloader - stream joins swarm as client only', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const dl = new LogDownloader({ netFac, storeFac: makeStoreFac() }) + await dl.stream(TEST_KEY_HEX, 100) + + t.is(swarm.joined.length, 1, 'should join exactly one DHT topic') + t.alike(swarm.joined[0].opts, { server: false, client: true }, 'should join as client only') + t.pass() +}) + +test('LogDownloader - stream registers download entry while active', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const dl = new LogDownloader({ netFac, storeFac: makeStoreFac() }) + await dl.stream(TEST_KEY_HEX, 100) + + t.is(dl._downloads.size, 1, 'should track the active download') + t.ok(dl._downloads.has(TEST_KEY_HEX), 'should key the entry by coreKeyHex') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// stream — error cases +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - stream throws ERR_LOG_PEER_TIMEOUT when update hangs', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const storeFac = makeStoreFac(() => { + const c = makeCore({ + length: 0, + update: () => new Promise(() => {}) // never resolves + }) + c.length = 0 + return c + }) + + const dl = new LogDownloader({ netFac, storeFac, peerTimeoutMs: 50 }) + + try { + await dl.stream(TEST_KEY_HEX, 100) + t.fail('should have thrown ERR_LOG_PEER_TIMEOUT') + } catch (err) { + t.is(err.message, 'ERR_LOG_PEER_TIMEOUT', 'should throw ERR_LOG_PEER_TIMEOUT') + } + + t.pass() +}) + +test('LogDownloader - stream throws ERR_LOG_PEER_NOT_FOUND when update has no change', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const storeFac = makeStoreFac(() => { + const c = makeCore({ update: async () => ({ changed: false }) }) + c.length = 0 + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + + try { + await dl.stream(TEST_KEY_HEX, 100) + t.fail('should have thrown ERR_LOG_PEER_NOT_FOUND') + } catch (err) { + t.is(err.message, 'ERR_LOG_PEER_NOT_FOUND', 'should throw ERR_LOG_PEER_NOT_FOUND') + } + + t.pass() +}) + +test('LogDownloader - stream removes download from map on peer-not-found', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const storeFac = makeStoreFac(() => { + const c = makeCore({ update: async () => ({ changed: false }) }) + c.length = 0 + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + + try { await dl.stream(TEST_KEY_HEX, 100) } catch {} + + t.is(dl._downloads.size, 0, 'should remove download from map after failure') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// stream — auto-cleanup on stream events +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - stream cleans up after stream end', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const rs = makeReadStream() + let coreClosed = false + + const storeFac = makeStoreFac(() => { + const c = makeCore() + c.createByteStream = () => rs + c.close = async () => { coreClosed = true } + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + await dl.stream(TEST_KEY_HEX, 100) + + t.is(dl._downloads.size, 1, 'precondition: download tracked before stream ends') + + rs.emit('end') + await new Promise(r => setImmediate(r)) + + t.is(dl._downloads.size, 0, 'should remove download from map after stream end') + t.ok(coreClosed, 'should close core after stream end') + t.pass() +}) + +test('LogDownloader - stream cleans up after stream error', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const rs = makeReadStream() + rs.on('error', () => {}) // suppress unhandled error + let coreClosed = false + + const storeFac = makeStoreFac(() => { + const c = makeCore() + c.createByteStream = () => rs + c.close = async () => { coreClosed = true } + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + await dl.stream(TEST_KEY_HEX, 100) + + rs.emit('error', new Error('connection dropped')) + await new Promise(r => setImmediate(r)) + + t.is(dl._downloads.size, 0, 'should remove download from map after stream error') + t.ok(coreClosed, 'should close core after stream error') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// connection handler — topic-based routing +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - connection handler replicates core with matching topic', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const replicateCalls = [] + const storeFac = makeStoreFac(() => { + const c = makeCore() + c.replicate = (socket) => replicateCalls.push(socket) + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + await dl.stream(TEST_KEY_HEX, 100) + + const entry = dl._downloads.get(TEST_KEY_HEX) + const discoveryKeyBuf = Buffer.from(entry.discoveryKeyHex, 'hex') + + const fakeSocket = {} + swarm.emit('connection', fakeSocket, { topics: [discoveryKeyBuf] }) + + t.is(replicateCalls.length, 1, 'should replicate when topic matches discoveryKey') + t.ok(replicateCalls[0] === fakeSocket, 'should pass socket to replicate') + t.pass() +}) + +test('LogDownloader - connection handler skips core with non-matching topic', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const replicateCalls = [] + const storeFac = makeStoreFac(() => { + const c = makeCore() + c.replicate = (socket) => replicateCalls.push(socket) + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + await dl.stream(TEST_KEY_HEX, 100) + + // Emit connection with a completely different topic + swarm.emit('connection', {}, { topics: [Buffer.alloc(32).fill(0xff)] }) + + t.is(replicateCalls.length, 0, 'should not replicate when topic does not match') + t.pass() +}) + +test('LogDownloader - connection handler handles empty topics list', async (t) => { + const swarm = makeSwarm() + const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } + + const replicateCalls = [] + const storeFac = makeStoreFac(() => { + const c = makeCore() + c.replicate = (socket) => replicateCalls.push(socket) + return c + }) + + const dl = new LogDownloader({ netFac, storeFac }) + await dl.stream(TEST_KEY_HEX, 100) + + swarm.emit('connection', {}, { topics: [] }) + + t.is(replicateCalls.length, 0, 'should not replicate when topics list is empty') + t.pass() +}) + +// ───────────────────────────────────────────────────────────────────────────── +// _cleanup +// ───────────────────────────────────────────────────────────────────────────── + +test('LogDownloader - _cleanup destroys discovery and closes core', async (t) => { + let discoveryDestroyed = false + let coreClosed = false + let coreCleared = false + + const discovery = { + destroy: async () => { discoveryDestroyed = true } + } + const core = makeCore() + core.length = 1 + core.close = async () => { coreClosed = true } + core.clear = async () => { coreCleared = true } + + const dl = new LogDownloader({ netFac: makeNetFac(makeSwarm()), storeFac: makeStoreFac() }) + dl._downloads.set(TEST_KEY_HEX, { core, discoveryKeyHex: TEST_KEY_HEX }) + + await dl._cleanup(TEST_KEY_HEX, core, discovery) + + t.ok(discoveryDestroyed, 'should destroy the DHT discovery') + t.ok(coreClosed, 'should close the core session') + t.ok(coreCleared, 'should clear downloaded blocks from storage') + t.is(dl._downloads.size, 0, 'should remove entry from downloads map') + t.pass() +}) + +test('LogDownloader - _cleanup skips clear when core has no blocks', async (t) => { + let coreCleared = false + + const discovery = { destroy: async () => {} } + const core = makeCore() + core.length = 0 + core.clear = async () => { coreCleared = true } + + const dl = new LogDownloader({ netFac: makeNetFac(makeSwarm()), storeFac: makeStoreFac() }) + dl._downloads.set(TEST_KEY_HEX, { core, discoveryKeyHex: TEST_KEY_HEX }) + + await dl._cleanup(TEST_KEY_HEX, core, discovery) + + t.not(coreCleared, true, 'should not call clear when length is 0') + t.pass() +}) diff --git a/workers/http.node.wrk.js b/workers/http.node.wrk.js index c79ca89..5bd0e83 100644 --- a/workers/http.node.wrk.js +++ b/workers/http.node.wrk.js @@ -14,6 +14,7 @@ const { AlertsService } = require('./lib/alerts') const { auditLogger } = require('./lib/server/lib/auditLogger') const { createDataProxy } = require('./lib/data.proxy') const { AUTH_CACHE_TTL } = require('./lib/constants') +const LogDownloader = require('./lib/log-downloader') class WrkServerHttp extends TetherWrkBase { constructor (conf, ctx) { @@ -32,6 +33,7 @@ class WrkServerHttp extends TetherWrkBase { this.isRpcMode = ctx.isRpcMode !== false if (ctx.ork) this.ork = ctx.ork this.dataProxy = createDataProxy(this) + this.logDownloader = null // initialised in _start() after facilities are ready this.init() this.start() @@ -89,6 +91,13 @@ class WrkServerHttp extends TetherWrkBase { async.series([ next => { super._start(next) }, async () => { + // Facilities are ready — construct LogDownloader with live facility references + this.logDownloader = new LogDownloader({ + netFac: this.net_r0, + storeFac: this.store_s0, + peerTimeoutMs: this.conf?.logDownloader?.peerTimeoutMs + }) + await this.net_r0.startRpcServer() const httpd = this.httpd_h0 diff --git a/workers/lib/constants.js b/workers/lib/constants.js index f9e7ecc..1b7aa00 100644 --- a/workers/lib/constants.js +++ b/workers/lib/constants.js @@ -112,6 +112,12 @@ const ENDPOINTS = { ACTIONS_VOTING_BATCH: '/auth/actions/voting/batch', ACTIONS_VOTE: '/auth/actions/voting/:id/vote', ACTIONS_CANCEL: '/auth/actions/voting/cancel', + DOWNLOAD_LOGS: '/auth/download-logs/:id', + + // Miner log download flow (start → poll status → stream file) + MINER_DOWNLOAD_LOGS_START: '/auth/miners/:minerId/download-logs', + MINER_DOWNLOAD_LOGS_STATUS: '/auth/miners/:minerId/download-logs/:jobId/status', + MINER_DOWNLOAD_LOGS_FILE: '/auth/miners/:minerId/download-logs/:jobId/file', // Logs endpoints TAIL_LOG: '/auth/tail-log', diff --git a/workers/lib/log-downloader.js b/workers/lib/log-downloader.js new file mode 100644 index 0000000..cd8423d --- /dev/null +++ b/workers/lib/log-downloader.js @@ -0,0 +1,113 @@ +'use strict' + +const DEFAULT_PEER_TIMEOUT_MS = 30000 + +/** + * Downloads miner log files from wrk-miner via Hypercore/Hyperswarm P2P. + * + * Uses the hp-svc-facs-net facility (net_r0) for Hyperswarm and the + * hp-svc-facs-store facility (store_s0) for Hypercore/Corestore storage. + * No direct hypercore or hyperswarm require needed. + * + * The wrk-miner exposes a Hypercore identified by coreKey. + * This class: + * 1. Creates a read-only clone of that core via the Corestore facility + * 2. Joins Hyperswarm (via net_r0) on the core's discoveryKey to find the wrk-miner peer + * 3. Returns a Readable byte stream suitable for piping directly to an HTTP response + * 4. Clears downloaded blocks and closes the Corestore session after the stream ends + * + * One shared connection handler on net_r0.swarm routes connections to the correct + * in-flight download by matching peerInfo.topics against active download discoveryKeys. + */ +class LogDownloader { + constructor ({ netFac, storeFac, peerTimeoutMs } = {}) { + this._netFac = netFac + this._storeFac = storeFac + this._peerTimeoutMs = peerTimeoutMs || DEFAULT_PEER_TIMEOUT_MS + // coreKeyHex -> { core, discoveryKeyHex } + this._downloads = new Map() + this._swarmReady = false + } + + /** + * Ensure the swarm is started and our (one-time) connection handler is registered. + */ + async _ensureSwarm () { + if (!this._netFac.swarm) { + await this._netFac.startSwarm() + } + + if (this._swarmReady) return + this._swarmReady = true + + // Route each new connection to the matching in-flight download core. + this._netFac.swarm.on('connection', (socket, peerInfo) => { + const topics = new Set( + (peerInfo.topics || []).map(t => Buffer.from(t).toString('hex')) + ) + for (const [, entry] of this._downloads) { + if (topics.has(entry.discoveryKeyHex)) { + entry.core.replicate(socket) + } + } + }) + } + + /** + * Open a streaming read of a remote Hypercore and return it as a Node.js Readable. + * The stream fetches blocks lazily from the wrk-miner peer — no full buffering. + * + * @param {string} coreKeyHex Hex-encoded public key of the Hypercore (from action result) + * @param {number} byteLength Expected total byte length (for stream bounds + HTTP Content-Length) + * @returns {Promise} + */ + async stream (coreKeyHex, byteLength) { + await this._ensureSwarm() + + // Open a read-only Corestore session for the remote core — no temp dir needed + const core = this._storeFac.getCore({ key: Buffer.from(coreKeyHex, 'hex') }) + await core.ready() + + const discoveryKeyHex = core.discoveryKey.toString('hex') + this._downloads.set(coreKeyHex, { core, discoveryKeyHex }) + + // Join the DHT topic to find the wrk-miner serving this core + const discovery = this._netFac.swarm.join(core.discoveryKey, { server: false, client: true }) + const peersDone = core.findingPeers() + this._netFac.swarm.flush().then(peersDone, peersDone) + + // Wait for the remote core's block tree to arrive (core.length > 0 required) + const updateResult = await Promise.race([ + core.update(), + new Promise((_, reject) => + setTimeout(() => reject(new Error('ERR_LOG_PEER_TIMEOUT')), this._peerTimeoutMs) + ) + ]) + + if (!core.length && (!updateResult || !updateResult.changed)) { + await this._cleanup(coreKeyHex, core, discovery) + throw new Error('ERR_LOG_PEER_NOT_FOUND') + } + + // Lazy byte stream — blocks are fetched from the peer as the HTTP client reads + const rs = core.createByteStream({ byteOffset: 0, byteLength }) + + const cleanup = () => this._cleanup(coreKeyHex, core, discovery).catch(() => {}) + rs.once('end', cleanup) + rs.once('error', cleanup) + + return rs + } + + async _cleanup (coreKeyHex, core, discovery) { + this._downloads.delete(coreKeyHex) + try { await discovery.destroy() } catch {} + try { + // Free downloaded blocks from Corestore storage, then close the session + if (core.length > 0) await core.clear(0, core.length) + await core.close() + } catch {} + } +} + +module.exports = LogDownloader diff --git a/workers/lib/server/handlers/actions.handlers.js b/workers/lib/server/handlers/actions.handlers.js index c1ddb20..dd57583 100644 --- a/workers/lib/server/handlers/actions.handlers.js +++ b/workers/lib/server/handlers/actions.handlers.js @@ -142,6 +142,67 @@ async function cancelActionsBatch (ctx, req) { }) } +/** + * Stream a miner log file to the HTTP client. + * + * The action result (stored by ork) contains only metadata: { coreKey, byteLength, expiresAt }. + * The actual log bytes are fetched directly from the wrk-miner via Hypercore/Hyperswarm P2P, + * bypassing the HRPC action pipeline entirely. + * + * Route: GET /auth/download-logs/:id + */ +async function downloadLogFile (ctx, req, reply) { + const { id } = req.params + + // Fetch the completed action from ork + const results = await ctx.dataProxy.requestData('getAction', { id, type: 'done' }) + const action = Array.isArray(results) ? results.find(r => r && !r.error) : results + + if (!action || !action.targets) { + return reply.code(404).send({ error: 'ERR_ACTION_NOT_FOUND' }) + } + + // Walk targets to find the first successful downloadLogs result + let meta = null + for (const rack of Object.values(action.targets)) { + for (const call of (rack.calls || [])) { + if (call.result?.success && call.result?.data?.coreKey) { + meta = call.result.data + break + } + } + if (meta) break + } + + if (!meta) { + return reply.code(404).send({ error: 'ERR_LOG_NOT_AVAILABLE' }) + } + + if (meta.expiresAt && Date.now() > meta.expiresAt) { + return reply.code(410).send({ error: 'ERR_LOG_EXPIRED' }) + } + + // Set streaming headers — Content-Length enables browser download progress + const filename = `miner-log-${meta.minerId || 'unknown'}-${id}.log` + reply.header('Content-Type', 'application/octet-stream') + reply.header('Content-Disposition', `attachment; filename="${filename}"`) + reply.header('Content-Length', meta.byteLength) + reply.header('Cache-Control', 'no-store') + + let stream + try { + stream = await ctx.logDownloader.stream(meta.coreKey, meta.byteLength) + } catch (err) { + const code = err.message === 'ERR_LOG_PEER_TIMEOUT' || err.message === 'ERR_LOG_PEER_NOT_FOUND' + ? 503 + : 500 + return reply.code(code).send({ error: err.message }) + } + + // Fastify pipes a Readable stream directly to the HTTP response — no buffering + return reply.send(stream) +} + module.exports = { queryActionsBatch, queryActions, @@ -149,5 +210,6 @@ module.exports = { pushAction, voteAction, cancelActionsBatch, - pushActionsBatch + pushActionsBatch, + downloadLogFile } diff --git a/workers/lib/server/handlers/minerLogs.handlers.js b/workers/lib/server/handlers/minerLogs.handlers.js new file mode 100644 index 0000000..f437353 --- /dev/null +++ b/workers/lib/server/handlers/minerLogs.handlers.js @@ -0,0 +1,160 @@ +'use strict' + +const { downloadLogFile } = require('./actions.handlers') + +/** + * Miner log download — three-step REST flow for large async file transfers: + * + * 1. POST /auth/miners/:minerId/download-logs + * Submits a downloadLogs action for the miner. + * Returns 202 Accepted immediately with { jobId }. + * + * 2. GET /auth/miners/:minerId/download-logs/:jobId/status + * Polls whether the action has completed. + * Returns { status: 'pending' | 'ready' | 'failed' | 'expired', ... }. + * + * 3. GET /auth/miners/:minerId/download-logs/:jobId/file + * Streams the binary log file once status is 'ready'. + * Uses the Hypercore P2P pipeline — no buffering. + */ + +/** + * POST /auth/miners/:minerId/download-logs + * + * Submits the downloadLogs action for the given miner and returns the jobId + * (which is the action ID) the client can use to poll status. + */ +async function startMinerLogDownload (ctx, req, reply) { + const { minerId } = req.params + + const { write, permissions } = await ctx.authLib.getTokenPerms(req._info.authToken) + if (!write) { + return reply.code(403).send({ error: 'ERR_WRITE_PERM_REQUIRED' }) + } + + const payload = { + query: { id: minerId }, + action: 'downloadLogs', + params: [], + voter: req._info.user.metadata.email, + authPerms: permissions + } + + let results + try { + results = await ctx.dataProxy.requestData('pushAction', payload, (res, arr) => { + if (res.error) { + arr.push({ id: null, errors: [res.error] }) + } else { + arr.push(res) + } + }) + } catch (err) { + return reply.code(500).send({ error: err.message }) + } + + const result = Array.isArray(results) + ? results.find(r => r && r.id !== null && r.id !== undefined) + : results + + if (!result || result.id == null) { + const errMsg = (Array.isArray(results) ? results[0]?.errors?.[0] : null) || 'ERR_ACTION_SUBMIT_FAILED' + return reply.code(400).send({ error: errMsg }) + } + + const jobId = String(result.id) + + return reply.code(202).send({ + jobId, + statusUrl: `/auth/miners/${encodeURIComponent(minerId)}/download-logs/${jobId}/status`, + fileUrl: `/auth/miners/${encodeURIComponent(minerId)}/download-logs/${jobId}/file` + }) +} + +/** + * GET /auth/miners/:minerId/download-logs/:jobId/status + * + * Polls the action result to determine whether the log is ready to download. + * + * Status values: + * pending — action not yet completed (still in voting/executing pipeline) + * ready — Hypercore is serving the log; use fileUrl to download + * failed — action completed but the miner returned an error + * expired — log was ready but the Hypercore TTL has passed + */ +async function getMinerLogDownloadStatus (ctx, req, reply) { + const { minerId, jobId } = req.params + + let action = null + try { + const results = await ctx.dataProxy.requestData('getAction', { id: jobId, type: 'done' }) + action = Array.isArray(results) ? results.find(r => r && !r.error) : results + } catch (err) { + return reply.code(500).send({ error: err.message }) + } + + // Action not yet in the 'done' bucket — still executing through the pipeline + if (!action || !action.targets) { + return reply.code(200).send({ status: 'pending', jobId }) + } + + // Walk targets to find the first successful downloadLogs result with a coreKey + let meta = null + let firstError = null + + for (const rack of Object.values(action.targets)) { + for (const call of (rack.calls || [])) { + if (call.result?.success && call.result?.data?.coreKey) { + meta = call.result.data + break + } + if (!firstError && call.result?.error_msg) { + firstError = call.result.error_msg + } + } + if (meta) break + } + + if (!meta) { + return reply.code(200).send({ + status: 'failed', + jobId, + error: firstError || 'ERR_LOG_NOT_AVAILABLE' + }) + } + + if (meta.expiresAt && Date.now() > meta.expiresAt) { + return reply.code(200).send({ + status: 'expired', + jobId, + error: 'ERR_LOG_EXPIRED' + }) + } + + return reply.code(200).send({ + status: 'ready', + jobId, + minerId: meta.minerId || minerId, + byteLength: meta.byteLength, + expiresAt: meta.expiresAt, + fileUrl: `/auth/miners/${encodeURIComponent(minerId)}/download-logs/${jobId}/file` + }) +} + +/** + * GET /auth/miners/:minerId/download-logs/:jobId/file + * + * Streams the binary miner log directly to the HTTP client via the + * Hypercore P2P pipeline. Delegates entirely to downloadLogFile which + * handles action lookup, TTL check, streaming headers, and Hyperswarm transfer. + */ +function getMinerLogFile (ctx, req, reply) { + // downloadLogFile expects req.params.id — bridge from our :jobId param + return downloadLogFile(ctx, { ...req, params: { ...req.params, id: req.params.jobId } }, reply) +} + +module.exports = { + startMinerLogDownload, + getMinerLogDownloadStatus, + getMinerLogFile +} diff --git a/workers/lib/server/index.js b/workers/lib/server/index.js index 8d5011f..5ced8d0 100644 --- a/workers/lib/server/index.js +++ b/workers/lib/server/index.js @@ -16,6 +16,7 @@ const devicesRoutes = require('./routes/devices.routes') const metricsRoutes = require('./routes/metrics.routes') const alertsRoutes = require('./routes/alerts.routes') const minersRoutes = require('./routes/miners.routes') +const minerLogsRoutes = require('./routes/minerLogs.routes') const groupsRoutes = require('./routes/groups.routes') const coolingSystemRoutes = require('./routes/cooling.system.routes') const energySystemRoutes = require('./routes/energy.system.routes') @@ -47,6 +48,7 @@ function routes (ctx) { ...metricsRoutes(ctx), ...alertsRoutes(ctx), ...minersRoutes(ctx), + ...minerLogsRoutes(ctx), ...groupsRoutes(ctx), ...coolingSystemRoutes(ctx), ...energySystemRoutes(ctx), diff --git a/workers/lib/server/routes/actions.routes.js b/workers/lib/server/routes/actions.routes.js index dda05e6..c9ca539 100644 --- a/workers/lib/server/routes/actions.routes.js +++ b/workers/lib/server/routes/actions.routes.js @@ -11,9 +11,10 @@ const { pushAction, voteAction, cancelActionsBatch, - pushActionsBatch + pushActionsBatch, + downloadLogFile } = require('../handlers/actions.handlers') -const { createAuthRoute, createCachedAuthRoute } = require('../lib/routeHelpers') +const { createAuthRoute, createCachedAuthRoute, createAuthOnRequest } = require('../lib/routeHelpers') module.exports = (ctx) => { return [ @@ -154,6 +155,21 @@ module.exports = (ctx) => { } }, ...createAuthRoute(ctx, cancelActionsBatch) + }, + { + method: HTTP_METHODS.GET, + url: ENDPOINTS.DOWNLOAD_LOGS, + schema: { + params: { + type: 'object', + properties: { + id: { type: 'string' } + }, + required: ['id'] + } + }, + onRequest: createAuthOnRequest(ctx), + handler: (req, reply) => downloadLogFile(ctx, req, reply) } ] } diff --git a/workers/lib/server/routes/minerLogs.routes.js b/workers/lib/server/routes/minerLogs.routes.js new file mode 100644 index 0000000..2836995 --- /dev/null +++ b/workers/lib/server/routes/minerLogs.routes.js @@ -0,0 +1,59 @@ +'use strict' + +const { ENDPOINTS, HTTP_METHODS } = require('../../constants') +const { + startMinerLogDownload, + getMinerLogDownloadStatus, + getMinerLogFile +} = require('../handlers/minerLogs.handlers') +const { createAuthOnRequest } = require('../lib/routeHelpers') + +module.exports = (ctx) => [ + { + method: HTTP_METHODS.POST, + url: ENDPOINTS.MINER_DOWNLOAD_LOGS_START, + schema: { + params: { + type: 'object', + properties: { + minerId: { type: 'string' } + }, + required: ['minerId'] + } + }, + onRequest: createAuthOnRequest(ctx), + handler: (req, reply) => startMinerLogDownload(ctx, req, reply) + }, + { + method: HTTP_METHODS.GET, + url: ENDPOINTS.MINER_DOWNLOAD_LOGS_STATUS, + schema: { + params: { + type: 'object', + properties: { + minerId: { type: 'string' }, + jobId: { type: 'string' } + }, + required: ['minerId', 'jobId'] + } + }, + onRequest: createAuthOnRequest(ctx), + handler: (req, reply) => getMinerLogDownloadStatus(ctx, req, reply) + }, + { + method: HTTP_METHODS.GET, + url: ENDPOINTS.MINER_DOWNLOAD_LOGS_FILE, + schema: { + params: { + type: 'object', + properties: { + minerId: { type: 'string' }, + jobId: { type: 'string' } + }, + required: ['minerId', 'jobId'] + } + }, + onRequest: createAuthOnRequest(ctx), + handler: (req, reply) => getMinerLogFile(ctx, req, reply) + } +] From 279d0477cb93ccfba2102905d9e271d27b8958eb Mon Sep 17 00:00:00 2001 From: Parag More Date: Fri, 29 May 2026 20:46:01 +0530 Subject: [PATCH 2/4] add LogDownloader for P2P miner log streaming Introduces a LogDownloader class that fetches miner log files directly from wrk-miner via Hypercore/Hyperswarm, bypassing the HRPC pipeline. Wired up in the http worker after facilities are ready. --- workers/http.node.wrk.js | 1 - workers/lib/log-downloader.js | 7 ------- 2 files changed, 8 deletions(-) diff --git a/workers/http.node.wrk.js b/workers/http.node.wrk.js index 5bd0e83..33029b8 100644 --- a/workers/http.node.wrk.js +++ b/workers/http.node.wrk.js @@ -91,7 +91,6 @@ class WrkServerHttp extends TetherWrkBase { async.series([ next => { super._start(next) }, async () => { - // Facilities are ready — construct LogDownloader with live facility references this.logDownloader = new LogDownloader({ netFac: this.net_r0, storeFac: this.store_s0, diff --git a/workers/lib/log-downloader.js b/workers/lib/log-downloader.js index cd8423d..81c733f 100644 --- a/workers/lib/log-downloader.js +++ b/workers/lib/log-downloader.js @@ -29,9 +29,6 @@ class LogDownloader { this._swarmReady = false } - /** - * Ensure the swarm is started and our (one-time) connection handler is registered. - */ async _ensureSwarm () { if (!this._netFac.swarm) { await this._netFac.startSwarm() @@ -40,7 +37,6 @@ class LogDownloader { if (this._swarmReady) return this._swarmReady = true - // Route each new connection to the matching in-flight download core. this._netFac.swarm.on('connection', (socket, peerInfo) => { const topics = new Set( (peerInfo.topics || []).map(t => Buffer.from(t).toString('hex')) @@ -64,14 +60,12 @@ class LogDownloader { async stream (coreKeyHex, byteLength) { await this._ensureSwarm() - // Open a read-only Corestore session for the remote core — no temp dir needed const core = this._storeFac.getCore({ key: Buffer.from(coreKeyHex, 'hex') }) await core.ready() const discoveryKeyHex = core.discoveryKey.toString('hex') this._downloads.set(coreKeyHex, { core, discoveryKeyHex }) - // Join the DHT topic to find the wrk-miner serving this core const discovery = this._netFac.swarm.join(core.discoveryKey, { server: false, client: true }) const peersDone = core.findingPeers() this._netFac.swarm.flush().then(peersDone, peersDone) @@ -103,7 +97,6 @@ class LogDownloader { this._downloads.delete(coreKeyHex) try { await discovery.destroy() } catch {} try { - // Free downloaded blocks from Corestore storage, then close the session if (core.length > 0) await core.clear(0, core.length) await core.close() } catch {} From 0f93ce4720ffcb26c3ef1883fc9c7b8948fd699c Mon Sep 17 00:00:00 2001 From: Parag More Date: Fri, 29 May 2026 20:47:18 +0530 Subject: [PATCH 3/4] trim verbose comments in log download handlers --- workers/lib/server/handlers/actions.handlers.js | 15 +++------------ workers/lib/server/handlers/minerLogs.handlers.js | 14 -------------- 2 files changed, 3 insertions(+), 26 deletions(-) diff --git a/workers/lib/server/handlers/actions.handlers.js b/workers/lib/server/handlers/actions.handlers.js index dd57583..67db838 100644 --- a/workers/lib/server/handlers/actions.handlers.js +++ b/workers/lib/server/handlers/actions.handlers.js @@ -142,19 +142,11 @@ async function cancelActionsBatch (ctx, req) { }) } -/** - * Stream a miner log file to the HTTP client. - * - * The action result (stored by ork) contains only metadata: { coreKey, byteLength, expiresAt }. - * The actual log bytes are fetched directly from the wrk-miner via Hypercore/Hyperswarm P2P, - * bypassing the HRPC action pipeline entirely. - * - * Route: GET /auth/download-logs/:id - */ +// Action result contains only metadata (coreKey, byteLength, expiresAt) — actual bytes +// come directly from wrk-miner over Hypercore/Hyperswarm, bypassing the HRPC pipeline. async function downloadLogFile (ctx, req, reply) { const { id } = req.params - // Fetch the completed action from ork const results = await ctx.dataProxy.requestData('getAction', { id, type: 'done' }) const action = Array.isArray(results) ? results.find(r => r && !r.error) : results @@ -162,7 +154,6 @@ async function downloadLogFile (ctx, req, reply) { return reply.code(404).send({ error: 'ERR_ACTION_NOT_FOUND' }) } - // Walk targets to find the first successful downloadLogs result let meta = null for (const rack of Object.values(action.targets)) { for (const call of (rack.calls || [])) { @@ -182,7 +173,7 @@ async function downloadLogFile (ctx, req, reply) { return reply.code(410).send({ error: 'ERR_LOG_EXPIRED' }) } - // Set streaming headers — Content-Length enables browser download progress + // Content-Length lets the browser show download progress const filename = `miner-log-${meta.minerId || 'unknown'}-${id}.log` reply.header('Content-Type', 'application/octet-stream') reply.header('Content-Disposition', `attachment; filename="${filename}"`) diff --git a/workers/lib/server/handlers/minerLogs.handlers.js b/workers/lib/server/handlers/minerLogs.handlers.js index f437353..b7275e8 100644 --- a/workers/lib/server/handlers/minerLogs.handlers.js +++ b/workers/lib/server/handlers/minerLogs.handlers.js @@ -18,12 +18,6 @@ const { downloadLogFile } = require('./actions.handlers') * Uses the Hypercore P2P pipeline — no buffering. */ -/** - * POST /auth/miners/:minerId/download-logs - * - * Submits the downloadLogs action for the given miner and returns the jobId - * (which is the action ID) the client can use to poll status. - */ async function startMinerLogDownload (ctx, req, reply) { const { minerId } = req.params @@ -98,7 +92,6 @@ async function getMinerLogDownloadStatus (ctx, req, reply) { return reply.code(200).send({ status: 'pending', jobId }) } - // Walk targets to find the first successful downloadLogs result with a coreKey let meta = null let firstError = null @@ -141,13 +134,6 @@ async function getMinerLogDownloadStatus (ctx, req, reply) { }) } -/** - * GET /auth/miners/:minerId/download-logs/:jobId/file - * - * Streams the binary miner log directly to the HTTP client via the - * Hypercore P2P pipeline. Delegates entirely to downloadLogFile which - * handles action lookup, TTL check, streaming headers, and Hyperswarm transfer. - */ function getMinerLogFile (ctx, req, reply) { // downloadLogFile expects req.params.id — bridge from our :jobId param return downloadLogFile(ctx, { ...req, params: { ...req.params, id: req.params.jobId } }, reply) From ce6bbc8c44ad55a447c284e9fa2be86f555d0c73 Mon Sep 17 00:00:00 2001 From: Parag More Date: Mon, 1 Jun 2026 20:54:10 +0530 Subject: [PATCH 4/4] Fix LOGS_PEER_NOT_FOUND --- workers/lib/log-downloader.js | 38 +++++++++---------- .../lib/server/handlers/actions.handlers.js | 16 ++++---- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/workers/lib/log-downloader.js b/workers/lib/log-downloader.js index 81c733f..f61efc9 100644 --- a/workers/lib/log-downloader.js +++ b/workers/lib/log-downloader.js @@ -1,6 +1,6 @@ 'use strict' -const DEFAULT_PEER_TIMEOUT_MS = 30000 +const DEFAULT_PEER_TIMEOUT_MS = 60000 /** * Downloads miner log files from wrk-miner via Hypercore/Hyperswarm P2P. @@ -16,8 +16,9 @@ const DEFAULT_PEER_TIMEOUT_MS = 30000 * 3. Returns a Readable byte stream suitable for piping directly to an HTTP response * 4. Clears downloaded blocks and closes the Corestore session after the stream ends * - * One shared connection handler on net_r0.swarm routes connections to the correct - * in-flight download by matching peerInfo.topics against active download discoveryKeys. + * One shared connection handler on net_r0.swarm replicates all active downloads on + * every connection (net_r0.swarm is not shared with the RPC layer, so all connections + * on this swarm are log-transfer peers). */ class LogDownloader { constructor ({ netFac, storeFac, peerTimeoutMs } = {}) { @@ -37,14 +38,9 @@ class LogDownloader { if (this._swarmReady) return this._swarmReady = true - this._netFac.swarm.on('connection', (socket, peerInfo) => { - const topics = new Set( - (peerInfo.topics || []).map(t => Buffer.from(t).toString('hex')) - ) + this._netFac.swarm.on('connection', (socket) => { for (const [, entry] of this._downloads) { - if (topics.has(entry.discoveryKeyHex)) { - entry.core.replicate(socket) - } + entry.core.replicate(socket) } }) } @@ -68,17 +64,21 @@ class LogDownloader { const discovery = this._netFac.swarm.join(core.discoveryKey, { server: false, client: true }) const peersDone = core.findingPeers() - this._netFac.swarm.flush().then(peersDone, peersDone) + const peersDoneTimer = setTimeout(peersDone, this._peerTimeoutMs) - // Wait for the remote core's block tree to arrive (core.length > 0 required) - const updateResult = await Promise.race([ - core.update(), - new Promise((_, reject) => - setTimeout(() => reject(new Error('ERR_LOG_PEER_TIMEOUT')), this._peerTimeoutMs) - ) - ]) + try { + await Promise.race([ + core.update(), + new Promise((_, reject) => + setTimeout(() => reject(new Error('ERR_LOG_PEER_TIMEOUT')), this._peerTimeoutMs) + ) + ]) + } finally { + clearTimeout(peersDoneTimer) + peersDone() + } - if (!core.length && (!updateResult || !updateResult.changed)) { + if (!core.length) { await this._cleanup(coreKeyHex, core, discovery) throw new Error('ERR_LOG_PEER_NOT_FOUND') } diff --git a/workers/lib/server/handlers/actions.handlers.js b/workers/lib/server/handlers/actions.handlers.js index 67db838..a3bbd20 100644 --- a/workers/lib/server/handlers/actions.handlers.js +++ b/workers/lib/server/handlers/actions.handlers.js @@ -173,13 +173,6 @@ async function downloadLogFile (ctx, req, reply) { return reply.code(410).send({ error: 'ERR_LOG_EXPIRED' }) } - // Content-Length lets the browser show download progress - const filename = `miner-log-${meta.minerId || 'unknown'}-${id}.log` - reply.header('Content-Type', 'application/octet-stream') - reply.header('Content-Disposition', `attachment; filename="${filename}"`) - reply.header('Content-Length', meta.byteLength) - reply.header('Cache-Control', 'no-store') - let stream try { stream = await ctx.logDownloader.stream(meta.coreKey, meta.byteLength) @@ -190,6 +183,15 @@ async function downloadLogFile (ctx, req, reply) { return reply.code(code).send({ error: err.message }) } + // Set headers only after stream is ready — if set before the try-catch and stream() + // throws, the error response would carry application/octet-stream content-type and + // Fastify would refuse to serialize the JSON error object. + const filename = `miner-log-${meta.minerId || 'unknown'}-${id}.log` + reply.header('Content-Type', 'application/octet-stream') + reply.header('Content-Disposition', `attachment; filename="${filename}"`) + reply.header('Content-Length', meta.byteLength) + reply.header('Cache-Control', 'no-store') + // Fastify pipes a Readable stream directly to the HTTP response — no buffering return reply.send(stream) }