From 2313b3985f9fea64399c19c6d7419969bf8a298f Mon Sep 17 00:00:00 2001 From: nrs <38722970+nrslib@users.noreply.github.com> Date: Wed, 18 Feb 2026 23:15:52 +0900 Subject: [PATCH] takt: takt-e2e (#298) --- docs/testing/e2e.md | 14 ++ e2e/specs/run-recovery.e2e.ts | 325 ++++++++++++++++++++++++++++++++++ 2 files changed, 339 insertions(+) create mode 100644 e2e/specs/run-recovery.e2e.ts diff --git a/docs/testing/e2e.md b/docs/testing/e2e.md index 01ed20c..6f75380 100644 --- a/docs/testing/e2e.md +++ b/docs/testing/e2e.md @@ -103,6 +103,20 @@ E2Eテストを追加・変更した場合は、このドキュメントも更 - `.takt/tasks.yaml` に pending タスクを追加する(`piece` に `e2e/fixtures/pieces/mock-single-step.yaml` を指定)。 - 出力に `Task "watch-task" completed` が含まれることを確認する。 - `Ctrl+C` で終了する。 +- Run recovery and high-priority run flows(`e2e/specs/run-recovery.e2e.ts`) + - 目的: 高優先度ユースケース(異常終了リカバリー、並列実行、初期化〜add〜run)をまとめて確認。 + - LLM: 呼び出さない(`--provider mock` 固定) + - 手順(ユーザー行動/コマンド): + - 異常終了リカバリー: + - `.takt/tasks.yaml` に pending タスク2件を投入し、`takt run --provider mock` 実行中にプロセスを強制終了する。 + - 再度 `takt run --provider mock` を実行し、`Recovered 1 interrupted running task(s) to pending.` が出力されることを確認する。 + - 復旧対象を含む全タスクが完了し、`.takt/tasks.yaml` が空になることを確認する。 + - 高並列実行: + - `concurrency: 10` を設定し、pending タスク12件を投入して `takt run --provider mock` を実行する。 + - 出力に `Concurrency: 10` と `Tasks Summary` が含まれること、および `.takt/tasks.yaml` が空になることを確認する。 + - 初期化〜add〜run: + - グローバル `config.yaml` 不在の環境で `takt add` を2回実行し、`takt run --provider mock` を実行する。 + - タスク実行完了後に `.takt/tasks/` 配下の2タスクディレクトリ生成、`.takt/.gitignore` 生成、`.takt/tasks.yaml` の空状態を確認する。 - Run tasks graceful shutdown on SIGINT(`e2e/specs/run-sigint-graceful.e2e.ts`) - 目的: `takt run` を並列実行中に `Ctrl+C` した際、新規クローン投入を止めてグレースフルに終了することを確認。 - LLM: 呼び出さない(`--provider mock` 固定) diff --git a/e2e/specs/run-recovery.e2e.ts b/e2e/specs/run-recovery.e2e.ts new file mode 100644 index 0000000..aced013 --- /dev/null +++ b/e2e/specs/run-recovery.e2e.ts @@ -0,0 +1,325 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { spawn, execFileSync } from 'node:child_process'; +import { resolve, dirname, join } from 'node:path'; +import { fileURLToPath } from 'node:url'; +import { + mkdtempSync, + mkdirSync, + writeFileSync, + readFileSync, + rmSync, + existsSync, + readdirSync, +} from 'node:fs'; +import { tmpdir } from 'node:os'; +import { parse as parseYaml, stringify as stringifyYaml } from 'yaml'; +import { + createIsolatedEnv, + updateIsolatedConfig, + type IsolatedEnv, +} from '../helpers/isolated-env'; +import { runTakt } from '../helpers/takt-runner'; + +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +interface LocalRepo { + path: string; + cleanup: () => void; +} + +interface TaskRecord { + name: string; + status: 'pending' | 'running' | 'failed' | 'completed'; + owner_pid?: number | null; + piece?: string; +} + +function createLocalRepo(): LocalRepo { + const repoPath = mkdtempSync(join(tmpdir(), 'takt-e2e-run-recovery-')); + execFileSync('git', ['init'], { cwd: repoPath, stdio: 'pipe' }); + execFileSync('git', ['config', 'user.email', 'test@example.com'], { cwd: repoPath, stdio: 'pipe' }); + execFileSync('git', ['config', 'user.name', 'Test'], { cwd: repoPath, stdio: 'pipe' }); + writeFileSync(join(repoPath, 'README.md'), '# test\n'); + execFileSync('git', ['add', '.'], { cwd: repoPath, stdio: 'pipe' }); + execFileSync('git', ['commit', '-m', 'init'], { cwd: repoPath, stdio: 'pipe' }); + return { + path: repoPath, + cleanup: () => { + rmSync(repoPath, { recursive: true, force: true }); + }, + }; +} + +function readTasks(tasksFile: string): TaskRecord[] { + const raw = readFileSync(tasksFile, 'utf-8'); + const parsed = parseYaml(raw) as { tasks?: TaskRecord[] }; + return parsed.tasks ?? []; +} + +function waitFor( + predicate: () => boolean, + timeoutMs: number, + intervalMs: number, +): Promise { + return new Promise((resolvePromise) => { + const startedAt = Date.now(); + const timer = setInterval(() => { + if (predicate()) { + clearInterval(timer); + resolvePromise(true); + return; + } + if (Date.now() - startedAt >= timeoutMs) { + clearInterval(timer); + resolvePromise(false); + } + }, intervalMs); + }); +} + +function createPendingTasksYaml( + count: number, + piecePath: string, + prefix: string, +): string { + const now = new Date().toISOString(); + const tasks = Array.from({ length: count }, (_, index) => ({ + name: `${prefix}-${String(index + 1)}`, + status: 'pending' as const, + content: `${prefix} task ${String(index + 1)}`, + piece: piecePath, + created_at: now, + started_at: null, + completed_at: null, + owner_pid: null, + })); + return stringifyYaml({ tasks }); +} + +function createEnvWithoutGlobalConfig(): { + env: NodeJS.ProcessEnv; + cleanup: () => void; + globalConfigPath: string; +} { + const baseDir = mkdtempSync(join(tmpdir(), 'takt-e2e-init-flow-')); + const globalConfigDir = join(baseDir, '.takt-global'); + const globalGitConfigPath = join(baseDir, '.gitconfig'); + const globalConfigPath = join(globalConfigDir, 'config.yaml'); + + writeFileSync( + globalGitConfigPath, + ['[user]', ' name = TAKT E2E Test', ' email = e2e@example.com'].join('\n'), + ); + + return { + env: { + ...process.env, + TAKT_CONFIG_DIR: globalConfigDir, + GIT_CONFIG_GLOBAL: globalGitConfigPath, + TAKT_NO_TTY: '1', + }, + globalConfigPath, + cleanup: () => { + rmSync(baseDir, { recursive: true, force: true }); + }, + }; +} + +// E2E更新時は docs/testing/e2e.md も更新すること +describe('E2E: Run interrupted task recovery and high-priority run flows', () => { + let isolatedEnv: IsolatedEnv; + let repo: LocalRepo; + + beforeEach(() => { + isolatedEnv = createIsolatedEnv(); + repo = createLocalRepo(); + }); + + afterEach(() => { + repo.cleanup(); + isolatedEnv.cleanup(); + }); + + it('should recover stale running task generated by forced process termination', async () => { + // Given: 2 pending tasks exist, then first run is force-killed while task is running + updateIsolatedConfig(isolatedEnv.taktDir, { + provider: 'mock', + model: 'mock-model', + concurrency: 1, + task_poll_interval_ms: 50, + }); + + const piecePath = resolve(__dirname, '../fixtures/pieces/mock-slow-multi-step.yaml'); + const scenarioPath = resolve(__dirname, '../fixtures/scenarios/run-sigint-parallel.json'); + const tasksFile = join(repo.path, '.takt', 'tasks.yaml'); + + mkdirSync(join(repo.path, '.takt'), { recursive: true }); + writeFileSync(tasksFile, createPendingTasksYaml(2, piecePath, 'recovery-target'), 'utf-8'); + + const binPath = resolve(__dirname, '../../bin/takt'); + const child = spawn('node', [binPath, 'run', '--provider', 'mock'], { + cwd: repo.path, + env: { + ...isolatedEnv.env, + TAKT_MOCK_SCENARIO: scenarioPath, + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + let firstStdout = ''; + let firstStderr = ''; + child.stdout?.on('data', (chunk) => { + firstStdout += chunk.toString(); + }); + child.stderr?.on('data', (chunk) => { + firstStderr += chunk.toString(); + }); + + const runningObserved = await waitFor(() => { + if (!existsSync(tasksFile)) { + return false; + } + const tasks = readTasks(tasksFile); + return tasks.some((task) => task.status === 'running'); + }, 30_000, 20); + + expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true); + + child.kill('SIGKILL'); + + await new Promise((resolvePromise) => { + child.once('close', () => { + resolvePromise(); + }); + }); + + const staleTasks = readTasks(tasksFile); + const runningTask = staleTasks.find((task) => task.status === 'running'); + expect(runningTask).toBeDefined(); + expect(runningTask?.owner_pid).toBeTypeOf('number'); + + // When: run is executed again + const rerunResult = runTakt({ + args: ['run', '--provider', 'mock'], + cwd: repo.path, + env: { + ...isolatedEnv.env, + TAKT_MOCK_SCENARIO: scenarioPath, + }, + timeout: 240_000, + }); + + // Then: stale running task is recovered and all tasks complete + expect(rerunResult.exitCode).toBe(0); + const combined = rerunResult.stdout + rerunResult.stderr; + expect(combined).toContain('Recovered 1 interrupted running task(s) to pending.'); + expect(combined).toContain('recovery-target-1'); + expect(combined).toContain('recovery-target-2'); + + const finalTasks = readTasks(tasksFile); + expect(finalTasks).toEqual([]); + }, 240_000); + + it('should process high-concurrency batch without leaving inconsistent task state', () => { + // Given: 12 pending tasks with concurrency=10 + updateIsolatedConfig(isolatedEnv.taktDir, { + provider: 'mock', + model: 'mock-model', + concurrency: 10, + task_poll_interval_ms: 50, + }); + + const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml'); + const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json'); + const tasksFile = join(repo.path, '.takt', 'tasks.yaml'); + + mkdirSync(join(repo.path, '.takt'), { recursive: true }); + writeFileSync(tasksFile, createPendingTasksYaml(12, piecePath, 'parallel-load'), 'utf-8'); + + // When: run all tasks + const result = runTakt({ + args: ['run', '--provider', 'mock'], + cwd: repo.path, + env: { + ...isolatedEnv.env, + TAKT_MOCK_SCENARIO: scenarioPath, + }, + timeout: 240_000, + }); + + // Then: all tasks complete and queue becomes empty + expect(result.exitCode).toBe(0); + expect(result.stdout).toContain('Concurrency: 10'); + expect(result.stdout).toContain('Tasks Summary'); + const finalTasks = readTasks(tasksFile); + expect(finalTasks).toEqual([]); + }, 240_000); + + it('should initialize project dirs and execute tasks after add+run when global config is absent', () => { + const envWithoutConfig = createEnvWithoutGlobalConfig(); + + try { + // Given: global config.yaml is absent and project config points to a mock piece path + const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml'); + const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json'); + const projectConfigDir = join(repo.path, '.takt'); + const projectConfigPath = join(projectConfigDir, 'config.yaml'); + mkdirSync(projectConfigDir, { recursive: true }); + writeFileSync(projectConfigPath, `piece: ${piecePath}\npermissionMode: default\n`, 'utf-8'); + + expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false); + + // When: add 2 tasks and run once + const addResult1 = runTakt({ + args: ['--provider', 'mock', 'add', 'Initialize flow task 1'], + cwd: repo.path, + env: { + ...envWithoutConfig.env, + TAKT_MOCK_SCENARIO: scenarioPath, + }, + timeout: 240_000, + }); + + const addResult2 = runTakt({ + args: ['--provider', 'mock', 'add', 'Initialize flow task 2'], + cwd: repo.path, + env: { + ...envWithoutConfig.env, + TAKT_MOCK_SCENARIO: scenarioPath, + }, + timeout: 240_000, + }); + + const runResult = runTakt({ + args: ['--provider', 'mock', 'run'], + cwd: repo.path, + env: { + ...envWithoutConfig.env, + TAKT_MOCK_SCENARIO: scenarioPath, + }, + timeout: 240_000, + }); + + // Then: tasks are persisted/executed correctly and project init artifacts exist + expect(addResult1.exitCode).toBe(0); + expect(addResult2.exitCode).toBe(0); + expect(runResult.exitCode).toBe(0); + + const tasksFile = join(repo.path, '.takt', 'tasks.yaml'); + const parsedFinal = parseYaml(readFileSync(tasksFile, 'utf-8')) as { tasks?: TaskRecord[] }; + expect(parsedFinal.tasks).toEqual([]); + + const taskDirsRoot = join(repo.path, '.takt', 'tasks'); + const taskDirs = readdirSync(taskDirsRoot, { withFileTypes: true }) + .filter((entry) => entry.isDirectory()) + .map((entry) => entry.name); + expect(taskDirs.length).toBe(2); + + expect(existsSync(join(projectConfigDir, '.gitignore'))).toBe(true); + expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false); + } finally { + envWithoutConfig.cleanup(); + } + }, 240_000); +});