From 7c23ec36372d93396910634ff4554ba6534315ba Mon Sep 17 00:00:00 2001 From: Hemant Tekwani Date: Thu, 4 Jun 2026 04:59:35 +0530 Subject: [PATCH] feat: forecast history endpoint --- .../unit/handlers/minerLogs.handlers.test.js | 3 ++- tests/unit/lib/log-downloader.test.js | 25 +++++++++---------- workers/http.node.wrk.js | 4 +-- workers/lib/constants.js | 8 ++++-- workers/lib/log-downloader.js | 2 +- .../lib/server/handlers/energy.handlers.js | 17 +++++++++++-- workers/lib/server/routes/energy.routes.js | 22 +++++++++++++++- 7 files changed, 59 insertions(+), 22 deletions(-) diff --git a/tests/unit/handlers/minerLogs.handlers.test.js b/tests/unit/handlers/minerLogs.handlers.test.js index 8bea9a4..487c8dc 100644 --- a/tests/unit/handlers/minerLogs.handlers.test.js +++ b/tests/unit/handlers/minerLogs.handlers.test.js @@ -165,7 +165,8 @@ test('startMinerLogDownload - uses minerId from route params', async (t) => { requestData: async (method, payload, callback) => { capturedPayload = payload const arr = [] - callback({ id: '99' }, arr) + const res = { id: '99' } + callback(res, arr) return arr } } diff --git a/tests/unit/lib/log-downloader.test.js b/tests/unit/lib/log-downloader.test.js index b77d41b..6d19b85 100644 --- a/tests/unit/lib/log-downloader.test.js +++ b/tests/unit/lib/log-downloader.test.js @@ -105,9 +105,9 @@ test('LogDownloader - constructor stores facilities', (t) => { t.pass() }) -test('LogDownloader - constructor defaults peerTimeoutMs to 30s', (t) => { +test('LogDownloader - constructor defaults peerTimeoutMs to 60s', (t) => { const dl = new LogDownloader({ netFac: makeNetFac(), storeFac: makeStoreFac() }) - t.is(dl._peerTimeoutMs, 30000, 'should default to 30000ms') + t.is(dl._peerTimeoutMs, 60000, 'should default to 60000ms') t.pass() }) @@ -305,7 +305,7 @@ test('LogDownloader - stream cleans up after stream end', async (t) => { t.is(dl._downloads.size, 1, 'precondition: download tracked before stream ends') rs.emit('end') - await new Promise(r => setImmediate(r)) + await new Promise(resolve => setImmediate(resolve)) t.is(dl._downloads.size, 0, 'should remove download from map after stream end') t.ok(coreClosed, 'should close core after stream end') @@ -331,7 +331,7 @@ test('LogDownloader - stream cleans up after stream error', async (t) => { await dl.stream(TEST_KEY_HEX, 100) rs.emit('error', new Error('connection dropped')) - await new Promise(r => setImmediate(r)) + await new Promise(resolve => setImmediate(resolve)) t.is(dl._downloads.size, 0, 'should remove download from map after stream error') t.ok(coreClosed, 'should close core after stream error') @@ -339,10 +339,10 @@ test('LogDownloader - stream cleans up after stream error', async (t) => { }) // ───────────────────────────────────────────────────────────────────────────── -// connection handler — topic-based routing +// connection handler — replicate all active downloads // ───────────────────────────────────────────────────────────────────────────── -test('LogDownloader - connection handler replicates core with matching topic', async (t) => { +test('LogDownloader - connection handler replicates core on swarm connection', async (t) => { const swarm = makeSwarm() const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } @@ -362,12 +362,12 @@ test('LogDownloader - connection handler replicates core with matching topic', a const fakeSocket = {} swarm.emit('connection', fakeSocket, { topics: [discoveryKeyBuf] }) - t.is(replicateCalls.length, 1, 'should replicate when topic matches discoveryKey') + t.is(replicateCalls.length, 1, 'should replicate active download on connection') t.ok(replicateCalls[0] === fakeSocket, 'should pass socket to replicate') t.pass() }) -test('LogDownloader - connection handler skips core with non-matching topic', async (t) => { +test('LogDownloader - connection handler replicates without inspecting peer topics', async (t) => { const swarm = makeSwarm() const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } @@ -381,14 +381,13 @@ test('LogDownloader - connection handler skips core with non-matching topic', as const dl = new LogDownloader({ netFac, storeFac }) await dl.stream(TEST_KEY_HEX, 100) - // Emit connection with a completely different topic swarm.emit('connection', {}, { topics: [Buffer.alloc(32).fill(0xff)] }) - t.is(replicateCalls.length, 0, 'should not replicate when topic does not match') + t.is(replicateCalls.length, 1, 'should replicate on every swarm connection') t.pass() }) -test('LogDownloader - connection handler handles empty topics list', async (t) => { +test('LogDownloader - connection handler replicates when connection has no topics', async (t) => { const swarm = makeSwarm() const netFac = { get swarm () { return swarm }, startSwarm: async () => {} } @@ -402,9 +401,9 @@ test('LogDownloader - connection handler handles empty topics list', async (t) = const dl = new LogDownloader({ netFac, storeFac }) await dl.stream(TEST_KEY_HEX, 100) - swarm.emit('connection', {}, { topics: [] }) + swarm.emit('connection', {}) - t.is(replicateCalls.length, 0, 'should not replicate when topics list is empty') + t.is(replicateCalls.length, 1, 'should replicate without peer topic metadata') t.pass() }) diff --git a/workers/http.node.wrk.js b/workers/http.node.wrk.js index 33029b8..97d2307 100644 --- a/workers/http.node.wrk.js +++ b/workers/http.node.wrk.js @@ -3,7 +3,7 @@ const async = require('async') const WebsocketPlugin = require('@fastify/websocket') const MultipartPlugin = require('@fastify/multipart') -const { WORK_ORDER_FILE_MAX_BYTES_DEFAULT } = require('./lib/constants') +const { WORK_ORDER_FILE_MAX_BYTES_DEFAULT, MICROSOFT_AUTH_SCOPE } = require('./lib/constants') const TetherWrkBase = require('@tetherto/tether-wrk-base/workers/base.wrk.tether') const AuthLib = require('./lib/auth') const debug = require('debug')('store:aggr') @@ -105,7 +105,7 @@ class WrkServerHttp extends TetherWrkBase { if (!this.noAuth) { httpd.addPlugin(httpdAuth.injection()) - httpd.addPlugin(httpdAuthMicrosoft.injection()) + httpd.addPlugin(httpdAuthMicrosoft.injection({ scope: MICROSOFT_AUTH_SCOPE })) } httpd.addPlugin([WebsocketPlugin, {}]) diff --git a/workers/lib/constants.js b/workers/lib/constants.js index 1b35d83..6eef72a 100644 --- a/workers/lib/constants.js +++ b/workers/lib/constants.js @@ -193,6 +193,7 @@ const ENDPOINTS = { EXPLORER_RACKS: '/auth/explorer/racks', // Energy endpoints ENERGY_FORECAST: '/auth/energy/forecast', + ENERGY_FORECAST_HISTORY: '/auth/energy/forecast/history', ENERGY_AVAILABLE: '/auth/energy/available', // Work Order endpoints WORK_ORDERS: '/auth/work-orders', @@ -577,7 +578,8 @@ const MINERPOOL_EXT_DATA_KEYS = { const ELECTRICITY_EXT_DATA_KEYS = { FORECAST: 'forecast', - AVAIL_ENERGY_MWH: 'availableEnergyMWh' + FORECAST_HISTORY: 'forecast-history', + AVAIL_ENERGY: 'availableEnergy' } const NON_METRIC_KEYS = [ @@ -697,6 +699,7 @@ const EXPLORER_RACK_AGGR_FIELDS = { const EXPLORER_RACK_DEFAULT_LIMIT = 20 const EXPLORER_RACK_MAX_LIMIT = 100 +const MICROSOFT_AUTH_SCOPE = ['openid', 'profile', 'email', 'User.Read'] module.exports = { SUPER_ADMIN_ROLE, @@ -772,5 +775,6 @@ module.exports = { WORK_ORDER_FILE_MAX_BYTES_DEFAULT, WORK_ORDER_FILE_COUNT_CAP_DEFAULT, WORK_ORDER_FILE_MIME_ALLOWLIST_DEFAULT, - WORK_ORDER_EXPORT_FORMATS + WORK_ORDER_EXPORT_FORMATS, + MICROSOFT_AUTH_SCOPE } diff --git a/workers/lib/log-downloader.js b/workers/lib/log-downloader.js index f61efc9..1a449e9 100644 --- a/workers/lib/log-downloader.js +++ b/workers/lib/log-downloader.js @@ -69,7 +69,7 @@ class LogDownloader { try { await Promise.race([ core.update(), - new Promise((_, reject) => + new Promise((resolve, reject) => setTimeout(() => reject(new Error('ERR_LOG_PEER_TIMEOUT')), this._peerTimeoutMs) ) ]) diff --git a/workers/lib/server/handlers/energy.handlers.js b/workers/lib/server/handlers/energy.handlers.js index 34d28f8..3c07466 100644 --- a/workers/lib/server/handlers/energy.handlers.js +++ b/workers/lib/server/handlers/energy.handlers.js @@ -11,17 +11,30 @@ const getEnergyForecast = async (ctx, req) => { }) } +const getEnergyForecastHistory = async (ctx, req) => { + const { start, end } = req.query + return await ctx.dataProxy.requestDataMap( + RPC_METHODS.GET_WRK_EXT_DATA, + { + type: WORKER_TYPES.ELECTRICITY, + query: { key: ELECTRICITY_EXT_DATA_KEYS.FORECAST_HISTORY }, + start, + end + }) +} + const setAvailableEnergy = async (ctx, req) => { return await ctx.dataProxy.requestDataMap( RPC_METHODS.SET_WRK_EXT_DATA, { type: WORKER_TYPES.ELECTRICITY, - key: ELECTRICITY_EXT_DATA_KEYS.AVAIL_ENERGY_MWH, + key: ELECTRICITY_EXT_DATA_KEYS.AVAIL_ENERGY, value: req.body.data }) } module.exports = { getEnergyForecast, - setAvailableEnergy + setAvailableEnergy, + getEnergyForecastHistory } diff --git a/workers/lib/server/routes/energy.routes.js b/workers/lib/server/routes/energy.routes.js index bab4d29..10116c0 100644 --- a/workers/lib/server/routes/energy.routes.js +++ b/workers/lib/server/routes/energy.routes.js @@ -1,7 +1,7 @@ 'use strict' const { ENDPOINTS, HTTP_METHODS, AUTH_CAPS } = require('../../constants') -const { getEnergyForecast, setAvailableEnergy } = require('../handlers/energy.handlers') +const { getEnergyForecast, setAvailableEnergy, getEnergyForecastHistory } = require('../handlers/energy.handlers') const { createCachedAuthRoute, createAuthRoute } = require('../lib/routeHelpers') const schemas = require('../schemas/energy.schemas') @@ -16,6 +16,26 @@ module.exports = (ctx) => [ getEnergyForecast ) }, + { + method: HTTP_METHODS.GET, + url: ENDPOINTS.ENERGY_FORECAST_HISTORY, + schema: { + querystring: { + type: 'object', + properties: { + start: { type: 'integer', minimum: 0 }, + end: { type: 'integer', minimum: 0 } + }, + required: ['start', 'end'] + } + }, + ...createCachedAuthRoute( + ctx, + (req) => ['energy-forecast-history'], + ENDPOINTS.ENERGY_FORECAST_HISTORY, + getEnergyForecastHistory + ) + }, { method: HTTP_METHODS.POST, url: ENDPOINTS.ENERGY_AVAILABLE,