Ctrl+C周りの挙動修正

This commit is contained in:
nrslib 2026-02-10 06:10:15 +09:00
parent 28392b113a
commit 0145928061
7 changed files with 278 additions and 16 deletions

View File

@ -0,0 +1,180 @@
/**
* Integration test: SIGINT abort signal propagation in worker pool.
*
* Verifies that:
* - AbortSignal is passed to tasks even when concurrency=1 (sequential mode)
* - Aborting the controller causes the signal to fire, enabling task interruption
* - The SIGINT handler in parallelExecution correctly aborts the controller
*/
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { TaskInfo } from '../infra/task/index.js';
vi.mock('../shared/ui/index.js', () => ({
header: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
success: vi.fn(),
status: vi.fn(),
blankLine: vi.fn(),
}));
vi.mock('../shared/exitCodes.js', () => ({
EXIT_SIGINT: 130,
}));
vi.mock('../shared/i18n/index.js', () => ({
getLabel: vi.fn((key: string) => key),
}));
vi.mock('../shared/utils/index.js', async (importOriginal) => ({
...(await importOriginal<Record<string, unknown>>()),
createLogger: () => ({
info: vi.fn(),
debug: vi.fn(),
error: vi.fn(),
}),
}));
const mockExecuteAndCompleteTask = vi.fn();
vi.mock('../features/tasks/execute/taskExecution.js', () => ({
executeAndCompleteTask: (...args: unknown[]) => mockExecuteAndCompleteTask(...args),
}));
import { runWithWorkerPool } from '../features/tasks/execute/parallelExecution.js';
function createTask(name: string): TaskInfo {
return {
name,
content: `Task: ${name}`,
filePath: `/tasks/${name}.yaml`,
};
}
function createMockTaskRunner() {
return {
getNextTask: vi.fn(() => null),
claimNextTasks: vi.fn(() => []),
completeTask: vi.fn(),
failTask: vi.fn(),
};
}
beforeEach(() => {
vi.clearAllMocks();
mockExecuteAndCompleteTask.mockResolvedValue(true);
});
describe('worker pool: abort signal propagation', () => {
let savedSigintListeners: ((...args: unknown[]) => void)[];
beforeEach(() => {
savedSigintListeners = process.rawListeners('SIGINT') as ((...args: unknown[]) => void)[];
});
afterEach(() => {
process.removeAllListeners('SIGINT');
for (const listener of savedSigintListeners) {
process.on('SIGINT', listener as NodeJS.SignalsListener);
}
});
it('should pass abortSignal to tasks in sequential mode (concurrency=1)', async () => {
// Given
const tasks = [createTask('task-1')];
const runner = createMockTaskRunner();
const receivedSignals: (AbortSignal | undefined)[] = [];
mockExecuteAndCompleteTask.mockImplementation(
(_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => {
receivedSignals.push(parallelOpts?.abortSignal);
return Promise.resolve(true);
},
);
// When
await runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default', undefined, 50);
// Then: AbortSignal is passed even with concurrency=1
expect(receivedSignals).toHaveLength(1);
expect(receivedSignals[0]).toBeInstanceOf(AbortSignal);
});
it('should abort the signal when SIGINT fires in sequential mode', async () => {
// Given
const tasks = [createTask('long-task')];
const runner = createMockTaskRunner();
let capturedSignal: AbortSignal | undefined;
mockExecuteAndCompleteTask.mockImplementation(
(_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => {
capturedSignal = parallelOpts?.abortSignal;
return new Promise((resolve) => {
// Wait long enough for SIGINT to fire
setTimeout(() => resolve(true), 200);
});
},
);
// Start execution
const resultPromise = runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default', undefined, 50);
// Wait for task to start
await new Promise((resolve) => setTimeout(resolve, 20));
// Find the SIGINT handler added by runWithWorkerPool
const allListeners = process.rawListeners('SIGINT') as ((...args: unknown[]) => void)[];
const newListener = allListeners.find((l) => !savedSigintListeners.includes(l));
expect(newListener).toBeDefined();
// Simulate SIGINT
newListener!();
// Wait for execution to complete
await resultPromise;
// Then: The abort signal should have been triggered
expect(capturedSignal).toBeInstanceOf(AbortSignal);
expect(capturedSignal!.aborted).toBe(true);
});
it('should share the same AbortSignal across sequential and parallel tasks', async () => {
// Given: Multiple tasks in both sequential (concurrency=1) and parallel (concurrency=2)
const tasks = [createTask('t1'), createTask('t2')];
const runner = createMockTaskRunner();
const receivedSignalsSeq: (AbortSignal | undefined)[] = [];
const receivedSignalsPar: (AbortSignal | undefined)[] = [];
mockExecuteAndCompleteTask.mockImplementation(
(_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => {
receivedSignalsSeq.push(parallelOpts?.abortSignal);
return Promise.resolve(true);
},
);
// Sequential mode
await runWithWorkerPool(runner as never, [...tasks], 1, '/cwd', 'default', undefined, 50);
mockExecuteAndCompleteTask.mockClear();
mockExecuteAndCompleteTask.mockImplementation(
(_task: unknown, _runner: unknown, _cwd: unknown, _piece: unknown, _opts: unknown, parallelOpts: { abortSignal?: AbortSignal }) => {
receivedSignalsPar.push(parallelOpts?.abortSignal);
return Promise.resolve(true);
},
);
// Parallel mode
await runWithWorkerPool(runner as never, [...tasks], 2, '/cwd', 'default', undefined, 50);
// Then: Both modes pass AbortSignal
for (const signal of receivedSignalsSeq) {
expect(signal).toBeInstanceOf(AbortSignal);
}
for (const signal of receivedSignalsPar) {
expect(signal).toBeInstanceOf(AbortSignal);
}
});
});

View File

@ -449,7 +449,7 @@ describe('runAllTasks concurrency', () => {
expect(pieceOptions).toHaveProperty('taskPrefix', 'parallel-task');
});
it('should not pass abortSignal or taskPrefix in sequential mode', async () => {
it('should pass abortSignal but not taskPrefix in sequential mode', async () => {
// Given: Sequential mode
mockLoadGlobalConfig.mockReturnValue({
language: 'en',
@ -470,11 +470,11 @@ describe('runAllTasks concurrency', () => {
// When
await runAllTasks('/project');
// Then: executePiece should not have abortSignal or taskPrefix
// Then: executePiece should have abortSignal but not taskPrefix
expect(mockExecutePiece).toHaveBeenCalledTimes(1);
const callArgs = mockExecutePiece.mock.calls[0];
const pieceOptions = callArgs?.[3];
expect(pieceOptions?.abortSignal).toBeUndefined();
expect(pieceOptions?.abortSignal).toBeInstanceOf(AbortSignal);
expect(pieceOptions?.taskPrefix).toBeUndefined();
});
});

View File

@ -491,4 +491,24 @@ describe('resolveTaskExecution', () => {
// Then
expect(result.issueNumber).toBeUndefined();
});
it('should not start clone creation when abortSignal is already aborted', async () => {
// Given: Worktree task with pre-aborted signal
const task: TaskInfo = {
name: 'aborted-before-clone',
content: 'Task content',
filePath: '/tasks/task.yaml',
data: {
task: 'Task content',
worktree: true,
},
};
const controller = new AbortController();
controller.abort();
// When / Then
await expect(resolveTaskExecution(task, '/project', 'default', controller.signal)).rejects.toThrow('Task execution aborted');
expect(mockSummarizeTaskName).not.toHaveBeenCalled();
expect(mockCreateSharedClone).not.toHaveBeenCalled();
});
});

View File

@ -142,7 +142,7 @@ describe('runWithWorkerPool', () => {
});
});
it('should not pass taskPrefix or abortSignal for sequential execution (concurrency = 1)', async () => {
it('should pass abortSignal but not taskPrefix for sequential execution (concurrency = 1)', async () => {
// Given
const tasks = [createTask('seq-task')];
const runner = createMockTaskRunner([]);
@ -154,7 +154,7 @@ describe('runWithWorkerPool', () => {
expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(1);
const parallelOpts = mockExecuteAndCompleteTask.mock.calls[0]?.[5];
expect(parallelOpts).toEqual({
abortSignal: undefined,
abortSignal: expect.any(AbortSignal),
taskPrefix: undefined,
taskColorIndex: undefined,
});
@ -250,6 +250,51 @@ describe('runWithWorkerPool', () => {
expect(result).toEqual({ success: 0, fail: 1 });
});
it('should wait for in-flight tasks to settle after SIGINT before returning', async () => {
// Given: Two running tasks that resolve after abort is triggered.
const tasks = [createTask('t1'), createTask('t2')];
const runner = createMockTaskRunner([]);
const deferred: Array<() => void> = [];
const startedSignals: AbortSignal[] = [];
mockExecuteAndCompleteTask.mockImplementation((_task, _runner, _cwd, _piece, _opts, parallelOpts) => {
const signal = parallelOpts?.abortSignal;
if (signal) startedSignals.push(signal);
return new Promise<boolean>((resolve) => {
if (signal) {
signal.addEventListener('abort', () => deferred.push(() => resolve(false)), { once: true });
} else {
deferred.push(() => resolve(true));
}
});
});
const resultPromise = runWithWorkerPool(
runner as never, tasks, 2, '/cwd', 'default', undefined, TEST_POLL_INTERVAL_MS,
);
await new Promise((resolve) => setTimeout(resolve, 10));
const sigintListeners = process.rawListeners('SIGINT') as ((...args: unknown[]) => void)[];
const handler = sigintListeners[sigintListeners.length - 1];
expect(handler).toBeDefined();
handler!();
await new Promise((resolve) => setTimeout(resolve, 10));
expect(startedSignals).toHaveLength(2);
for (const signal of startedSignals) {
expect(signal.aborted).toBe(true);
}
for (const resolveTask of deferred) {
resolveTask();
}
// Then: pool returns after in-flight tasks settle, counting them as failures.
const result = await resultPromise;
expect(result).toEqual({ success: 0, fail: 2 });
});
describe('polling', () => {
it('should pick up tasks added during execution via polling', async () => {
// Given: 1 initial task running with concurrency=2, a second task appears via poll

View File

@ -107,18 +107,14 @@ export async function runWithWorkerPool(
try {
while (queue.length > 0 || active.size > 0) {
if (abortController.signal.aborted) {
break;
}
if (!abortController.signal.aborted) {
fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter);
}
if (active.size === 0) {
break;
}
const pollTimer = createPollTimer(pollIntervalMs, abortController.signal);
const completionPromises: Promise<RaceResult>[] = [...active.keys()].map((p) =>
p.then(
(result): RaceResult => ({ type: 'completion', promise: p, result }),
@ -126,9 +122,18 @@ export async function runWithWorkerPool(
),
);
const settled = await Promise.race([...completionPromises, pollTimer.promise]);
let settled: RaceResult;
if (abortController.signal.aborted) {
// Graceful shutdown: stop scheduling new work but wait for in-flight tasks to settle.
settled = await Promise.race(completionPromises);
} else {
const pollTimer = createPollTimer(pollIntervalMs, abortController.signal);
try {
settled = await Promise.race([...completionPromises, pollTimer.promise]);
} finally {
pollTimer.cancel();
}
}
if (settled.type === 'completion') {
const task = active.get(settled.promise);
@ -189,7 +194,7 @@ function fillSlots(
}
const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, {
abortSignal: isParallel ? abortController.signal : undefined,
abortSignal: abortController.signal,
taskPrefix: isParallel ? task.name : undefined,
taskColorIndex: isParallel ? colorIndex : undefined,
});

View File

@ -18,6 +18,12 @@ export interface ResolvedTaskExecution {
issueNumber?: number;
}
function throwIfAborted(signal?: AbortSignal): void {
if (signal?.aborted) {
throw new Error('Task execution aborted');
}
}
/**
* Resolve execution directory and piece from task data.
* If the task has worktree settings, create a shared clone and use it as cwd.
@ -27,7 +33,10 @@ export async function resolveTaskExecution(
task: TaskInfo,
defaultCwd: string,
defaultPiece: string,
abortSignal?: AbortSignal,
): Promise<ResolvedTaskExecution> {
throwIfAborted(abortSignal);
const data = task.data;
if (!data) {
return { execCwd: defaultCwd, execPiece: defaultPiece, isWorktree: false };
@ -39,10 +48,12 @@ export async function resolveTaskExecution(
let baseBranch: string | undefined;
if (data.worktree) {
throwIfAborted(abortSignal);
baseBranch = getCurrentBranch(defaultCwd);
info('Generating branch name...');
const taskSlug = await summarizeTaskName(task.content, { cwd: defaultCwd });
throwIfAborted(abortSignal);
info('Creating clone...');
const result = createSharedClone(defaultCwd, {
worktree: data.worktree,
@ -50,6 +61,7 @@ export async function resolveTaskExecution(
taskSlug,
issueNumber: data.issue,
});
throwIfAborted(abortSignal);
execCwd = result.path;
branch = result.branch;
isWorktree = true;

View File

@ -128,7 +128,7 @@ export async function executeAndCompleteTask(
}
try {
const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName);
const { execCwd, execPiece, isWorktree, branch, baseBranch, startMovement, retryNote, autoPr, issueNumber } = await resolveTaskExecution(task, cwd, pieceName, taskAbortSignal);
// cwd is always the project root; pass it as projectCwd so reports/sessions go there
const taskRunResult = await executeTaskWithResult({