diff --git a/src/__tests__/it-sigint-worker-pool.test.ts b/src/__tests__/it-sigint-worker-pool.test.ts new file mode 100644 index 0000000..f3831c4 --- /dev/null +++ b/src/__tests__/it-sigint-worker-pool.test.ts @@ -0,0 +1,180 @@ +/** + * Integration test: SIGINT abort signal propagation in worker pool. + * + * Verifies that: + * - AbortSignal is passed to tasks even when concurrency=1 (sequential mode) + * - Aborting the controller causes the signal to fire, enabling task interruption + * - The SIGINT handler in parallelExecution correctly aborts the controller + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { TaskInfo } from '../infra/task/index.js'; + +vi.mock('../shared/ui/index.js', () => ({ + header: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + success: vi.fn(), + status: vi.fn(), + blankLine: vi.fn(), +})); + +vi.mock('../shared/exitCodes.js', () => ({ + EXIT_SIGINT: 130, +})); + +vi.mock('../shared/i18n/index.js', () => ({ + getLabel: vi.fn((key: string) => key), +})); + +vi.mock('../shared/utils/index.js', async (importOriginal) => ({ + ...(await importOriginal>()), + createLogger: () => ({ + info: vi.fn(), + debug: vi.fn(), + error: vi.fn(), + }), +})); + +const mockExecuteAndCompleteTask = vi.fn(); + +vi.mock('../features/tasks/execute/taskExecution.js', () => ({ + executeAndCompleteTask: (...args: unknown[]) => mockExecuteAndCompleteTask(...args), +})); + +import { runWithWorkerPool } from '../features/tasks/execute/parallelExecution.js'; + +function createTask(name: string): TaskInfo { + return { + name, + content: `Task: ${name}`, + filePath: `/tasks/${name}.yaml`, + }; +} + +function createMockTaskRunner() { + return { + getNextTask: vi.fn(() => null), + claimNextTasks: vi.fn(() => []), + completeTask: vi.fn(), + failTask: vi.fn(), + }; +} + +beforeEach(() => { + vi.clearAllMocks(); + mockExecuteAndCompleteTask.mockResolvedValue(true); +}); + +describe('worker pool: abort signal propagation', () => { + let savedSigintListeners: ((...args: unknown[]) => void)[]; + + beforeEach(() => { + savedSigintListeners = process.rawListeners('SIGINT') as ((...args: unknown[]) => void)[]; + }); + + afterEach(() => { + process.removeAllListeners('SIGINT'); + for (const listener of savedSigintListeners) { + process.on('SIGINT', listener as NodeJS.SignalsListener); + } + }); + + it('should pass abortSignal to tasks in sequential mode (concurrency=1)', async () => { + // Given + const tasks = [createTask('task-1')]; + const runner = createMockTaskRunner(); + const receivedSignals: (AbortSignal | undefined)[] = []; + + mockExecuteAndCompleteTask.mockImplementation( + (_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => { + receivedSignals.push(parallelOpts?.abortSignal); + return Promise.resolve(true); + }, + ); + + // When + await runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default', undefined, 50); + + // Then: AbortSignal is passed even with concurrency=1 + expect(receivedSignals).toHaveLength(1); + expect(receivedSignals[0]).toBeInstanceOf(AbortSignal); + }); + + it('should abort the signal when SIGINT fires in sequential mode', async () => { + // Given + const tasks = [createTask('long-task')]; + const runner = createMockTaskRunner(); + let capturedSignal: AbortSignal | undefined; + + mockExecuteAndCompleteTask.mockImplementation( + (_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => { + capturedSignal = parallelOpts?.abortSignal; + return new Promise((resolve) => { + // Wait long enough for SIGINT to fire + setTimeout(() => resolve(true), 200); + }); + }, + ); + + // Start execution + const resultPromise = runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default', undefined, 50); + + // Wait for task to start + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Find the SIGINT handler added by runWithWorkerPool + const allListeners = process.rawListeners('SIGINT') as ((...args: unknown[]) => void)[]; + const newListener = allListeners.find((l) => !savedSigintListeners.includes(l)); + expect(newListener).toBeDefined(); + + // Simulate SIGINT + newListener!(); + + // Wait for execution to complete + await resultPromise; + + // Then: The abort signal should have been triggered + expect(capturedSignal).toBeInstanceOf(AbortSignal); + expect(capturedSignal!.aborted).toBe(true); + }); + + it('should share the same AbortSignal across sequential and parallel tasks', async () => { + // Given: Multiple tasks in both sequential (concurrency=1) and parallel (concurrency=2) + const tasks = [createTask('t1'), createTask('t2')]; + const runner = createMockTaskRunner(); + + const receivedSignalsSeq: (AbortSignal | undefined)[] = []; + const receivedSignalsPar: (AbortSignal | undefined)[] = []; + + mockExecuteAndCompleteTask.mockImplementation( + (_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => { + receivedSignalsSeq.push(parallelOpts?.abortSignal); + return Promise.resolve(true); + }, + ); + + // Sequential mode + await runWithWorkerPool(runner as never, [...tasks], 1, '/cwd', 'default', undefined, 50); + + mockExecuteAndCompleteTask.mockClear(); + mockExecuteAndCompleteTask.mockImplementation( + (_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => { + receivedSignalsPar.push(parallelOpts?.abortSignal); + return Promise.resolve(true); + }, + ); + + // Parallel mode + await runWithWorkerPool(runner as never, [...tasks], 2, '/cwd', 'default', undefined, 50); + + // Then: Both modes pass AbortSignal + for (const signal of receivedSignalsSeq) { + expect(signal).toBeInstanceOf(AbortSignal); + } + for (const signal of receivedSignalsPar) { + expect(signal).toBeInstanceOf(AbortSignal); + } + }); +}); diff --git a/src/__tests__/runAllTasks-concurrency.test.ts b/src/__tests__/runAllTasks-concurrency.test.ts index 2ea5f30..677e786 100644 --- a/src/__tests__/runAllTasks-concurrency.test.ts +++ b/src/__tests__/runAllTasks-concurrency.test.ts @@ -449,7 +449,7 @@ describe('runAllTasks concurrency', () => { expect(pieceOptions).toHaveProperty('taskPrefix', 'parallel-task'); }); - it('should not pass abortSignal or taskPrefix in sequential mode', async () => { + it('should pass abortSignal but not taskPrefix in sequential mode', async () => { // Given: Sequential mode mockLoadGlobalConfig.mockReturnValue({ language: 'en', @@ -470,11 +470,11 @@ describe('runAllTasks concurrency', () => { // When await runAllTasks('/project'); - // Then: executePiece should not have abortSignal or taskPrefix + // Then: executePiece should have abortSignal but not taskPrefix expect(mockExecutePiece).toHaveBeenCalledTimes(1); const callArgs = mockExecutePiece.mock.calls[0]; const pieceOptions = callArgs?.[3]; - expect(pieceOptions?.abortSignal).toBeUndefined(); + expect(pieceOptions?.abortSignal).toBeInstanceOf(AbortSignal); expect(pieceOptions?.taskPrefix).toBeUndefined(); }); }); diff --git a/src/__tests__/taskExecution.test.ts b/src/__tests__/taskExecution.test.ts index 023e317..5d5eb24 100644 --- a/src/__tests__/taskExecution.test.ts +++ b/src/__tests__/taskExecution.test.ts @@ -491,4 +491,24 @@ describe('resolveTaskExecution', () => { // Then expect(result.issueNumber).toBeUndefined(); }); + + it('should not start clone creation when abortSignal is already aborted', async () => { + // Given: Worktree task with pre-aborted signal + const task: TaskInfo = { + name: 'aborted-before-clone', + content: 'Task content', + filePath: '/tasks/task.yaml', + data: { + task: 'Task content', + worktree: true, + }, + }; + const controller = new AbortController(); + controller.abort(); + + // When / Then + await expect(resolveTaskExecution(task, '/project', 'default', controller.signal)).rejects.toThrow('Task execution aborted'); + expect(mockSummarizeTaskName).not.toHaveBeenCalled(); + expect(mockCreateSharedClone).not.toHaveBeenCalled(); + }); }); diff --git a/src/__tests__/workerPool.test.ts b/src/__tests__/workerPool.test.ts index c1b2896..7254ab8 100644 --- a/src/__tests__/workerPool.test.ts +++ b/src/__tests__/workerPool.test.ts @@ -142,7 +142,7 @@ describe('runWithWorkerPool', () => { }); }); - it('should not pass taskPrefix or abortSignal for sequential execution (concurrency = 1)', async () => { + it('should pass abortSignal but not taskPrefix for sequential execution (concurrency = 1)', async () => { // Given const tasks = [createTask('seq-task')]; const runner = createMockTaskRunner([]); @@ -154,7 +154,7 @@ describe('runWithWorkerPool', () => { expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(1); const parallelOpts = mockExecuteAndCompleteTask.mock.calls[0]?.[5]; expect(parallelOpts).toEqual({ - abortSignal: undefined, + abortSignal: expect.any(AbortSignal), taskPrefix: undefined, taskColorIndex: undefined, }); @@ -250,6 +250,51 @@ describe('runWithWorkerPool', () => { expect(result).toEqual({ success: 0, fail: 1 }); }); + it('should wait for in-flight tasks to settle after SIGINT before returning', async () => { + // Given: Two running tasks that resolve after abort is triggered. + const tasks = [createTask('t1'), createTask('t2')]; + const runner = createMockTaskRunner([]); + const deferred: Array<() => void> = []; + const startedSignals: AbortSignal[] = []; + + mockExecuteAndCompleteTask.mockImplementation((_task, _runner, _cwd, _piece, _opts, parallelOpts) => { + const signal = parallelOpts?.abortSignal; + if (signal) startedSignals.push(signal); + return new Promise((resolve) => { + if (signal) { + signal.addEventListener('abort', () => deferred.push(() => resolve(false)), { once: true }); + } else { + deferred.push(() => resolve(true)); + } + }); + }); + + const resultPromise = runWithWorkerPool( + runner as never, tasks, 2, '/cwd', 'default', undefined, TEST_POLL_INTERVAL_MS, + ); + + await new Promise((resolve) => setTimeout(resolve, 10)); + + const sigintListeners = process.rawListeners('SIGINT') as ((...args: unknown[]) => void)[]; + const handler = sigintListeners[sigintListeners.length - 1]; + expect(handler).toBeDefined(); + handler!(); + + await new Promise((resolve) => setTimeout(resolve, 10)); + expect(startedSignals).toHaveLength(2); + for (const signal of startedSignals) { + expect(signal.aborted).toBe(true); + } + + for (const resolveTask of deferred) { + resolveTask(); + } + + // Then: pool returns after in-flight tasks settle, counting them as failures. + const result = await resultPromise; + expect(result).toEqual({ success: 0, fail: 2 }); + }); + describe('polling', () => { it('should pick up tasks added during execution via polling', async () => { // Given: 1 initial task running with concurrency=2, a second task appears via poll diff --git a/src/features/tasks/execute/parallelExecution.ts b/src/features/tasks/execute/parallelExecution.ts index 67a3c4c..6878b12 100644 --- a/src/features/tasks/execute/parallelExecution.ts +++ b/src/features/tasks/execute/parallelExecution.ts @@ -107,18 +107,14 @@ export async function runWithWorkerPool( try { while (queue.length > 0 || active.size > 0) { - if (abortController.signal.aborted) { - break; + if (!abortController.signal.aborted) { + fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter); } - fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter); - if (active.size === 0) { break; } - const pollTimer = createPollTimer(pollIntervalMs, abortController.signal); - const completionPromises: Promise[] = [...active.keys()].map((p) => p.then( (result): RaceResult => ({ type: 'completion', promise: p, result }), @@ -126,9 +122,18 @@ export async function runWithWorkerPool( ), ); - const settled = await Promise.race([...completionPromises, pollTimer.promise]); - - pollTimer.cancel(); + let settled: RaceResult; + if (abortController.signal.aborted) { + // Graceful shutdown: stop scheduling new work but wait for in-flight tasks to settle. + settled = await Promise.race(completionPromises); + } else { + const pollTimer = createPollTimer(pollIntervalMs, abortController.signal); + try { + settled = await Promise.race([...completionPromises, pollTimer.promise]); + } finally { + pollTimer.cancel(); + } + } if (settled.type === 'completion') { const task = active.get(settled.promise); @@ -189,7 +194,7 @@ function fillSlots( } const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, { - abortSignal: isParallel ? abortController.signal : undefined, + abortSignal: abortController.signal, taskPrefix: isParallel ? task.name : undefined, taskColorIndex: isParallel ? colorIndex : undefined, }); diff --git a/src/features/tasks/execute/resolveTask.ts b/src/features/tasks/execute/resolveTask.ts index 9f4ed82..8a74c93 100644 --- a/src/features/tasks/execute/resolveTask.ts +++ b/src/features/tasks/execute/resolveTask.ts @@ -18,6 +18,12 @@ export interface ResolvedTaskExecution { issueNumber?: number; } +function throwIfAborted(signal?: AbortSignal): void { + if (signal?.aborted) { + throw new Error('Task execution aborted'); + } +} + /** * Resolve execution directory and piece from task data. * If the task has worktree settings, create a shared clone and use it as cwd. @@ -27,7 +33,10 @@ export async function resolveTaskExecution( task: TaskInfo, defaultCwd: string, defaultPiece: string, + abortSignal?: AbortSignal, ): Promise { + throwIfAborted(abortSignal); + const data = task.data; if (!data) { return { execCwd: defaultCwd, execPiece: defaultPiece, isWorktree: false }; @@ -39,10 +48,12 @@ export async function resolveTaskExecution( let baseBranch: string | undefined; if (data.worktree) { + throwIfAborted(abortSignal); baseBranch = getCurrentBranch(defaultCwd); info('Generating branch name...'); const taskSlug = await summarizeTaskName(task.content, { cwd: defaultCwd }); + throwIfAborted(abortSignal); info('Creating clone...'); const result = createSharedClone(defaultCwd, { worktree: data.worktree, @@ -50,6 +61,7 @@ export async function resolveTaskExecution( taskSlug, issueNumber: data.issue, }); + throwIfAborted(abortSignal); execCwd = result.path; branch = result.branch; isWorktree = true; diff --git a/src/features/tasks/execute/taskExecution.ts b/src/features/tasks/execute/taskExecution.ts index 4798cc9..876652c 100644 --- a/src/features/tasks/execute/taskExecution.ts +++ b/src/features/tasks/execute/taskExecution.ts @@ -128,7 +128,7 @@ export async function executeAndCompleteTask( } try { - const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName); + const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName, taskAbortSignal); // cwd is always the project root; pass it as projectCwd so reports/sessions go there const taskRunResult = await executeTaskWithResult({