326 lines
10 KiB
TypeScript
326 lines
10 KiB
TypeScript
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
|
import { spawn, execFileSync } from 'node:child_process';
|
|
import { resolve, dirname, join } from 'node:path';
|
|
import { fileURLToPath } from 'node:url';
|
|
import {
|
|
mkdtempSync,
|
|
mkdirSync,
|
|
writeFileSync,
|
|
readFileSync,
|
|
rmSync,
|
|
existsSync,
|
|
readdirSync,
|
|
} from 'node:fs';
|
|
import { tmpdir } from 'node:os';
|
|
import { parse as parseYaml, stringify as stringifyYaml } from 'yaml';
|
|
import {
|
|
createIsolatedEnv,
|
|
updateIsolatedConfig,
|
|
type IsolatedEnv,
|
|
} from '../helpers/isolated-env';
|
|
import { runTakt } from '../helpers/takt-runner';
|
|
|
|
const __filename = fileURLToPath(import.meta.url);
|
|
const __dirname = dirname(__filename);
|
|
|
|
interface LocalRepo {
|
|
path: string;
|
|
cleanup: () => void;
|
|
}
|
|
|
|
interface TaskRecord {
|
|
name: string;
|
|
status: 'pending' | 'running' | 'failed' | 'completed';
|
|
owner_pid?: number | null;
|
|
piece?: string;
|
|
}
|
|
|
|
function createLocalRepo(): LocalRepo {
|
|
const repoPath = mkdtempSync(join(tmpdir(), 'takt-e2e-run-recovery-'));
|
|
execFileSync('git', ['init'], { cwd: repoPath, stdio: 'pipe' });
|
|
execFileSync('git', ['config', 'user.email', 'test@example.com'], { cwd: repoPath, stdio: 'pipe' });
|
|
execFileSync('git', ['config', 'user.name', 'Test'], { cwd: repoPath, stdio: 'pipe' });
|
|
writeFileSync(join(repoPath, 'README.md'), '# test\n');
|
|
execFileSync('git', ['add', '.'], { cwd: repoPath, stdio: 'pipe' });
|
|
execFileSync('git', ['commit', '-m', 'init'], { cwd: repoPath, stdio: 'pipe' });
|
|
return {
|
|
path: repoPath,
|
|
cleanup: () => {
|
|
rmSync(repoPath, { recursive: true, force: true });
|
|
},
|
|
};
|
|
}
|
|
|
|
function readTasks(tasksFile: string): TaskRecord[] {
|
|
const raw = readFileSync(tasksFile, 'utf-8');
|
|
const parsed = parseYaml(raw) as { tasks?: TaskRecord[] };
|
|
return parsed.tasks ?? [];
|
|
}
|
|
|
|
function waitFor(
|
|
predicate: () => boolean,
|
|
timeoutMs: number,
|
|
intervalMs: number,
|
|
): Promise<boolean> {
|
|
return new Promise((resolvePromise) => {
|
|
const startedAt = Date.now();
|
|
const timer = setInterval(() => {
|
|
if (predicate()) {
|
|
clearInterval(timer);
|
|
resolvePromise(true);
|
|
return;
|
|
}
|
|
if (Date.now() - startedAt >= timeoutMs) {
|
|
clearInterval(timer);
|
|
resolvePromise(false);
|
|
}
|
|
}, intervalMs);
|
|
});
|
|
}
|
|
|
|
function createPendingTasksYaml(
|
|
count: number,
|
|
piecePath: string,
|
|
prefix: string,
|
|
): string {
|
|
const now = new Date().toISOString();
|
|
const tasks = Array.from({ length: count }, (_, index) => ({
|
|
name: `${prefix}-${String(index + 1)}`,
|
|
status: 'pending' as const,
|
|
content: `${prefix} task ${String(index + 1)}`,
|
|
piece: piecePath,
|
|
created_at: now,
|
|
started_at: null,
|
|
completed_at: null,
|
|
owner_pid: null,
|
|
}));
|
|
return stringifyYaml({ tasks });
|
|
}
|
|
|
|
function createEnvWithoutGlobalConfig(): {
|
|
env: NodeJS.ProcessEnv;
|
|
cleanup: () => void;
|
|
globalConfigPath: string;
|
|
} {
|
|
const baseDir = mkdtempSync(join(tmpdir(), 'takt-e2e-init-flow-'));
|
|
const globalConfigDir = join(baseDir, '.takt-global');
|
|
const globalGitConfigPath = join(baseDir, '.gitconfig');
|
|
const globalConfigPath = join(globalConfigDir, 'config.yaml');
|
|
|
|
writeFileSync(
|
|
globalGitConfigPath,
|
|
['[user]', ' name = TAKT E2E Test', ' email = e2e@example.com'].join('\n'),
|
|
);
|
|
|
|
return {
|
|
env: {
|
|
...process.env,
|
|
TAKT_CONFIG_DIR: globalConfigDir,
|
|
GIT_CONFIG_GLOBAL: globalGitConfigPath,
|
|
TAKT_NO_TTY: '1',
|
|
},
|
|
globalConfigPath,
|
|
cleanup: () => {
|
|
rmSync(baseDir, { recursive: true, force: true });
|
|
},
|
|
};
|
|
}
|
|
|
|
// E2E更新時は docs/testing/e2e.md も更新すること
|
|
describe('E2E: Run interrupted task recovery and high-priority run flows', () => {
|
|
let isolatedEnv: IsolatedEnv;
|
|
let repo: LocalRepo;
|
|
|
|
beforeEach(() => {
|
|
isolatedEnv = createIsolatedEnv();
|
|
repo = createLocalRepo();
|
|
});
|
|
|
|
afterEach(() => {
|
|
repo.cleanup();
|
|
isolatedEnv.cleanup();
|
|
});
|
|
|
|
it('should recover stale running task generated by forced process termination', async () => {
|
|
// Given: 2 pending tasks exist, then first run is force-killed while task is running
|
|
updateIsolatedConfig(isolatedEnv.taktDir, {
|
|
provider: 'mock',
|
|
model: 'mock-model',
|
|
concurrency: 1,
|
|
task_poll_interval_ms: 50,
|
|
});
|
|
|
|
const piecePath = resolve(__dirname, '../fixtures/pieces/mock-slow-multi-step.yaml');
|
|
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/run-sigint-parallel.json');
|
|
const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
|
|
|
|
mkdirSync(join(repo.path, '.takt'), { recursive: true });
|
|
writeFileSync(tasksFile, createPendingTasksYaml(2, piecePath, 'recovery-target'), 'utf-8');
|
|
|
|
const binPath = resolve(__dirname, '../../bin/takt');
|
|
const child = spawn('node', [binPath, 'run', '--provider', 'mock'], {
|
|
cwd: repo.path,
|
|
env: {
|
|
...isolatedEnv.env,
|
|
TAKT_MOCK_SCENARIO: scenarioPath,
|
|
},
|
|
stdio: ['ignore', 'pipe', 'pipe'],
|
|
});
|
|
|
|
let firstStdout = '';
|
|
let firstStderr = '';
|
|
child.stdout?.on('data', (chunk) => {
|
|
firstStdout += chunk.toString();
|
|
});
|
|
child.stderr?.on('data', (chunk) => {
|
|
firstStderr += chunk.toString();
|
|
});
|
|
|
|
const runningObserved = await waitFor(() => {
|
|
if (!existsSync(tasksFile)) {
|
|
return false;
|
|
}
|
|
const tasks = readTasks(tasksFile);
|
|
return tasks.some((task) => task.status === 'running');
|
|
}, 30_000, 20);
|
|
|
|
expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true);
|
|
|
|
child.kill('SIGKILL');
|
|
|
|
await new Promise<void>((resolvePromise) => {
|
|
child.once('close', () => {
|
|
resolvePromise();
|
|
});
|
|
});
|
|
|
|
const staleTasks = readTasks(tasksFile);
|
|
const runningTask = staleTasks.find((task) => task.status === 'running');
|
|
expect(runningTask).toBeDefined();
|
|
expect(runningTask?.owner_pid).toBeTypeOf('number');
|
|
|
|
// When: run is executed again
|
|
const rerunResult = runTakt({
|
|
args: ['run', '--provider', 'mock'],
|
|
cwd: repo.path,
|
|
env: {
|
|
...isolatedEnv.env,
|
|
TAKT_MOCK_SCENARIO: scenarioPath,
|
|
},
|
|
timeout: 240_000,
|
|
});
|
|
|
|
// Then: stale running task is recovered and all tasks complete
|
|
expect(rerunResult.exitCode).toBe(0);
|
|
const combined = rerunResult.stdout + rerunResult.stderr;
|
|
expect(combined).toContain('Recovered 1 interrupted running task(s) to pending.');
|
|
expect(combined).toContain('recovery-target-1');
|
|
expect(combined).toContain('recovery-target-2');
|
|
|
|
const finalTasks = readTasks(tasksFile);
|
|
expect(finalTasks).toEqual([]);
|
|
}, 240_000);
|
|
|
|
it('should process high-concurrency batch without leaving inconsistent task state', () => {
|
|
// Given: 12 pending tasks with concurrency=10
|
|
updateIsolatedConfig(isolatedEnv.taktDir, {
|
|
provider: 'mock',
|
|
model: 'mock-model',
|
|
concurrency: 10,
|
|
task_poll_interval_ms: 50,
|
|
});
|
|
|
|
const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
|
|
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
|
|
const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
|
|
|
|
mkdirSync(join(repo.path, '.takt'), { recursive: true });
|
|
writeFileSync(tasksFile, createPendingTasksYaml(12, piecePath, 'parallel-load'), 'utf-8');
|
|
|
|
// When: run all tasks
|
|
const result = runTakt({
|
|
args: ['run', '--provider', 'mock'],
|
|
cwd: repo.path,
|
|
env: {
|
|
...isolatedEnv.env,
|
|
TAKT_MOCK_SCENARIO: scenarioPath,
|
|
},
|
|
timeout: 240_000,
|
|
});
|
|
|
|
// Then: all tasks complete and queue becomes empty
|
|
expect(result.exitCode).toBe(0);
|
|
expect(result.stdout).toContain('Concurrency: 10');
|
|
expect(result.stdout).toContain('Tasks Summary');
|
|
const finalTasks = readTasks(tasksFile);
|
|
expect(finalTasks).toEqual([]);
|
|
}, 240_000);
|
|
|
|
it('should initialize project dirs and execute tasks after add+run when global config is absent', () => {
|
|
const envWithoutConfig = createEnvWithoutGlobalConfig();
|
|
|
|
try {
|
|
// Given: global config.yaml is absent and project config points to a mock piece path
|
|
const piecePath = resolve(__dirname, '../fixtures/pieces/mock-single-step.yaml');
|
|
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
|
|
const projectConfigDir = join(repo.path, '.takt');
|
|
const projectConfigPath = join(projectConfigDir, 'config.yaml');
|
|
mkdirSync(projectConfigDir, { recursive: true });
|
|
writeFileSync(projectConfigPath, `piece: ${piecePath}\npermissionMode: default\n`, 'utf-8');
|
|
|
|
expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);
|
|
|
|
// When: add 2 tasks and run once
|
|
const addResult1 = runTakt({
|
|
args: ['--provider', 'mock', 'add', 'Initialize flow task 1'],
|
|
cwd: repo.path,
|
|
env: {
|
|
...envWithoutConfig.env,
|
|
TAKT_MOCK_SCENARIO: scenarioPath,
|
|
},
|
|
timeout: 240_000,
|
|
});
|
|
|
|
const addResult2 = runTakt({
|
|
args: ['--provider', 'mock', 'add', 'Initialize flow task 2'],
|
|
cwd: repo.path,
|
|
env: {
|
|
...envWithoutConfig.env,
|
|
TAKT_MOCK_SCENARIO: scenarioPath,
|
|
},
|
|
timeout: 240_000,
|
|
});
|
|
|
|
const runResult = runTakt({
|
|
args: ['--provider', 'mock', 'run'],
|
|
cwd: repo.path,
|
|
env: {
|
|
...envWithoutConfig.env,
|
|
TAKT_MOCK_SCENARIO: scenarioPath,
|
|
},
|
|
timeout: 240_000,
|
|
});
|
|
|
|
// Then: tasks are persisted/executed correctly and project init artifacts exist
|
|
expect(addResult1.exitCode).toBe(0);
|
|
expect(addResult2.exitCode).toBe(0);
|
|
expect(runResult.exitCode).toBe(0);
|
|
|
|
const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
|
|
const parsedFinal = parseYaml(readFileSync(tasksFile, 'utf-8')) as { tasks?: TaskRecord[] };
|
|
expect(parsedFinal.tasks).toEqual([]);
|
|
|
|
const taskDirsRoot = join(repo.path, '.takt', 'tasks');
|
|
const taskDirs = readdirSync(taskDirsRoot, { withFileTypes: true })
|
|
.filter((entry) => entry.isDirectory())
|
|
.map((entry) => entry.name);
|
|
expect(taskDirs.length).toBe(2);
|
|
|
|
expect(existsSync(join(projectConfigDir, '.gitignore'))).toBe(true);
|
|
expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);
|
|
} finally {
|
|
envWithoutConfig.cleanup();
|
|
}
|
|
}, 240_000);
|
|
});
|