import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { spawn, execFileSync } from 'node:child_process';
import { resolve, dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import {
  mkdtempSync,
  mkdirSync,
  writeFileSync,
  readFileSync,
  rmSync,
  existsSync,
  readdirSync,
} from 'node:fs';
import { tmpdir } from 'node:os';
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
import {
  createIsolatedEnv,
  updateIsolatedConfig,
  type IsolatedEnv,
} from '../helpers/isolated-env';
import { runTakt } from '../helpers/takt-runner';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);

/** A throwaway git repository on disk plus the callback that deletes it. */
interface LocalRepo {
  path: string;
  cleanup: () => void;
}

/** The subset of a `tasks.yaml` entry that these tests inspect. */
interface TaskRecord {
  name: string;
  status: 'pending' | 'running' | 'failed' | 'completed';
  owner_pid?: number | null;
  piece?: string;
}

/**
 * Creates a temporary git repository containing a single committed README.
 *
 * @returns The repository path and a `cleanup` callback that removes it recursively.
 */
function createLocalRepo(): LocalRepo {
  const repoPath = mkdtempSync(join(tmpdir(), 'takt-e2e-run-recovery-'));
  execFileSync('git', ['init'], { cwd: repoPath, stdio: 'pipe' });
  execFileSync('git', ['config', 'user.email', 'test@example.com'], { cwd: repoPath, stdio: 'pipe' });
  execFileSync('git', ['config', 'user.name', 'Test'], { cwd: repoPath, stdio: 'pipe' });
  writeFileSync(join(repoPath, 'README.md'), '# test\n');
  execFileSync('git', ['add', '.'], { cwd: repoPath, stdio: 'pipe' });
  execFileSync('git', ['commit', '-m', 'init'], { cwd: repoPath, stdio: 'pipe' });
  return {
    path: repoPath,
    cleanup: () => {
      rmSync(repoPath, { recursive: true, force: true });
    },
  };
}

/**
 * Reads and parses the task list stored in a `tasks.yaml` file.
 *
 * @param tasksFile - Path of the YAML file to read.
 * @returns The `tasks` array, or an empty array when the document is empty
 *   or has no `tasks` key.
 */
function readTasks(tasksFile: string): TaskRecord[] {
  const raw = readFileSync(tasksFile, 'utf-8');
  // An empty document parses to null, so guard before dereferencing `tasks`.
  const parsed = parseYaml(raw) as { tasks?: TaskRecord[] } | null;
  return parsed?.tasks ?? [];
}

/**
 * Polls `predicate` every `intervalMs` until it returns true or `timeoutMs` elapses.
 *
 * The predicate is first evaluated after one interval, not immediately.
 *
 * @param predicate - Condition to poll.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds.
 * @param intervalMs - Delay between polls, in milliseconds.
 * @returns A promise resolving to `true` once the predicate held, or `false` on timeout.
 *   The promise rejects with the thrown value if the predicate throws.
 */
function waitFor(
  predicate: () => boolean,
  timeoutMs: number,
  intervalMs: number,
): Promise<boolean> {
  return new Promise<boolean>((resolvePromise, rejectPromise) => {
    const startedAt = Date.now();
    const timer = setInterval(() => {
      let satisfied: boolean;
      try {
        satisfied = predicate();
      } catch (error: unknown) {
        // Stop polling and surface the failure; otherwise a predicate that
        // keeps throwing would never reach the timeout check below.
        clearInterval(timer);
        rejectPromise(error);
        return;
      }
      if (satisfied) {
        clearInterval(timer);
        resolvePromise(true);
        return;
      }
      if (Date.now() - startedAt >= timeoutMs) {
        clearInterval(timer);
        resolvePromise(false);
      }
    }, intervalMs);
  });
}

/**
 * Builds the YAML text for a `tasks.yaml` holding `count` pending tasks.
 *
 * @param count - Number of tasks to generate.
 * @param piecePath - Value written to each task's `piece` field.
 * @param prefix - Prefix for task names (`<prefix>-<n>`) and contents (`<prefix> task <n>`), 1-based.
 * @returns The serialized YAML document.
 */
function createPendingTasksYaml(
  count: number,
  piecePath: string,
  prefix: string,
): string {
  const now = new Date().toISOString();
  const tasks = Array.from({ length: count }, (_, index) => ({
    name: `${prefix}-${String(index + 1)}`,
    status: 'pending' as const,
    content: `${prefix} task ${String(index + 1)}`,
    piece: piecePath,
    created_at: now,
    started_at: null,
    completed_at: null,
    owner_pid: null,
  }));
  return stringifyYaml({ tasks });
}

/**
 * Prepares an environment whose TAKT config directory does not exist yet.
 *
 * Only a git config file is written into the temp base directory; the
 * directory named by `TAKT_CONFIG_DIR` is deliberately left uncreated.
 *
 * @returns The environment variables, the path where a global `config.yaml`
 *   would live, and a `cleanup` callback that removes the temp base directory.
 */
function createEnvWithoutGlobalConfig(): {
  env: NodeJS.ProcessEnv;
  cleanup: () => void;
  globalConfigPath: string;
} {
  const baseDir = mkdtempSync(join(tmpdir(), 'takt-e2e-init-flow-'));
  const globalConfigDir = join(baseDir, '.takt-global');
  const globalGitConfigPath = join(baseDir, '.gitconfig');
  const globalConfigPath = join(globalConfigDir, 'config.yaml');
  writeFileSync(
    globalGitConfigPath,
    ['[user]', ' name = TAKT E2E Test', ' email = e2e@example.com'].join('\n'),
  );
  return {
    env: {
      ...process.env,
      TAKT_CONFIG_DIR: globalConfigDir,
      GIT_CONFIG_GLOBAL: globalGitConfigPath,
      TAKT_NO_TTY: '1',
    },
    globalConfigPath,
    cleanup: () => {
      rmSync(baseDir, { recursive: true, force: true });
    },
  };
}

// When updating these E2E tests, also update docs/testing/e2e.md
describe('E2E: Run interrupted task recovery and high-priority run flows', () => {
  let isolatedEnv: IsolatedEnv;
  let repo: LocalRepo;

  beforeEach(() => {
    isolatedEnv = createIsolatedEnv();
    repo = createLocalRepo();
  });

  afterEach(() => {
    repo.cleanup();
    isolatedEnv.cleanup();
  });

  it('should recover stale running task generated by forced process termination', async () => {
    // Given: 2 pending tasks exist, then first run is force-killed while task is running
    updateIsolatedConfig(isolatedEnv.taktDir, {
      provider: 'mock',
      model: 'mock-model',
      concurrency: 1,
      task_poll_interval_ms: 50,
    });
    const piecePath = resolve(__dirname, '../fixtures/pieces/mock-slow-multi-step.yaml');
    const scenarioPath = resolve(__dirname, '../fixtures/scenarios/run-sigint-parallel.json');
    const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
    mkdirSync(join(repo.path, '.takt'), { recursive: true });
    writeFileSync(tasksFile, createPendingTasksYaml(2, piecePath, 'recovery-target'), 'utf-8');

    const binPath = resolve(__dirname, '../../bin/takt');
    const child = spawn('node', [binPath, 'run', '--provider', 'mock'], {
      cwd: repo.path,
      env: {
        ...isolatedEnv.env,
        TAKT_MOCK_SCENARIO: scenarioPath,
      },
      stdio: ['ignore', 'pipe', 'pipe'],
    });

    // Attach the 'close' listener immediately so the event cannot be missed
    // if the child exits on its own before it is killed.
    const childClosed = new Promise<void>((resolvePromise) => {
      child.once('close', () => {
        resolvePromise();
      });
    });

    let firstStdout = '';
    let firstStderr = '';
    child.stdout?.on('data', (chunk: Buffer) => {
      firstStdout += chunk.toString();
    });
    child.stderr?.on('data', (chunk: Buffer) => {
      firstStderr += chunk.toString();
    });

    try {
      const runningObserved = await waitFor(() => {
        if (!existsSync(tasksFile)) {
          return false;
        }
        const tasks = readTasks(tasksFile);
        return tasks.some((task) => task.status === 'running');
      }, 30_000, 20);
      expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true);
    } finally {
      // Kill in `finally` so a failed assertion does not leak the child process.
      child.kill('SIGKILL');
      await childClosed;
    }

    const staleTasks = readTasks(tasksFile);
    const runningTask = staleTasks.find((task) => task.status === 'running');
    expect(runningTask).toBeDefined();
    expect(runningTask?.owner_pid).toBeTypeOf('number');

    // When: run is executed again
    const rerunResult = runTakt({
      args: ['run', '--provider', 'mock'],
      cwd: repo.path,
      env: {
        ...isolatedEnv.env,
        TAKT_MOCK_SCENARIO: scenarioPath,
      },
      timeout: 240_000,
    });

    // Then: stale running task is recovered and all tasks complete
    expect(rerunResult.exitCode).toBe(0);
    const combined = rerunResult.stdout + rerunResult.stderr;
    expect(combined).toContain('Recovered 1 interrupted running task(s) to pending.');
    expect(combined).toContain('recovery-target-1');
    expect(combined).toContain('recovery-target-2');

    const finalTasks = readTasks(tasksFile);
    expect(finalTasks).toEqual([]);
  }, 240_000);

  it('should process high-concurrency batch without leaving inconsistent task state', () => {
    // Given: 12 pending tasks with concurrency=10
    updateIsolatedConfig(isolatedEnv.taktDir, {
      provider: 'mock',
      model: 'mock-model',
      concurrency: 10,
      task_poll_interval_ms: 50,
    });
    const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
    const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
    const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
    mkdirSync(join(repo.path, '.takt'), { recursive: true });
    writeFileSync(tasksFile, createPendingTasksYaml(12, piecePath, 'parallel-load'), 'utf-8');

    // When: run all tasks
    const result = runTakt({
      args: ['run', '--provider', 'mock'],
      cwd: repo.path,
      env: {
        ...isolatedEnv.env,
        TAKT_MOCK_SCENARIO: scenarioPath,
      },
      timeout: 240_000,
    });

    // Then: all tasks complete and queue becomes empty
    expect(result.exitCode).toBe(0);
    expect(result.stdout).toContain('Concurrency: 10');
    expect(result.stdout).toContain('Tasks Summary');
    const finalTasks = readTasks(tasksFile);
    expect(finalTasks).toEqual([]);
  }, 240_000);

  it('should initialize project dirs and execute tasks after add+run when global config is absent', () => {
    const envWithoutConfig = createEnvWithoutGlobalConfig();
    try {
      // Given: global config.yaml is absent and project config points to a mock piece path
      const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
      const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
      const projectConfigDir = join(repo.path, '.takt');
      const projectConfigPath = join(projectConfigDir, 'config.yaml');
      mkdirSync(projectConfigDir, { recursive: true });
      writeFileSync(projectConfigPath, `piece: ${piecePath}\npermissionMode: default\n`, 'utf-8');
      expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);

      // When: add 2 tasks and run once
      const addResult1 = runTakt({
        args: ['--provider', 'mock', 'add', 'Initialize flow task 1'],
        cwd: repo.path,
        env: {
          ...envWithoutConfig.env,
          TAKT_MOCK_SCENARIO: scenarioPath,
        },
        timeout: 240_000,
      });
      const addResult2 = runTakt({
        args: ['--provider', 'mock', 'add', 'Initialize flow task 2'],
        cwd: repo.path,
        env: {
          ...envWithoutConfig.env,
          TAKT_MOCK_SCENARIO: scenarioPath,
        },
        timeout: 240_000,
      });
      const runResult = runTakt({
        args: ['--provider', 'mock', 'run'],
        cwd: repo.path,
        env: {
          ...envWithoutConfig.env,
          TAKT_MOCK_SCENARIO: scenarioPath,
        },
        timeout: 240_000,
      });

      // Then: tasks are persisted/executed correctly and project init artifacts exist
      expect(addResult1.exitCode).toBe(0);
      expect(addResult2.exitCode).toBe(0);
      expect(runResult.exitCode).toBe(0);

      const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
      // Parsed directly (not via readTasks) so a missing `tasks` key fails the assertion.
      const parsedFinal = parseYaml(readFileSync(tasksFile, 'utf-8')) as { tasks?: TaskRecord[] };
      expect(parsedFinal.tasks).toEqual([]);

      const taskDirsRoot = join(repo.path, '.takt', 'tasks');
      const taskDirs = readdirSync(taskDirsRoot, { withFileTypes: true })
        .filter((entry) => entry.isDirectory())
        .map((entry) => entry.name);
      expect(taskDirs.length).toBe(2);
      expect(existsSync(join(projectConfigDir, '.gitignore'))).toBe(true);
      expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);
    } finally {
      envWithoutConfig.cleanup();
    }
  }, 240_000);
});