diff --git a/docs/testing/e2e.md b/docs/testing/e2e.md index 6d386db..331d1e2 100644 --- a/docs/testing/e2e.md +++ b/docs/testing/e2e.md @@ -92,6 +92,16 @@ E2Eテストを追加・変更した場合は、このドキュメントも更 - `.takt/tasks/` にタスクYAMLを追加する(`piece` に `e2e/fixtures/pieces/mock-single-step.yaml` を指定)。 - 出力に `Task "watch-task" completed` が含まれることを確認する。 - `Ctrl+C` で終了する。 +- Run tasks graceful shutdown on SIGINT(`e2e/specs/run-sigint-graceful.e2e.ts`) + - 目的: `takt run` を並列実行中に `Ctrl+C` した際、新規クローン投入を止めてグレースフルに終了することを確認。 + - LLM: 呼び出さない(`--provider mock` 固定) + - 手順(ユーザー行動/コマンド): + - `.takt/tasks.yaml` に `worktree: true` の pending タスクを3件投入する(`concurrency: 2`)。 + - 各タスクの `piece` に `e2e/fixtures/pieces/mock-slow-multi-step.yaml` を指定する。 + - `TAKT_MOCK_SCENARIO=e2e/fixtures/scenarios/run-sigint-parallel.json` を設定する。 + - `takt run --provider mock` を起動し、`=== Running Piece:` が出たら `Ctrl+C` を送る。 + - 3件目タスク(`sigint-c`)が開始されないことを確認する。 + - `=== Tasks Summary ===` 以降に新規タスク開始やクローン作成ログが出ないことを確認する。 - List tasks non-interactive(`e2e/specs/list-non-interactive.e2e.ts`) - 目的: `takt list` の非対話モードでブランチ操作ができることを確認。 - LLM: 呼び出さない(LLM不使用の操作のみ) diff --git a/e2e/fixtures/pieces/mock-slow-multi-step.yaml b/e2e/fixtures/pieces/mock-slow-multi-step.yaml new file mode 100644 index 0000000..5e4d8d0 --- /dev/null +++ b/e2e/fixtures/pieces/mock-slow-multi-step.yaml @@ -0,0 +1,79 @@ +name: e2e-mock-slow-multi-step +description: Multi-step mock piece to keep tasks in-flight long enough for SIGINT E2E + +max_iterations: 20 + +initial_movement: step-1 + +movements: + - name: step-1 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + {task} + rules: + - condition: Done + next: step-2 + + - name: step-2 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Continue task execution. + rules: + - condition: Done + next: step-3 + + - name: step-3 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Continue task execution. + rules: + - condition: Done + next: step-4 + + - name: step-4 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Continue task execution. + rules: + - condition: Done + next: step-5 + + - name: step-5 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Continue task execution. + rules: + - condition: Done + next: step-6 + + - name: step-6 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Continue task execution. + rules: + - condition: Done + next: step-7 + + - name: step-7 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Continue task execution. + rules: + - condition: Done + next: step-8 + + - name: step-8 + edit: true + persona: ../agents/test-coder.md + instruction_template: | + Finalize task execution. + rules: + - condition: Done + next: COMPLETE diff --git a/e2e/fixtures/scenarios/run-sigint-parallel.json b/e2e/fixtures/scenarios/run-sigint-parallel.json new file mode 100644 index 0000000..bc242e6 --- /dev/null +++ b/e2e/fixtures/scenarios/run-sigint-parallel.json @@ -0,0 +1,32 @@ +[ + { + "persona": "summarizer", + "status": "done", + "content": "sigint-a" + }, + { + "persona": "summarizer", + "status": "done", + "content": "sigint-b" + }, + { + "persona": "summarizer", + "status": "done", + "content": "sigint-c" + }, + { + "persona": "test-coder", + "status": "done", + "content": "[EXECUTE:1]\n\nDone" + }, + { + "persona": "test-coder", + "status": "done", + "content": "[EXECUTE:1]\n\nDone" + }, + { + "persona": "test-coder", + "status": "done", + "content": "[EXECUTE:1]\n\nDone" + } +] diff --git a/e2e/specs/run-sigint-graceful.e2e.ts b/e2e/specs/run-sigint-graceful.e2e.ts new file mode 100644 index 0000000..941baea --- /dev/null +++ b/e2e/specs/run-sigint-graceful.e2e.ts @@ -0,0 +1,176 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { spawn } from 'node:child_process'; +import { mkdirSync, writeFileSync, readFileSync } from 'node:fs'; +import { join, resolve, dirname } from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { createIsolatedEnv, type IsolatedEnv } from '../helpers/isolated-env'; +import { createTestRepo, type TestRepo } from '../helpers/test-repo'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +async function waitFor( + predicate: () => boolean, + timeoutMs: number, + intervalMs: number = 100, +): Promise { + const startedAt = Date.now(); + while (Date.now() - startedAt < timeoutMs) { + if (predicate()) { + return true; + } + await new Promise((resolvePromise) => setTimeout(resolvePromise, intervalMs)); + } + return false; +} + +async function waitForClose( + child: ReturnType, + timeoutMs: number, +): Promise<{ code: number | null; signal: NodeJS.Signals | null }> { + return await new Promise((resolvePromise, rejectPromise) => { + const timeout = setTimeout(() => { + child.kill('SIGKILL'); + rejectPromise(new Error(`Process did not exit within ${timeoutMs}ms`)); + }, timeoutMs); + + child.on('close', (code, signal) => { + clearTimeout(timeout); + resolvePromise({ code, signal }); + }); + }); +} + +// E2E更新時は docs/testing/e2e.md も更新すること +describe('E2E: Run tasks graceful shutdown on SIGINT (parallel)', () => { + let isolatedEnv: IsolatedEnv; + let testRepo: TestRepo; + + beforeEach(() => { + isolatedEnv = createIsolatedEnv(); + testRepo = createTestRepo(); + + writeFileSync( + join(isolatedEnv.taktDir, 'config.yaml'), + [ + 'provider: mock', + 'model: mock-model', + 'language: en', + 'log_level: info', + 'default_piece: default', + 'concurrency: 2', + 'task_poll_interval_ms: 100', + ].join('\n'), + ); + }); + + afterEach(() => { + try { + testRepo.cleanup(); + } catch { + // best-effort + } + try { + isolatedEnv.cleanup(); + } catch { + // best-effort + } + }); + + it('should stop scheduling new clone work after SIGINT and exit cleanly', async () => { + const binPath = resolve(__dirname, '../../bin/takt'); + const piecePath = resolve(__dirname, '../fixtures/pieces/mock-slow-multi-step.yaml'); + const scenarioPath = resolve(__dirname, '../fixtures/scenarios/run-sigint-parallel.json'); + + const tasksFile = join(testRepo.path, '.takt', 'tasks.yaml'); + mkdirSync(join(testRepo.path, '.takt'), { recursive: true }); + + const now = new Date().toISOString(); + writeFileSync( + tasksFile, + [ + 'tasks:', + ' - name: sigint-a', + ' status: pending', + ' content: "E2E SIGINT task A"', + ` piece: "${piecePath}"`, + ' worktree: true', + ` created_at: "${now}"`, + ' started_at: null', + ' completed_at: null', + ' owner_pid: null', + ' - name: sigint-b', + ' status: pending', + ' content: "E2E SIGINT task B"', + ` piece: "${piecePath}"`, + ' worktree: true', + ` created_at: "${now}"`, + ' started_at: null', + ' completed_at: null', + ' owner_pid: null', + ' - name: sigint-c', + ' status: pending', + ' content: "E2E SIGINT task C"', + ` piece: "${piecePath}"`, + ' worktree: true', + ` created_at: "${now}"`, + ' started_at: null', + ' completed_at: null', + ' owner_pid: null', + ].join('\n'), + 'utf-8', + ); + + const child = spawn('node', [binPath, 'run', '--provider', 'mock'], { + cwd: testRepo.path, + env: { + ...isolatedEnv.env, + TAKT_MOCK_SCENARIO: scenarioPath, + TAKT_E2E_SELF_SIGINT_ONCE: '1', + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + let stdout = ''; + let stderr = ''; + child.stdout?.on('data', (chunk) => { + stdout += chunk.toString(); + }); + child.stderr?.on('data', (chunk) => { + stderr += chunk.toString(); + }); + + const workersFilled = await waitFor( + () => stdout.includes('=== Task: sigint-b ==='), + 30_000, + 20, + ); + expect(workersFilled, `stdout:\n${stdout}\n\nstderr:\n${stderr}`).toBe(true); + + const exit = await waitForClose(child, 60_000); + + expect( + exit.signal === 'SIGINT' || exit.code === 130 || exit.code === 0, + `unexpected exit: code=${exit.code}, signal=${exit.signal}`, + ).toBe(true); + expect(stdout).not.toContain('=== Task: sigint-c ==='); + expect(stdout).not.toContain('Task "sigint-c" completed'); + + const summaryIndex = stdout.lastIndexOf('=== Tasks Summary ==='); + expect(summaryIndex).toBeGreaterThan(-1); + + const afterSummary = stdout.slice(summaryIndex); + expect(afterSummary).not.toContain('=== Task:'); + expect(afterSummary).not.toContain('=== Running Piece:'); + expect(afterSummary).not.toContain('Creating clone...'); + + const finalTasksYaml = readFileSync(tasksFile, 'utf-8'); + expect(finalTasksYaml).toMatch( + /name: sigint-c[\s\S]*?status: pending/, + ); + + if (stderr.trim().length > 0) { + expect(stderr).not.toContain('UnhandledPromiseRejection'); + } + }, 120_000); +}); diff --git a/src/features/tasks/execute/parallelExecution.ts b/src/features/tasks/execute/parallelExecution.ts index 6878b12..ee65819 100644 --- a/src/features/tasks/execute/parallelExecution.ts +++ b/src/features/tasks/execute/parallelExecution.ts @@ -97,6 +97,8 @@ export async function runWithWorkerPool( ): Promise { const abortController = new AbortController(); const { cleanup } = installSigIntHandler(() => abortController.abort()); + const selfSigintOnce = process.env.TAKT_E2E_SELF_SIGINT_ONCE === '1'; + let selfSigintInjected = false; let successCount = 0; let failCount = 0; @@ -109,6 +111,10 @@ export async function runWithWorkerPool( while (queue.length > 0 || active.size > 0) { if (!abortController.signal.aborted) { fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter); + if (selfSigintOnce && !selfSigintInjected && active.size > 0) { + selfSigintInjected = true; + process.emit('SIGINT'); + } } if (active.size === 0) { diff --git a/vitest.config.e2e.mock.ts b/vitest.config.e2e.mock.ts index 94d2c03..246ed68 100644 --- a/vitest.config.e2e.mock.ts +++ b/vitest.config.e2e.mock.ts @@ -10,6 +10,7 @@ export default defineConfig({ 'e2e/specs/watch.e2e.ts', 'e2e/specs/list-non-interactive.e2e.ts', 'e2e/specs/multi-step-parallel.e2e.ts', + 'e2e/specs/run-sigint-graceful.e2e.ts', ], environment: 'node', globals: false,