takt: takt-e2e (#298)
This commit is contained in:
parent
fb071e3b11
commit
2313b3985f
@ -103,6 +103,20 @@ E2Eテストを追加・変更した場合は、このドキュメントも更
|
||||
- `.takt/tasks.yaml` に pending タスクを追加する(`piece` に `e2e/fixtures/pieces/mock-single-step.yaml` を指定)。
|
||||
- 出力に `Task "watch-task" completed` が含まれることを確認する。
|
||||
- `Ctrl+C` で終了する。
|
||||
- Run recovery and high-priority run flows(`e2e/specs/run-recovery.e2e.ts`)
|
||||
- 目的: 高優先度ユースケース(異常終了リカバリー、並列実行、初期化〜add〜run)をまとめて確認。
|
||||
- LLM: 呼び出さない(`--provider mock` 固定)
|
||||
- 手順(ユーザー行動/コマンド):
|
||||
- 異常終了リカバリー:
|
||||
- `.takt/tasks.yaml` に pending タスク2件を投入し、`takt run --provider mock` 実行中にプロセスを強制終了する。
|
||||
- 再度 `takt run --provider mock` を実行し、`Recovered 1 interrupted running task(s) to pending.` が出力されることを確認する。
|
||||
- 復旧対象を含む全タスクが完了し、`.takt/tasks.yaml` が空になることを確認する。
|
||||
- 高並列実行:
|
||||
- `concurrency: 10` を設定し、pending タスク12件を投入して `takt run --provider mock` を実行する。
|
||||
- 出力に `Concurrency: 10` と `Tasks Summary` が含まれること、および `.takt/tasks.yaml` が空になることを確認する。
|
||||
- 初期化〜add〜run:
|
||||
- グローバル `config.yaml` 不在の環境で `takt add` を2回実行し、`takt run --provider mock` を実行する。
|
||||
- タスク実行完了後に `.takt/tasks/` 配下の2タスクディレクトリ生成、`.takt/.gitignore` 生成、`.takt/tasks.yaml` の空状態を確認する。
|
||||
- Run tasks graceful shutdown on SIGINT(`e2e/specs/run-sigint-graceful.e2e.ts`)
|
||||
- 目的: `takt run` を並列実行中に `Ctrl+C` した際、新規クローン投入を止めてグレースフルに終了することを確認。
|
||||
- LLM: 呼び出さない(`--provider mock` 固定)
|
||||
|
||||
325
e2e/specs/run-recovery.e2e.ts
Normal file
325
e2e/specs/run-recovery.e2e.ts
Normal file
@ -0,0 +1,325 @@
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import { spawn, execFileSync } from 'node:child_process';
|
||||
import { resolve, dirname, join } from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import {
|
||||
mkdtempSync,
|
||||
mkdirSync,
|
||||
writeFileSync,
|
||||
readFileSync,
|
||||
rmSync,
|
||||
existsSync,
|
||||
readdirSync,
|
||||
} from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
|
||||
import {
|
||||
createIsolatedEnv,
|
||||
updateIsolatedConfig,
|
||||
type IsolatedEnv,
|
||||
} from '../helpers/isolated-env';
|
||||
import { runTakt } from '../helpers/takt-runner';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = dirname(__filename);
|
||||
|
||||
interface LocalRepo {
|
||||
path: string;
|
||||
cleanup: () => void;
|
||||
}
|
||||
|
||||
interface TaskRecord {
|
||||
name: string;
|
||||
status: 'pending' | 'running' | 'failed' | 'completed';
|
||||
owner_pid?: number | null;
|
||||
piece?: string;
|
||||
}
|
||||
|
||||
function createLocalRepo(): LocalRepo {
|
||||
const repoPath = mkdtempSync(join(tmpdir(), 'takt-e2e-run-recovery-'));
|
||||
execFileSync('git', ['init'], { cwd: repoPath, stdio: 'pipe' });
|
||||
execFileSync('git', ['config', 'user.email', 'test@example.com'], { cwd: repoPath, stdio: 'pipe' });
|
||||
execFileSync('git', ['config', 'user.name', 'Test'], { cwd: repoPath, stdio: 'pipe' });
|
||||
writeFileSync(join(repoPath, 'README.md'), '# test\n');
|
||||
execFileSync('git', ['add', '.'], { cwd: repoPath, stdio: 'pipe' });
|
||||
execFileSync('git', ['commit', '-m', 'init'], { cwd: repoPath, stdio: 'pipe' });
|
||||
return {
|
||||
path: repoPath,
|
||||
cleanup: () => {
|
||||
rmSync(repoPath, { recursive: true, force: true });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function readTasks(tasksFile: string): TaskRecord[] {
|
||||
const raw = readFileSync(tasksFile, 'utf-8');
|
||||
const parsed = parseYaml(raw) as { tasks?: TaskRecord[] };
|
||||
return parsed.tasks ?? [];
|
||||
}
|
||||
|
||||
function waitFor(
|
||||
predicate: () => boolean,
|
||||
timeoutMs: number,
|
||||
intervalMs: number,
|
||||
): Promise<boolean> {
|
||||
return new Promise((resolvePromise) => {
|
||||
const startedAt = Date.now();
|
||||
const timer = setInterval(() => {
|
||||
if (predicate()) {
|
||||
clearInterval(timer);
|
||||
resolvePromise(true);
|
||||
return;
|
||||
}
|
||||
if (Date.now() - startedAt >= timeoutMs) {
|
||||
clearInterval(timer);
|
||||
resolvePromise(false);
|
||||
}
|
||||
}, intervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
function createPendingTasksYaml(
|
||||
count: number,
|
||||
piecePath: string,
|
||||
prefix: string,
|
||||
): string {
|
||||
const now = new Date().toISOString();
|
||||
const tasks = Array.from({ length: count }, (_, index) => ({
|
||||
name: `${prefix}-${String(index + 1)}`,
|
||||
status: 'pending' as const,
|
||||
content: `${prefix} task ${String(index + 1)}`,
|
||||
piece: piecePath,
|
||||
created_at: now,
|
||||
started_at: null,
|
||||
completed_at: null,
|
||||
owner_pid: null,
|
||||
}));
|
||||
return stringifyYaml({ tasks });
|
||||
}
|
||||
|
||||
function createEnvWithoutGlobalConfig(): {
|
||||
env: NodeJS.ProcessEnv;
|
||||
cleanup: () => void;
|
||||
globalConfigPath: string;
|
||||
} {
|
||||
const baseDir = mkdtempSync(join(tmpdir(), 'takt-e2e-init-flow-'));
|
||||
const globalConfigDir = join(baseDir, '.takt-global');
|
||||
const globalGitConfigPath = join(baseDir, '.gitconfig');
|
||||
const globalConfigPath = join(globalConfigDir, 'config.yaml');
|
||||
|
||||
writeFileSync(
|
||||
globalGitConfigPath,
|
||||
['[user]', ' name = TAKT E2E Test', ' email = e2e@example.com'].join('\n'),
|
||||
);
|
||||
|
||||
return {
|
||||
env: {
|
||||
...process.env,
|
||||
TAKT_CONFIG_DIR: globalConfigDir,
|
||||
GIT_CONFIG_GLOBAL: globalGitConfigPath,
|
||||
TAKT_NO_TTY: '1',
|
||||
},
|
||||
globalConfigPath,
|
||||
cleanup: () => {
|
||||
rmSync(baseDir, { recursive: true, force: true });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// E2E更新時は docs/testing/e2e.md も更新すること
|
||||
describe('E2E: Run interrupted task recovery and high-priority run flows', () => {
|
||||
let isolatedEnv: IsolatedEnv;
|
||||
let repo: LocalRepo;
|
||||
|
||||
beforeEach(() => {
|
||||
isolatedEnv = createIsolatedEnv();
|
||||
repo = createLocalRepo();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
repo.cleanup();
|
||||
isolatedEnv.cleanup();
|
||||
});
|
||||
|
||||
it('should recover stale running task generated by forced process termination', async () => {
|
||||
// Given: 2 pending tasks exist, then first run is force-killed while task is running
|
||||
updateIsolatedConfig(isolatedEnv.taktDir, {
|
||||
provider: 'mock',
|
||||
model: 'mock-model',
|
||||
concurrency: 1,
|
||||
task_poll_interval_ms: 50,
|
||||
});
|
||||
|
||||
const piecePath = resolve(__dirname, '../fixtures/pieces/mock-slow-multi-step.yaml');
|
||||
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/run-sigint-parallel.json');
|
||||
const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
|
||||
|
||||
mkdirSync(join(repo.path, '.takt'), { recursive: true });
|
||||
writeFileSync(tasksFile, createPendingTasksYaml(2, piecePath, 'recovery-target'), 'utf-8');
|
||||
|
||||
const binPath = resolve(__dirname, '../../bin/takt');
|
||||
const child = spawn('node', [binPath, 'run', '--provider', 'mock'], {
|
||||
cwd: repo.path,
|
||||
env: {
|
||||
...isolatedEnv.env,
|
||||
TAKT_MOCK_SCENARIO: scenarioPath,
|
||||
},
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
});
|
||||
|
||||
let firstStdout = '';
|
||||
let firstStderr = '';
|
||||
child.stdout?.on('data', (chunk) => {
|
||||
firstStdout += chunk.toString();
|
||||
});
|
||||
child.stderr?.on('data', (chunk) => {
|
||||
firstStderr += chunk.toString();
|
||||
});
|
||||
|
||||
const runningObserved = await waitFor(() => {
|
||||
if (!existsSync(tasksFile)) {
|
||||
return false;
|
||||
}
|
||||
const tasks = readTasks(tasksFile);
|
||||
return tasks.some((task) => task.status === 'running');
|
||||
}, 30_000, 20);
|
||||
|
||||
expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true);
|
||||
|
||||
child.kill('SIGKILL');
|
||||
|
||||
await new Promise<void>((resolvePromise) => {
|
||||
child.once('close', () => {
|
||||
resolvePromise();
|
||||
});
|
||||
});
|
||||
|
||||
const staleTasks = readTasks(tasksFile);
|
||||
const runningTask = staleTasks.find((task) => task.status === 'running');
|
||||
expect(runningTask).toBeDefined();
|
||||
expect(runningTask?.owner_pid).toBeTypeOf('number');
|
||||
|
||||
// When: run is executed again
|
||||
const rerunResult = runTakt({
|
||||
args: ['run', '--provider', 'mock'],
|
||||
cwd: repo.path,
|
||||
env: {
|
||||
...isolatedEnv.env,
|
||||
TAKT_MOCK_SCENARIO: scenarioPath,
|
||||
},
|
||||
timeout: 240_000,
|
||||
});
|
||||
|
||||
// Then: stale running task is recovered and all tasks complete
|
||||
expect(rerunResult.exitCode).toBe(0);
|
||||
const combined = rerunResult.stdout + rerunResult.stderr;
|
||||
expect(combined).toContain('Recovered 1 interrupted running task(s) to pending.');
|
||||
expect(combined).toContain('recovery-target-1');
|
||||
expect(combined).toContain('recovery-target-2');
|
||||
|
||||
const finalTasks = readTasks(tasksFile);
|
||||
expect(finalTasks).toEqual([]);
|
||||
}, 240_000);
|
||||
|
||||
it('should process high-concurrency batch without leaving inconsistent task state', () => {
|
||||
// Given: 12 pending tasks with concurrency=10
|
||||
updateIsolatedConfig(isolatedEnv.taktDir, {
|
||||
provider: 'mock',
|
||||
model: 'mock-model',
|
||||
concurrency: 10,
|
||||
task_poll_interval_ms: 50,
|
||||
});
|
||||
|
||||
const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
|
||||
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
|
||||
const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
|
||||
|
||||
mkdirSync(join(repo.path, '.takt'), { recursive: true });
|
||||
writeFileSync(tasksFile, createPendingTasksYaml(12, piecePath, 'parallel-load'), 'utf-8');
|
||||
|
||||
// When: run all tasks
|
||||
const result = runTakt({
|
||||
args: ['run', '--provider', 'mock'],
|
||||
cwd: repo.path,
|
||||
env: {
|
||||
...isolatedEnv.env,
|
||||
TAKT_MOCK_SCENARIO: scenarioPath,
|
||||
},
|
||||
timeout: 240_000,
|
||||
});
|
||||
|
||||
// Then: all tasks complete and queue becomes empty
|
||||
expect(result.exitCode).toBe(0);
|
||||
expect(result.stdout).toContain('Concurrency: 10');
|
||||
expect(result.stdout).toContain('Tasks Summary');
|
||||
const finalTasks = readTasks(tasksFile);
|
||||
expect(finalTasks).toEqual([]);
|
||||
}, 240_000);
|
||||
|
||||
it('should initialize project dirs and execute tasks after add+run when global config is absent', () => {
|
||||
const envWithoutConfig = createEnvWithoutGlobalConfig();
|
||||
|
||||
try {
|
||||
// Given: global config.yaml is absent and project config points to a mock piece path
|
||||
const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
|
||||
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
|
||||
const projectConfigDir = join(repo.path, '.takt');
|
||||
const projectConfigPath = join(projectConfigDir, 'config.yaml');
|
||||
mkdirSync(projectConfigDir, { recursive: true });
|
||||
writeFileSync(projectConfigPath, `piece: ${piecePath}\npermissionMode: default\n`, 'utf-8');
|
||||
|
||||
expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);
|
||||
|
||||
// When: add 2 tasks and run once
|
||||
const addResult1 = runTakt({
|
||||
args: ['--provider', 'mock', 'add', 'Initialize flow task 1'],
|
||||
cwd: repo.path,
|
||||
env: {
|
||||
...envWithoutConfig.env,
|
||||
TAKT_MOCK_SCENARIO: scenarioPath,
|
||||
},
|
||||
timeout: 240_000,
|
||||
});
|
||||
|
||||
const addResult2 = runTakt({
|
||||
args: ['--provider', 'mock', 'add', 'Initialize flow task 2'],
|
||||
cwd: repo.path,
|
||||
env: {
|
||||
...envWithoutConfig.env,
|
||||
TAKT_MOCK_SCENARIO: scenarioPath,
|
||||
},
|
||||
timeout: 240_000,
|
||||
});
|
||||
|
||||
const runResult = runTakt({
|
||||
args: ['--provider', 'mock', 'run'],
|
||||
cwd: repo.path,
|
||||
env: {
|
||||
...envWithoutConfig.env,
|
||||
TAKT_MOCK_SCENARIO: scenarioPath,
|
||||
},
|
||||
timeout: 240_000,
|
||||
});
|
||||
|
||||
// Then: tasks are persisted/executed correctly and project init artifacts exist
|
||||
expect(addResult1.exitCode).toBe(0);
|
||||
expect(addResult2.exitCode).toBe(0);
|
||||
expect(runResult.exitCode).toBe(0);
|
||||
|
||||
const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
|
||||
const parsedFinal = parseYaml(readFileSync(tasksFile, 'utf-8')) as { tasks?: TaskRecord[] };
|
||||
expect(parsedFinal.tasks).toEqual([]);
|
||||
|
||||
const taskDirsRoot = join(repo.path, '.takt', 'tasks');
|
||||
const taskDirs = readdirSync(taskDirsRoot, { withFileTypes: true })
|
||||
.filter((entry) => entry.isDirectory())
|
||||
.map((entry) => entry.name);
|
||||
expect(taskDirs.length).toBe(2);
|
||||
|
||||
expect(existsSync(join(projectConfigDir, '.gitignore'))).toBe(true);
|
||||
expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);
|
||||
} finally {
|
||||
envWithoutConfig.cleanup();
|
||||
}
|
||||
}, 240_000);
|
||||
});
|
||||
Loading…
x
Reference in New Issue
Block a user