diff --git a/src/watch.ts b/src/watch.ts index 85058117e2e..12e2df6a2c9 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -44,7 +44,6 @@ export class Watch { const signal = AbortSignal.any([controller.signal, timeoutSignal]); const ctx = new RequestContext(watchURL.toString(), HttpMethod.GET); - await this.config.applySecurityAuthentication(ctx); let doneCalled: boolean = false; const doneCallOnce = (err: any) => { @@ -59,45 +58,50 @@ export class Watch { } }; - try { - const response = await fetch(watchURL, { - method: 'GET', - headers: ctx.getHeaders(), - dispatcher: ctx.getDispatcher(), - signal, - }); + const startWatch = async (): Promise => { + try { + await this.config.applySecurityAuthentication(ctx); - if (response.status === 200) { - const body = Readable.fromWeb(response.body! as any); + const response = await fetch(watchURL, { + method: 'GET', + headers: ctx.getHeaders(), + dispatcher: ctx.getDispatcher(), + signal, + }); - body.on('error', doneCallOnce); - body.on('close', () => doneCallOnce(null)); - body.on('finish', () => doneCallOnce(null)); + if (response.status === 200) { + const body = Readable.fromWeb(response.body! as any); - const lines = createInterface(body); - lines.on('error', doneCallOnce); - lines.on('close', () => doneCallOnce(null)); - lines.on('finish', () => doneCallOnce(null)); - lines.on('line', (line) => { - try { - const data = JSON.parse(line.toString()); - callback(data.type, data.object, data); - } catch { - // ignore parse errors - } - }); - } else { - const statusText = - response.statusText || STATUS_CODES[response.status] || 'Internal Server Error'; - const error = new Error(statusText) as Error & { - statusCode: number | undefined; - }; - error.statusCode = response.status; - throw error; + body.on('error', doneCallOnce); + body.on('close', () => doneCallOnce(null)); + body.on('finish', () => doneCallOnce(null)); + + const lines = createInterface(body); + lines.on('error', doneCallOnce); + lines.on('close', () => doneCallOnce(null)); + lines.on('finish', () => doneCallOnce(null)); + lines.on('line', (line) => { + try { + const data = JSON.parse(line.toString()); + callback(data.type, data.object, data); + } catch { + // ignore parse errors + } + }); + } else { + const statusText = + response.statusText || STATUS_CODES[response.status] || 'Internal Server Error'; + const error = new Error(statusText) as Error & { + statusCode: number | undefined; + }; + error.statusCode = response.status; + throw error; + } + } catch (err) { + doneCallOnce(err); } - } catch (err) { - doneCallOnce(err); - } + }; + startWatch().catch(doneCallOnce); return controller; } diff --git a/src/watch_test.ts b/src/watch_test.ts index 4aad61a05d9..b470f0c3c24 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -65,6 +65,10 @@ describe('Watch', () => { let doneCalled = false; let doneErr: any; + let doneResolve!: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); await watch.watch( path, @@ -73,8 +77,10 @@ describe('Watch', () => { (err: any) => { doneCalled = true; doneErr = err; + doneResolve(); }, ); + await donePromise; strictEqual(doneCalled, true); strictEqual(doneErr.toString(), 'Error: Internal Server Error'); mockAgent.assertNoPendingInterceptors(); @@ -170,7 +176,7 @@ describe('Watch', () => { const watch = new Watch(kc); let doneCalled = 0; - let doneResolve: () => void; + let doneResolve!: () => void; const donePromise = new Promise((resolve) => { doneResolve = resolve; @@ -364,7 +370,7 @@ describe('Watch', () => { let doneErr: any; - let doneResolve: () => void; + let doneResolve!: () => void; const donePromise = new Promise((resolve) => { doneResolve = resolve; }); @@ -386,6 +392,45 @@ describe('Watch', () => { strictEqual(doneErr.name, 'TimeoutError'); }); + it('should return abort controller before receiving response data', async (t) => { + const kc = await setupMockSystem(t, (_req: any, _res: any) => { + // Intentionally do not write headers/body so fetch stays pending. + }); + const watch = new Watch(kc); + + let doneErr: any; + + let doneResolve!: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + const controllerPromise = watch.watch( + '/some/path/to/object', + {}, + () => { + throw new Error('Unexpected data received'); + }, + (err: any) => { + doneErr = err; + doneResolve(); + }, + ); + + const controller = await Promise.race([ + controllerPromise, + new Promise((_, reject) => { + setTimeout(() => { + reject(new Error('watch() did not return AbortController in time')); + }, 100); + }), + ]); + + controller.abort(); + await donePromise; + strictEqual(doneErr?.name, 'AbortError'); + }); + it('should throw on empty config', async () => { const kc = new KubeConfig(); const watch = new Watch(kc);