takt/e2e/specs/run-recovery.e2e.ts
2026-02-18 23:15:52 +09:00

326 lines
10 KiB
TypeScript

import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { spawn, execFileSync } from 'node:child_process';
import { resolve, dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import {
mkdtempSync,
mkdirSync,
writeFileSync,
readFileSync,
rmSync,
existsSync,
readdirSync,
} from 'node:fs';
import { tmpdir } from 'node:os';
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
import {
createIsolatedEnv,
updateIsolatedConfig,
type IsolatedEnv,
} from '../helpers/isolated-env';
import { runTakt } from '../helpers/takt-runner';
// ES modules have no built-in __filename/__dirname, so derive them from
// import.meta.url. __dirname anchors the fixture and bin paths resolved below.
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
/** Handle to a temporary git repository used as the working directory of a test. */
interface LocalRepo {
// Path of the repository's root directory.
path: string;
// Removes the repository directory from disk.
cleanup: () => void;
}
/**
 * Shape of the task entries these tests read back from `.takt/tasks.yaml`.
 * Only the fields inspected here are declared; the entries written by
 * createPendingTasksYaml carry additional fields (content, timestamps).
 */
interface TaskRecord {
name: string;
status: 'pending' | 'running' | 'failed' | 'completed';
// The recovery test asserts this is a number once a task is observed as running.
owner_pid?: number | null;
piece?: string;
}
/**
 * Creates a throwaway git repository in the OS temp directory containing a
 * single committed `README.md`.
 *
 * If any setup step fails (for example `git` is missing or the commit is
 * rejected), the partially created directory is removed before the error is
 * rethrown, so a failed setup does not leave temp directories behind.
 *
 * @returns The repository path plus a `cleanup` callback that deletes it.
 * @throws Whatever the failing filesystem or `git` call threw.
 */
function createLocalRepo(): LocalRepo {
  const repoPath = mkdtempSync(join(tmpdir(), 'takt-e2e-run-recovery-'));

  const cleanup = (): void => {
    rmSync(repoPath, { recursive: true, force: true });
  };
  // Runs one git command inside the new repository with captured stdio.
  const git = (...args: string[]): void => {
    execFileSync('git', args, { cwd: repoPath, stdio: 'pipe' });
  };

  try {
    git('init');
    git('config', 'user.email', 'test@example.com');
    git('config', 'user.name', 'Test');
    writeFileSync(join(repoPath, 'README.md'), '# test\n');
    git('add', '.');
    git('commit', '-m', 'init');
  } catch (error) {
    // The caller never receives `cleanup` on failure, so tidy up here.
    cleanup();
    throw error;
  }

  return { path: repoPath, cleanup };
}
/**
 * Reads the task list from a tasks YAML file.
 *
 * An empty file parses to `null` (and a scalar document to a non-object);
 * both are treated as "no tasks" instead of throwing on property access.
 * This matters because the file is polled while another process may be
 * rewriting it.
 *
 * @param tasksFile Path to the tasks YAML file.
 * @returns The `tasks` entries, or an empty array when there are none.
 * @throws If the file cannot be read or contains invalid YAML.
 */
function readTasks(tasksFile: string): TaskRecord[] {
  const raw = readFileSync(tasksFile, 'utf-8');
  const parsed: unknown = parseYaml(raw);
  if (typeof parsed !== 'object' || parsed === null) {
    return [];
  }
  const { tasks } = parsed as { tasks?: TaskRecord[] | null };
  return tasks ?? [];
}
/**
 * Polls `predicate` every `intervalMs` milliseconds until it returns true or
 * `timeoutMs` milliseconds have elapsed.
 *
 * The first check happens after one interval, not immediately.
 *
 * @param predicate Synchronous condition to poll.
 * @param timeoutMs Maximum time to keep polling.
 * @param intervalMs Delay between checks.
 * @returns A promise resolving to `true` when the predicate held, or `false`
 *   on timeout. It rejects with the predicate's error if the predicate
 *   throws; the timer is stopped in every case.
 */
function waitFor(
  predicate: () => boolean,
  timeoutMs: number,
  intervalMs: number,
): Promise<boolean> {
  return new Promise((resolvePromise, rejectPromise) => {
    const startedAt = Date.now();
    const timer = setInterval(() => {
      let satisfied: boolean;
      try {
        satisfied = predicate();
      } catch (error) {
        // Without this, the throw would escape the timer callback as an
        // uncaught exception and the promise would never settle.
        clearInterval(timer);
        rejectPromise(error);
        return;
      }
      if (satisfied) {
        clearInterval(timer);
        resolvePromise(true);
        return;
      }
      if (Date.now() - startedAt >= timeoutMs) {
        clearInterval(timer);
        resolvePromise(false);
      }
    }, intervalMs);
  });
}
/**
 * Serializes a tasks YAML document containing `count` pending tasks.
 *
 * Tasks are numbered from 1 and named `<prefix>-<n>`; they all reference the
 * same piece and share one creation timestamp.
 *
 * @param count Number of tasks to generate.
 * @param piecePath Value written to each task's `piece` field.
 * @param prefix Prefix used for task names and contents.
 * @returns The YAML text of a `{ tasks: [...] }` document.
 */
function createPendingTasksYaml(
  count: number,
  piecePath: string,
  prefix: string,
): string {
  const createdAt = new Date().toISOString();

  // Builds the record for the task with the given 1-based ordinal.
  const buildTask = (ordinal: number) => {
    const label = String(ordinal);
    return {
      name: `${prefix}-${label}`,
      status: 'pending' as const,
      content: `${prefix} task ${label}`,
      piece: piecePath,
      created_at: createdAt,
      started_at: null,
      completed_at: null,
      owner_pid: null,
    };
  };

  const tasks = Array.from({ length: count }, (_unused, position) => buildTask(position + 1));
  return stringifyYaml({ tasks });
}
/**
 * Builds a process environment whose TAKT global config directory does not
 * exist yet.
 *
 * A fresh temp directory is created that holds only a minimal git config
 * file. `TAKT_CONFIG_DIR` points at a `.takt-global` path inside it that is
 * deliberately not created, and `globalConfigPath` is the `config.yaml` path
 * under that directory.
 *
 * @returns The environment (current `process.env` plus overrides), the
 *   global config path, and a `cleanup` callback that removes the temp
 *   directory.
 */
function createEnvWithoutGlobalConfig(): {
  env: NodeJS.ProcessEnv;
  cleanup: () => void;
  globalConfigPath: string;
} {
  const sandboxRoot = mkdtempSync(join(tmpdir(), 'takt-e2e-init-flow-'));
  const taktConfigDir = join(sandboxRoot, '.takt-global');
  const gitConfigFile = join(sandboxRoot, '.gitconfig');

  // Give git an identity without touching the developer's real global config.
  const gitConfigLines = ['[user]', ' name = TAKT E2E Test', ' email = e2e@example.com'];
  writeFileSync(gitConfigFile, gitConfigLines.join('\n'));

  const env: NodeJS.ProcessEnv = {
    ...process.env,
    TAKT_CONFIG_DIR: taktConfigDir,
    GIT_CONFIG_GLOBAL: gitConfigFile,
    TAKT_NO_TTY: '1',
  };

  function cleanup(): void {
    rmSync(sandboxRoot, { recursive: true, force: true });
  }

  return {
    env,
    globalConfigPath: join(taktConfigDir, 'config.yaml'),
    cleanup,
  };
}
// When updating these E2E tests, also update docs/testing/e2e.md
/**
 * End-to-end scenarios for `takt run` using the mock provider:
 *  1. a task left in `running` state by a SIGKILLed run is recovered on rerun,
 *  2. a batch larger than the concurrency limit drains the queue completely,
 *  3. `add` + `run` work when no global config.yaml exists.
 *
 * Each test gets a fresh isolated environment and a fresh temporary git repo.
 */
describe('E2E: Run interrupted task recovery and high-priority run flows', () => {
  let isolatedEnv: IsolatedEnv;
  let repo: LocalRepo;

  beforeEach(() => {
    isolatedEnv = createIsolatedEnv();
    repo = createLocalRepo();
  });

  afterEach(() => {
    // Tear down the isolated env even if removing the repo throws.
    try {
      repo.cleanup();
    } finally {
      isolatedEnv.cleanup();
    }
  });

  it('should recover stale running task generated by forced process termination', async () => {
    // Given: 2 pending tasks exist, then first run is force-killed while task is running
    updateIsolatedConfig(isolatedEnv.taktDir, {
      provider: 'mock',
      model: 'mock-model',
      concurrency: 1,
      task_poll_interval_ms: 50,
    });
    const piecePath = resolve(__dirname, '../fixtures/pieces/mock-slow-multi-step.yaml');
    const scenarioPath = resolve(__dirname, '../fixtures/scenarios/run-sigint-parallel.json');
    const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
    mkdirSync(join(repo.path, '.takt'), { recursive: true });
    writeFileSync(tasksFile, createPendingTasksYaml(2, piecePath, 'recovery-target'), 'utf-8');

    const binPath = resolve(__dirname, '../../bin/takt');
    const child = spawn('node', [binPath, 'run', '--provider', 'mock'], {
      cwd: repo.path,
      env: {
        ...isolatedEnv.env,
        TAKT_MOCK_SCENARIO: scenarioPath,
      },
      stdio: ['ignore', 'pipe', 'pipe'],
    });

    // Captured only to make a failed "running observed" assertion diagnosable.
    let firstStdout = '';
    let firstStderr = '';
    child.stdout?.on('data', (chunk) => {
      firstStdout += chunk.toString();
    });
    child.stderr?.on('data', (chunk) => {
      firstStderr += chunk.toString();
    });

    try {
      const runningObserved = await waitFor(() => {
        try {
          return (
            existsSync(tasksFile) &&
            readTasks(tasksFile).some((task) => task.status === 'running')
          );
        } catch {
          // The child rewrites tasks.yaml while we poll; a missing or
          // half-written file just means "not observable yet", so retry.
          return false;
        }
      }, 30_000, 20);
      expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true);
    } finally {
      // Always terminate the first run, including when the assertion above
      // fails, so no orphaned process outlives the test.
      const exited = new Promise<void>((resolvePromise) => {
        // If the child is already gone, 'close' may have fired before we
        // could listen for it; waiting would hang until the test timeout.
        if (child.exitCode !== null || child.signalCode !== null) {
          resolvePromise();
          return;
        }
        child.once('close', () => {
          resolvePromise();
        });
      });
      child.kill('SIGKILL');
      await exited;
    }

    const staleTasks = readTasks(tasksFile);
    const runningTask = staleTasks.find((task) => task.status === 'running');
    expect(runningTask).toBeDefined();
    expect(runningTask?.owner_pid).toBeTypeOf('number');

    // When: run is executed again
    const rerunResult = runTakt({
      args: ['run', '--provider', 'mock'],
      cwd: repo.path,
      env: {
        ...isolatedEnv.env,
        TAKT_MOCK_SCENARIO: scenarioPath,
      },
      timeout: 240_000,
    });

    // Then: stale running task is recovered and all tasks complete
    expect(rerunResult.exitCode).toBe(0);
    const combined = rerunResult.stdout + rerunResult.stderr;
    expect(combined).toContain('Recovered 1 interrupted running task(s) to pending.');
    expect(combined).toContain('recovery-target-1');
    expect(combined).toContain('recovery-target-2');
    const finalTasks = readTasks(tasksFile);
    expect(finalTasks).toEqual([]);
  }, 240_000);

  it('should process high-concurrency batch without leaving inconsistent task state', () => {
    // Given: 12 pending tasks with concurrency=10
    updateIsolatedConfig(isolatedEnv.taktDir, {
      provider: 'mock',
      model: 'mock-model',
      concurrency: 10,
      task_poll_interval_ms: 50,
    });
    const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
    const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
    const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
    mkdirSync(join(repo.path, '.takt'), { recursive: true });
    writeFileSync(tasksFile, createPendingTasksYaml(12, piecePath, 'parallel-load'), 'utf-8');

    // When: run all tasks
    const result = runTakt({
      args: ['run', '--provider', 'mock'],
      cwd: repo.path,
      env: {
        ...isolatedEnv.env,
        TAKT_MOCK_SCENARIO: scenarioPath,
      },
      timeout: 240_000,
    });

    // Then: all tasks complete and queue becomes empty
    expect(result.exitCode).toBe(0);
    expect(result.stdout).toContain('Concurrency: 10');
    expect(result.stdout).toContain('Tasks Summary');
    const finalTasks = readTasks(tasksFile);
    expect(finalTasks).toEqual([]);
  }, 240_000);

  it('should initialize project dirs and execute tasks after add+run when global config is absent', () => {
    const envWithoutConfig = createEnvWithoutGlobalConfig();
    try {
      // Given: global config.yaml is absent and project config points to a mock piece path
      const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
      const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
      const projectConfigDir = join(repo.path, '.takt');
      const projectConfigPath = join(projectConfigDir, 'config.yaml');
      mkdirSync(projectConfigDir, { recursive: true });
      writeFileSync(projectConfigPath, `piece: ${piecePath}\npermissionMode: default\n`, 'utf-8');
      expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);

      // When: add 2 tasks and run once
      const addResult1 = runTakt({
        args: ['--provider', 'mock', 'add', 'Initialize flow task 1'],
        cwd: repo.path,
        env: {
          ...envWithoutConfig.env,
          TAKT_MOCK_SCENARIO: scenarioPath,
        },
        timeout: 240_000,
      });
      const addResult2 = runTakt({
        args: ['--provider', 'mock', 'add', 'Initialize flow task 2'],
        cwd: repo.path,
        env: {
          ...envWithoutConfig.env,
          TAKT_MOCK_SCENARIO: scenarioPath,
        },
        timeout: 240_000,
      });
      const runResult = runTakt({
        args: ['--provider', 'mock', 'run'],
        cwd: repo.path,
        env: {
          ...envWithoutConfig.env,
          TAKT_MOCK_SCENARIO: scenarioPath,
        },
        timeout: 240_000,
      });

      // Then: tasks are persisted/executed correctly and project init artifacts exist
      expect(addResult1.exitCode).toBe(0);
      expect(addResult2.exitCode).toBe(0);
      expect(runResult.exitCode).toBe(0);

      // Parsed directly (not via readTasks) so a missing `tasks` key fails the test.
      const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
      const parsedFinal = parseYaml(readFileSync(tasksFile, 'utf-8')) as { tasks?: TaskRecord[] };
      expect(parsedFinal.tasks).toEqual([]);

      const taskDirsRoot = join(repo.path, '.takt', 'tasks');
      const taskDirs = readdirSync(taskDirsRoot, { withFileTypes: true })
        .filter((entry) => entry.isDirectory())
        .map((entry) => entry.name);
      expect(taskDirs.length).toBe(2);
      expect(existsSync(join(projectConfigDir, '.gitignore'))).toBe(true);
      // The run must not have created a global config as a side effect.
      expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);
    } finally {
      envWithoutConfig.cleanup();
    }
  }, 240_000);
});