Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tests/unit/handlers/minerLogs.handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ test('startMinerLogDownload - uses minerId from route params', async (t) => {
requestData: async (method, payload, callback) => {
capturedPayload = payload
const arr = []
callback({ id: '99' }, arr)
const res = { id: '99' }
callback(res, arr)
return arr
}
}
Expand Down
25 changes: 12 additions & 13 deletions tests/unit/lib/log-downloader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ test('LogDownloader - constructor stores facilities', (t) => {
t.pass()
})

test('LogDownloader - constructor defaults peerTimeoutMs to 30s', (t) => {
test('LogDownloader - constructor defaults peerTimeoutMs to 60s', (t) => {
const dl = new LogDownloader({ netFac: makeNetFac(), storeFac: makeStoreFac() })
t.is(dl._peerTimeoutMs, 30000, 'should default to 30000ms')
t.is(dl._peerTimeoutMs, 60000, 'should default to 60000ms')
t.pass()
})

Expand Down Expand Up @@ -305,7 +305,7 @@ test('LogDownloader - stream cleans up after stream end', async (t) => {
t.is(dl._downloads.size, 1, 'precondition: download tracked before stream ends')

rs.emit('end')
await new Promise(r => setImmediate(r))
await new Promise(resolve => setImmediate(resolve))

t.is(dl._downloads.size, 0, 'should remove download from map after stream end')
t.ok(coreClosed, 'should close core after stream end')
Expand All @@ -331,18 +331,18 @@ test('LogDownloader - stream cleans up after stream error', async (t) => {
await dl.stream(TEST_KEY_HEX, 100)

rs.emit('error', new Error('connection dropped'))
await new Promise(r => setImmediate(r))
await new Promise(resolve => setImmediate(resolve))

t.is(dl._downloads.size, 0, 'should remove download from map after stream error')
t.ok(coreClosed, 'should close core after stream error')
t.pass()
})

// ─────────────────────────────────────────────────────────────────────────────
// connection handler — topic-based routing
// connection handler — replicate all active downloads
// ─────────────────────────────────────────────────────────────────────────────

test('LogDownloader - connection handler replicates core with matching topic', async (t) => {
test('LogDownloader - connection handler replicates core on swarm connection', async (t) => {
const swarm = makeSwarm()
const netFac = { get swarm () { return swarm }, startSwarm: async () => {} }

Expand All @@ -362,12 +362,12 @@ test('LogDownloader - connection handler replicates core with matching topic', a
const fakeSocket = {}
swarm.emit('connection', fakeSocket, { topics: [discoveryKeyBuf] })

t.is(replicateCalls.length, 1, 'should replicate when topic matches discoveryKey')
t.is(replicateCalls.length, 1, 'should replicate active download on connection')
t.ok(replicateCalls[0] === fakeSocket, 'should pass socket to replicate')
t.pass()
})

test('LogDownloader - connection handler skips core with non-matching topic', async (t) => {
test('LogDownloader - connection handler replicates without inspecting peer topics', async (t) => {
const swarm = makeSwarm()
const netFac = { get swarm () { return swarm }, startSwarm: async () => {} }

Expand All @@ -381,14 +381,13 @@ test('LogDownloader - connection handler skips core with non-matching topic', as
const dl = new LogDownloader({ netFac, storeFac })
await dl.stream(TEST_KEY_HEX, 100)

// Emit connection with a completely different topic
swarm.emit('connection', {}, { topics: [Buffer.alloc(32).fill(0xff)] })

t.is(replicateCalls.length, 0, 'should not replicate when topic does not match')
t.is(replicateCalls.length, 1, 'should replicate on every swarm connection')
t.pass()
})

test('LogDownloader - connection handler handles empty topics list', async (t) => {
test('LogDownloader - connection handler replicates when connection has no topics', async (t) => {
const swarm = makeSwarm()
const netFac = { get swarm () { return swarm }, startSwarm: async () => {} }

Expand All @@ -402,9 +401,9 @@ test('LogDownloader - connection handler handles empty topics list', async (t) =
const dl = new LogDownloader({ netFac, storeFac })
await dl.stream(TEST_KEY_HEX, 100)

swarm.emit('connection', {}, { topics: [] })
swarm.emit('connection', {})

t.is(replicateCalls.length, 0, 'should not replicate when topics list is empty')
t.is(replicateCalls.length, 1, 'should replicate without peer topic metadata')
t.pass()
})

Expand Down
4 changes: 2 additions & 2 deletions workers/http.node.wrk.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const async = require('async')
const WebsocketPlugin = require('@fastify/websocket')
const MultipartPlugin = require('@fastify/multipart')
const { WORK_ORDER_FILE_MAX_BYTES_DEFAULT } = require('./lib/constants')
const { WORK_ORDER_FILE_MAX_BYTES_DEFAULT, MICROSOFT_AUTH_SCOPE } = require('./lib/constants')
const TetherWrkBase = require('@tetherto/tether-wrk-base/workers/base.wrk.tether')
const AuthLib = require('./lib/auth')
const debug = require('debug')('store:aggr')
Expand Down Expand Up @@ -105,7 +105,7 @@ class WrkServerHttp extends TetherWrkBase {

if (!this.noAuth) {
httpd.addPlugin(httpdAuth.injection())
httpd.addPlugin(httpdAuthMicrosoft.injection())
httpd.addPlugin(httpdAuthMicrosoft.injection({ scope: MICROSOFT_AUTH_SCOPE }))
}

httpd.addPlugin([WebsocketPlugin, {}])
Expand Down
8 changes: 6 additions & 2 deletions workers/lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ const ENDPOINTS = {
EXPLORER_RACKS: '/auth/explorer/racks',
// Energy endpoints
ENERGY_FORECAST: '/auth/energy/forecast',
ENERGY_FORECAST_HISTORY: '/auth/energy/forecast/history',
ENERGY_AVAILABLE: '/auth/energy/available',
// Work Order endpoints
WORK_ORDERS: '/auth/work-orders',
Expand Down Expand Up @@ -577,7 +578,8 @@ const MINERPOOL_EXT_DATA_KEYS = {

const ELECTRICITY_EXT_DATA_KEYS = {
FORECAST: 'forecast',
AVAIL_ENERGY_MWH: 'availableEnergyMWh'
FORECAST_HISTORY: 'forecast-history',
AVAIL_ENERGY: 'availableEnergy'
}

const NON_METRIC_KEYS = [
Expand Down Expand Up @@ -697,6 +699,7 @@ const EXPLORER_RACK_AGGR_FIELDS = {

const EXPLORER_RACK_DEFAULT_LIMIT = 20
const EXPLORER_RACK_MAX_LIMIT = 100
const MICROSOFT_AUTH_SCOPE = ['openid', 'profile', 'email', 'User.Read']

module.exports = {
SUPER_ADMIN_ROLE,
Expand Down Expand Up @@ -772,5 +775,6 @@ module.exports = {
WORK_ORDER_FILE_MAX_BYTES_DEFAULT,
WORK_ORDER_FILE_COUNT_CAP_DEFAULT,
WORK_ORDER_FILE_MIME_ALLOWLIST_DEFAULT,
WORK_ORDER_EXPORT_FORMATS
WORK_ORDER_EXPORT_FORMATS,
MICROSOFT_AUTH_SCOPE
}
2 changes: 1 addition & 1 deletion workers/lib/log-downloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class LogDownloader {
try {
await Promise.race([
core.update(),
new Promise((_, reject) =>
new Promise((resolve, reject) =>
setTimeout(() => reject(new Error('ERR_LOG_PEER_TIMEOUT')), this._peerTimeoutMs)
)
])
Expand Down
17 changes: 15 additions & 2 deletions workers/lib/server/handlers/energy.handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,30 @@ const getEnergyForecast = async (ctx, req) => {
})
}

const getEnergyForecastHistory = async (ctx, req) => {
const { start, end } = req.query
return await ctx.dataProxy.requestDataMap(
RPC_METHODS.GET_WRK_EXT_DATA,
{
type: WORKER_TYPES.ELECTRICITY,
query: { key: ELECTRICITY_EXT_DATA_KEYS.FORECAST_HISTORY },
start,
end
})
}

const setAvailableEnergy = async (ctx, req) => {
return await ctx.dataProxy.requestDataMap(
RPC_METHODS.SET_WRK_EXT_DATA,
{
type: WORKER_TYPES.ELECTRICITY,
key: ELECTRICITY_EXT_DATA_KEYS.AVAIL_ENERGY_MWH,
key: ELECTRICITY_EXT_DATA_KEYS.AVAIL_ENERGY,
value: req.body.data
})
}

module.exports = {
getEnergyForecast,
setAvailableEnergy
setAvailableEnergy,
getEnergyForecastHistory
}
22 changes: 21 additions & 1 deletion workers/lib/server/routes/energy.routes.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { ENDPOINTS, HTTP_METHODS, AUTH_CAPS } = require('../../constants')
const { getEnergyForecast, setAvailableEnergy } = require('../handlers/energy.handlers')
const { getEnergyForecast, setAvailableEnergy, getEnergyForecastHistory } = require('../handlers/energy.handlers')
const { createCachedAuthRoute, createAuthRoute } = require('../lib/routeHelpers')
const schemas = require('../schemas/energy.schemas')

Expand All @@ -16,6 +16,26 @@ module.exports = (ctx) => [
getEnergyForecast
)
},
{
method: HTTP_METHODS.GET,
url: ENDPOINTS.ENERGY_FORECAST_HISTORY,
schema: {
querystring: {
type: 'object',
properties: {
start: { type: 'integer', minimum: 0 },
end: { type: 'integer', minimum: 0 }
},
required: ['start', 'end']
}
},
...createCachedAuthRoute(
ctx,
(req) => ['energy-forecast-history'],
ENDPOINTS.ENERGY_FORECAST_HISTORY,
getEnergyForecastHistory
)
},
{
method: HTTP_METHODS.POST,
url: ENDPOINTS.ENERGY_AVAILABLE,
Expand Down
Loading