パラレルを改良

This commit is contained in:
nrslib 2026-02-08 17:09:26 +09:00
parent 2b30700fa1
commit c2aa22f97c
26 changed files with 1145 additions and 201 deletions

View File

@ -5,9 +5,11 @@ piece_categories:
- passthrough
- coding
- minimal
🔍 Review & Fix:
- compound-eye
🔍 Review:
pieces:
- review-fix-minimal
- review-only
🎨 Frontend: {}
⚙️ Backend: {}
🔧 Expert:
@ -33,6 +35,5 @@ piece_categories:
pieces:
- research
- magi
- review-only
show_others_category: true
others_category_name: Others

View File

@ -0,0 +1,110 @@
name: compound-eye
description: Multi-model review - send the same instruction to Claude and Codex simultaneously, synthesize both responses
max_iterations: 10
knowledge:
architecture: ../knowledge/architecture.md
personas:
coder: ../personas/coder.md
supervisor: ../personas/supervisor.md
initial_movement: evaluate
movements:
- name: evaluate
parallel:
- name: claude-eye
edit: false
persona: coder
provider: claude
session: refresh
knowledge: architecture
allowed_tools:
- Read
- Glob
- Grep
- Bash
- WebSearch
- WebFetch
rules:
- condition: done
- condition: failed
output_contracts:
report:
- name: 01-claude.md
- name: codex-eye
edit: false
persona: coder
provider: codex
session: refresh
knowledge: architecture
allowed_tools:
- Read
- Glob
- Grep
- Bash
- WebSearch
- WebFetch
rules:
- condition: done
- condition: failed
output_contracts:
report:
- name: 02-codex.md
rules:
- condition: any("done")
next: synthesize
- name: synthesize
edit: false
persona: supervisor
allowed_tools:
- Read
- Glob
- Grep
rules:
- condition: synthesis complete
next: COMPLETE
instruction_template: |
Two models (Claude / Codex) independently answered the same instruction.
Synthesize their responses.
**Tasks:**
1. Read reports in the Report Directory
- `01-claude.md` (Claude's response)
- `02-codex.md` (Codex's response)
Note: If one report is missing (model failed), synthesize from the available report only
2. If both reports exist, compare and clarify:
- Points of agreement
- Points of disagreement
- Points mentioned by only one model
3. Produce a synthesized conclusion
**Output format:**
```markdown
# Multi-Model Review Synthesis
## Conclusion
{Synthesized conclusion}
## Response Status
| Model | Status |
|-------|--------|
| Claude | ✅ / ❌ |
| Codex | ✅ / ❌ |
## Agreements
- {Points where both models agree}
## Disagreements
| Topic | Claude | Codex |
|-------|--------|-------|
| {topic} | {Claude's view} | {Codex's view} |
## Unique Findings
- **Claude only:** {Points only Claude mentioned}
- **Codex only:** {Points only Codex mentioned}
## Overall Assessment
{Overall assessment considering both responses}
```
output_contracts:
report:
- Summary: 03-synthesis.md

View File

@ -5,9 +5,11 @@ piece_categories:
- passthrough
- coding
- minimal
🔍 レビュー&修正:
- compound-eye
🔍 レビュー:
pieces:
- review-fix-minimal
- review-only
🎨 フロントエンド: {}
⚙️ バックエンド: {}
🔧 フルスタック:
@ -32,6 +34,5 @@ piece_categories:
pieces:
- research
- magi
- review-only
show_others_category: true
others_category_name: その他

View File

@ -0,0 +1,110 @@
name: compound-eye
description: 複眼レビュー - 同じ指示を Claude と Codex に同時に投げ、両者の回答を統合する
max_iterations: 10
knowledge:
architecture: ../knowledge/architecture.md
personas:
coder: ../personas/coder.md
supervisor: ../personas/supervisor.md
initial_movement: evaluate
movements:
- name: evaluate
parallel:
- name: claude-eye
edit: false
persona: coder
provider: claude
knowledge: architecture
allowed_tools:
- Read
- Glob
- Grep
- Bash
- WebSearch
- WebFetch
rules:
- condition: done
- condition: failed
output_contracts:
report:
- name: 01-claude.md
- name: codex-eye
edit: false
persona: coder
provider: codex
knowledge: architecture
allowed_tools:
- Read
- Glob
- Grep
- Bash
- WebSearch
- WebFetch
rules:
- condition: done
- condition: failed
output_contracts:
report:
- name: 02-codex.md
rules:
- condition: any("done")
next: synthesize
- name: synthesize
edit: false
persona: supervisor
allowed_tools:
- Read
- Glob
- Grep
rules:
- condition: 統合完了
next: COMPLETE
instruction_template: |
2つのモデルClaude / Codexが同じ指示に対して独立に回答しました。
両者の回答を統合してください。
**やること:**
1. Report Directory 内のレポートを読む
- `01-claude.md`Claude の回答)
- `02-codex.md`Codex の回答)
※ 片方が存在しない場合(エラーで失敗した場合)、存在するレポートのみで統合する
2. 両方のレポートがある場合は比較し、以下を明示する
- 一致している点
- 相違している点
- 片方だけが指摘・言及している点
3. 統合した結論を出す
**出力フォーマット:**
```markdown
# 複眼レビュー統合
## 結論
{統合した結論}
## 回答状況
| モデル | 状態 |
|--------|------|
| Claude | ✅ / ❌ |
| Codex | ✅ / ❌ |
## 一致点
- {両モデルが同じ見解を示した点}
## 相違点
| 論点 | Claude | Codex |
|------|--------|-------|
| {論点} | {Claudeの見解} | {Codexの見解} |
## 片方のみの指摘
- **Claude のみ:** {Claudeだけが言及した点}
- **Codex のみ:** {Codexだけが言及した点}
## 総合評価
{両者の回答を踏まえた総合的な評価}
```
output_contracts:
report:
- Summary: 03-synthesis.md

8
package-lock.json generated
View File

@ -9,7 +9,7 @@
"version": "0.8.0",
"license": "MIT",
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.2.34",
"@anthropic-ai/claude-agent-sdk": "^0.2.37",
"@openai/codex-sdk": "^0.98.0",
"chalk": "^5.3.0",
"commander": "^12.1.0",
@ -39,9 +39,9 @@
}
},
"node_modules/@anthropic-ai/claude-agent-sdk": {
"version": "0.2.34",
"resolved": "https://registry.npmjs.org/@anthropic-ai/claude-agent-sdk/-/claude-agent-sdk-0.2.34.tgz",
"integrity": "sha512-QLHd3Nt7bGU7/YH71fXFaztM9fNxGGruzTMrTYJkbm5gYJl5ZyU2zGyoE5VpWC0e1QU0yYdNdBVgqSYDcJGufg==",
"version": "0.2.37",
"resolved": "https://registry.npmjs.org/@anthropic-ai/claude-agent-sdk/-/claude-agent-sdk-0.2.37.tgz",
"integrity": "sha512-0TCAUuGXiWYV2JK+j2SiakGzPA7aoR5DNRxZ0EA571loGIqN3FRfiO1kipeBpEc+cRQ03a/4Kt5YAjMx0KBW+A==",
"license": "SEE LICENSE IN README.md",
"engines": {
"node": ">=18.0.0"

View File

@ -57,7 +57,7 @@
"builtins/"
],
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.2.34",
"@anthropic-ai/claude-agent-sdk": "^0.2.37",
"@openai/codex-sdk": "^0.98.0",
"chalk": "^5.3.0",
"commander": "^12.1.0",

View File

@ -0,0 +1,202 @@
/**
* PieceEngine integration tests: parallel movement partial failure handling.
*
* Covers:
* - One sub-movement fails while another succeeds piece continues
* - All sub-movements fail piece aborts
* - Failed sub-movement is recorded as blocked with error
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { existsSync, rmSync } from 'node:fs';
// --- Mock setup (must be before imports that use these modules) ---
vi.mock('../agents/runner.js', () => ({
runAgent: vi.fn(),
}));
vi.mock('../core/piece/evaluation/index.js', () => ({
detectMatchedRule: vi.fn(),
}));
vi.mock('../core/piece/phase-runner.js', () => ({
needsStatusJudgmentPhase: vi.fn().mockReturnValue(false),
runReportPhase: vi.fn().mockResolvedValue(undefined),
runStatusJudgmentPhase: vi.fn().mockResolvedValue(''),
}));
vi.mock('../shared/utils/index.js', async (importOriginal) => ({
...(await importOriginal<Record<string, unknown>>()),
generateReportDir: vi.fn().mockReturnValue('test-report-dir'),
}));
// --- Imports (after mocks) ---
import { PieceEngine } from '../core/piece/index.js';
import { runAgent } from '../agents/runner.js';
import { detectMatchedRule } from '../core/piece/index.js';
import {
makeResponse,
makeMovement,
makeRule,
mockDetectMatchedRuleSequence,
createTestTmpDir,
applyDefaultMocks,
} from './engine-test-helpers.js';
import type { PieceConfig } from '../core/models/index.js';
/**
* Build a piece config that goes directly to a parallel step:
* parallel-step (arch-review + security-review) done
*/
function buildParallelOnlyConfig(): PieceConfig {
return {
name: 'test-parallel-failure',
description: 'Test parallel failure handling',
maxIterations: 10,
initialMovement: 'reviewers',
movements: [
makeMovement('reviewers', {
parallel: [
makeMovement('arch-review', {
rules: [
makeRule('done', 'COMPLETE'),
makeRule('needs_fix', 'fix'),
],
}),
makeMovement('security-review', {
rules: [
makeRule('done', 'COMPLETE'),
makeRule('needs_fix', 'fix'),
],
}),
],
rules: [
makeRule('any("done")', 'done', {
isAggregateCondition: true,
aggregateType: 'any',
aggregateConditionText: 'done',
}),
makeRule('all("needs_fix")', 'fix', {
isAggregateCondition: true,
aggregateType: 'all',
aggregateConditionText: 'needs_fix',
}),
],
}),
makeMovement('done', {
rules: [
makeRule('completed', 'COMPLETE'),
],
}),
makeMovement('fix', {
rules: [
makeRule('fixed', 'reviewers'),
],
}),
],
};
}
describe('PieceEngine Integration: Parallel Movement Partial Failure', () => {
let tmpDir: string;
beforeEach(() => {
vi.resetAllMocks();
applyDefaultMocks();
tmpDir = createTestTmpDir();
});
afterEach(() => {
if (existsSync(tmpDir)) {
rmSync(tmpDir, { recursive: true, force: true });
}
});
it('should continue when one sub-movement fails but another succeeds', async () => {
const config = buildParallelOnlyConfig();
const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });
const mock = vi.mocked(runAgent);
// arch-review fails (exit code 1)
mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1'));
// security-review succeeds
mock.mockResolvedValueOnce(
makeResponse({ persona: 'security-review', content: 'Security review passed' }),
);
// done step
mock.mockResolvedValueOnce(
makeResponse({ persona: 'done', content: 'Completed' }),
);
mockDetectMatchedRuleSequence([
// security-review sub-movement rule match (arch-review has no match — it failed)
{ index: 0, method: 'phase1_tag' }, // security-review → done
{ index: 0, method: 'aggregate' }, // reviewers → any("done") matches
{ index: 0, method: 'phase1_tag' }, // done → COMPLETE
]);
const state = await engine.run();
expect(state.status).toBe('completed');
// arch-review should be recorded as blocked
const archReviewOutput = state.movementOutputs.get('arch-review');
expect(archReviewOutput).toBeDefined();
expect(archReviewOutput!.status).toBe('blocked');
expect(archReviewOutput!.error).toContain('exit');
// security-review should be recorded as done
const securityReviewOutput = state.movementOutputs.get('security-review');
expect(securityReviewOutput).toBeDefined();
expect(securityReviewOutput!.status).toBe('done');
});
it('should abort when all sub-movements fail', async () => {
const config = buildParallelOnlyConfig();
const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });
const mock = vi.mocked(runAgent);
// Both fail
mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1'));
mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1'));
const abortFn = vi.fn();
engine.on('piece:abort', abortFn);
const state = await engine.run();
expect(state.status).toBe('aborted');
expect(abortFn).toHaveBeenCalledOnce();
const reason = abortFn.mock.calls[0]![1] as string;
expect(reason).toContain('All parallel sub-movements failed');
});
it('should record failed sub-movement error message in movementOutputs', async () => {
const config = buildParallelOnlyConfig();
const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });
const mock = vi.mocked(runAgent);
mock.mockRejectedValueOnce(new Error('Session resume failed'));
mock.mockResolvedValueOnce(
makeResponse({ persona: 'security-review', content: 'OK' }),
);
mock.mockResolvedValueOnce(
makeResponse({ persona: 'done', content: 'Done' }),
);
mockDetectMatchedRuleSequence([
{ index: 0, method: 'phase1_tag' },
{ index: 0, method: 'aggregate' },
{ index: 0, method: 'phase1_tag' },
]);
const state = await engine.run();
const archReviewOutput = state.movementOutputs.get('arch-review');
expect(archReviewOutput).toBeDefined();
expect(archReviewOutput!.error).toBe('Session resume failed');
expect(archReviewOutput!.content).toBe('');
});
});

View File

@ -0,0 +1,29 @@
/**
* Tests for QueryExecutor stderr capture and SdkOptionsBuilder stderr passthrough.
*/
import { describe, it, expect } from 'vitest';
import { buildSdkOptions } from '../infra/claude/options-builder.js';
import type { ClaudeSpawnOptions } from '../infra/claude/types.js';
describe('SdkOptionsBuilder.build() — stderr', () => {
it('should include stderr callback in SDK options when onStderr is provided', () => {
const stderrHandler = (_data: string): void => {};
const spawnOptions: ClaudeSpawnOptions = {
cwd: '/tmp/test',
onStderr: stderrHandler,
};
const sdkOptions = buildSdkOptions(spawnOptions);
expect(sdkOptions.stderr).toBe(stderrHandler);
});
it('should not include stderr in SDK options when onStderr is not provided', () => {
const spawnOptions: ClaudeSpawnOptions = {
cwd: '/tmp/test',
};
const sdkOptions = buildSdkOptions(spawnOptions);
expect(sdkOptions).not.toHaveProperty('stderr');
});
});

View File

@ -1,5 +1,5 @@
/**
* Tests for runAllTasks concurrency support
* Tests for runAllTasks concurrency support (worker pool)
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
@ -21,7 +21,7 @@ import { loadGlobalConfig } from '../infra/config/index.js';
const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig);
const mockGetNextTask = vi.fn();
const mockGetNextTasks = vi.fn();
const mockClaimNextTasks = vi.fn();
const mockCompleteTask = vi.fn();
const mockFailTask = vi.fn();
@ -29,7 +29,7 @@ vi.mock('../infra/task/index.js', async (importOriginal) => ({
...(await importOriginal<Record<string, unknown>>()),
TaskRunner: vi.fn().mockImplementation(() => ({
getNextTask: mockGetNextTask,
getNextTasks: mockGetNextTasks,
claimNextTasks: mockClaimNextTasks,
completeTask: mockCompleteTask,
failTask: mockFailTask,
})),
@ -147,7 +147,7 @@ describe('runAllTasks concurrency', () => {
it('should show no-tasks message when no tasks exist', async () => {
// Given: No pending tasks
mockGetNextTasks.mockReturnValue([]);
mockClaimNextTasks.mockReturnValue([]);
// When
await runAllTasks('/project');
@ -156,21 +156,21 @@ describe('runAllTasks concurrency', () => {
expect(mockInfo).toHaveBeenCalledWith('No pending tasks in .takt/tasks/');
});
it('should execute tasks sequentially when concurrency is 1', async () => {
it('should execute tasks sequentially via worker pool when concurrency is 1', async () => {
// Given: Two tasks available sequentially
const task1 = createTask('task-1');
const task2 = createTask('task-2');
mockGetNextTasks.mockReturnValueOnce([task1]);
mockGetNextTask
.mockReturnValueOnce(task2)
.mockReturnValueOnce(null);
mockClaimNextTasks
.mockReturnValueOnce([task1])
.mockReturnValueOnce([task2])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then: Sequential execution uses getNextTask in the while loop
expect(mockGetNextTask).toHaveBeenCalled();
// Then: Worker pool uses claimNextTasks for fetching more tasks
expect(mockClaimNextTasks).toHaveBeenCalled();
expect(mockStatus).toHaveBeenCalledWith('Total', '2');
});
});
@ -188,7 +188,7 @@ describe('runAllTasks concurrency', () => {
it('should display concurrency info when concurrency > 1', async () => {
// Given: Tasks available
const task1 = createTask('task-1');
mockGetNextTasks
mockClaimNextTasks
.mockReturnValueOnce([task1])
.mockReturnValueOnce([]);
@ -199,29 +199,32 @@ describe('runAllTasks concurrency', () => {
expect(mockInfo).toHaveBeenCalledWith('Concurrency: 3');
});
it('should execute tasks in batch when concurrency > 1', async () => {
// Given: 3 tasks available in first batch
it('should execute tasks using worker pool when concurrency > 1', async () => {
// Given: 3 tasks available
const task1 = createTask('task-1');
const task2 = createTask('task-2');
const task3 = createTask('task-3');
mockGetNextTasks
mockClaimNextTasks
.mockReturnValueOnce([task1, task2, task3])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then: Batch info shown
expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ===');
// Then: Task names displayed
expect(mockInfo).toHaveBeenCalledWith('=== Task: task-1 ===');
expect(mockInfo).toHaveBeenCalledWith('=== Task: task-2 ===');
expect(mockInfo).toHaveBeenCalledWith('=== Task: task-3 ===');
expect(mockStatus).toHaveBeenCalledWith('Total', '3');
});
it('should process multiple batches', async () => {
// Given: 5 tasks, concurrency=3 → batch1 (3 tasks), batch2 (2 tasks)
it('should fill slots as tasks complete (worker pool behavior)', async () => {
// Given: 5 tasks, concurrency=3
// Worker pool should start 3, then fill slots as tasks complete
const tasks = Array.from({ length: 5 }, (_, i) => createTask(`task-${i + 1}`));
mockGetNextTasks
mockClaimNextTasks
.mockReturnValueOnce(tasks.slice(0, 3))
.mockReturnValueOnce(tasks.slice(3, 5))
.mockReturnValueOnce([]);
@ -229,42 +232,9 @@ describe('runAllTasks concurrency', () => {
// When
await runAllTasks('/project');
// Then: Both batches shown
expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ===');
expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 2 task(s) ===');
// Then: All 5 tasks executed
expect(mockStatus).toHaveBeenCalledWith('Total', '5');
});
it('should not use getNextTask in parallel mode', async () => {
// Given: Tasks in parallel mode
const task1 = createTask('task-1');
mockGetNextTasks
.mockReturnValueOnce([task1])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then: getNextTask should not be called (parallel uses getNextTasks)
expect(mockGetNextTask).not.toHaveBeenCalled();
});
it('should list task names in batch output', async () => {
// Given: Tasks with specific names
const task1 = createTask('auth-feature');
const task2 = createTask('db-migration');
mockGetNextTasks
.mockReturnValueOnce([task1, task2])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then
expect(mockInfo).toHaveBeenCalledWith(' - auth-feature');
expect(mockInfo).toHaveBeenCalledWith(' - db-migration');
});
});
describe('default concurrency', () => {
@ -278,8 +248,9 @@ describe('runAllTasks concurrency', () => {
});
const task1 = createTask('task-1');
mockGetNextTasks.mockReturnValueOnce([task1]);
mockGetNextTask.mockReturnValueOnce(null);
mockClaimNextTasks
.mockReturnValueOnce([task1])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
@ -311,7 +282,7 @@ describe('runAllTasks concurrency', () => {
mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never);
});
it('should run batch tasks concurrently, not sequentially', async () => {
it('should run tasks concurrently, not sequentially', async () => {
// Given: 2 tasks with delayed execution to verify concurrency
const task1 = createTask('slow-1');
const task2 = createTask('slow-2');
@ -329,7 +300,7 @@ describe('runAllTasks concurrency', () => {
});
});
mockGetNextTasks
mockClaimNextTasks
.mockReturnValueOnce([task1, task2])
.mockReturnValueOnce([]);
@ -345,7 +316,48 @@ describe('runAllTasks concurrency', () => {
expect(elapsed).toBeLessThan(150);
});
it('should count partial failures correctly in a batch', async () => {
it('should fill slots immediately when a task completes (no batch waiting)', async () => {
// Given: 3 tasks, concurrency=2, task1 finishes quickly, task2 takes longer
mockLoadGlobalConfig.mockReturnValue({
language: 'en',
defaultPiece: 'default',
logLevel: 'info',
concurrency: 2,
});
const task1 = createTask('fast');
const task2 = createTask('slow');
const task3 = createTask('after-fast');
const executionOrder: string[] = [];
mockExecutePiece.mockImplementation((_config, task) => {
executionOrder.push(`start:${task}`);
const delay = (task as string).includes('slow') ? 80 : 20;
return new Promise((resolve) => {
setTimeout(() => {
executionOrder.push(`end:${task}`);
resolve({ success: true });
}, delay);
});
});
mockClaimNextTasks
.mockReturnValueOnce([task1, task2])
.mockReturnValueOnce([task3])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then: task3 starts before task2 finishes (slot filled immediately)
const task3StartIdx = executionOrder.indexOf('start:Task: after-fast');
const task2EndIdx = executionOrder.indexOf('end:Task: slow');
expect(task3StartIdx).toBeLessThan(task2EndIdx);
expect(mockStatus).toHaveBeenCalledWith('Total', '3');
});
it('should count partial failures correctly', async () => {
// Given: 3 tasks, 1 fails, 2 succeed
const task1 = createTask('pass-1');
const task2 = createTask('fail-1');
@ -358,7 +370,7 @@ describe('runAllTasks concurrency', () => {
return Promise.resolve({ success: callIndex !== 2 });
});
mockGetNextTasks
mockClaimNextTasks
.mockReturnValueOnce([task1, task2, task3])
.mockReturnValueOnce([]);
@ -371,29 +383,29 @@ describe('runAllTasks concurrency', () => {
expect(mockStatus).toHaveBeenCalledWith('Failed', '1', 'red');
});
it('should pass abortSignal and quiet=true to executePiece in parallel mode', async () => {
it('should pass abortSignal and taskPrefix to executePiece in parallel mode', async () => {
// Given: One task in parallel mode
const task1 = createTask('parallel-task');
mockExecutePiece.mockResolvedValue({ success: true });
mockGetNextTasks
mockClaimNextTasks
.mockReturnValueOnce([task1])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then: executePiece received abortSignal and quiet options
// Then: executePiece received abortSignal and taskPrefix options
expect(mockExecutePiece).toHaveBeenCalledTimes(1);
const callArgs = mockExecutePiece.mock.calls[0];
const pieceOptions = callArgs?.[3]; // 4th argument is options
expect(pieceOptions).toHaveProperty('abortSignal');
expect(pieceOptions?.abortSignal).toBeInstanceOf(AbortSignal);
expect(pieceOptions).toHaveProperty('quiet', true);
expect(pieceOptions).toHaveProperty('taskPrefix', 'parallel-task');
});
it('should not pass abortSignal or quiet in sequential mode', async () => {
it('should not pass abortSignal or taskPrefix in sequential mode', async () => {
// Given: Sequential mode
mockLoadGlobalConfig.mockReturnValue({
language: 'en',
@ -406,18 +418,19 @@ describe('runAllTasks concurrency', () => {
mockExecutePiece.mockResolvedValue({ success: true });
mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never);
mockGetNextTasks.mockReturnValueOnce([task1]);
mockGetNextTask.mockReturnValueOnce(null);
mockClaimNextTasks
.mockReturnValueOnce([task1])
.mockReturnValueOnce([]);
// When
await runAllTasks('/project');
// Then: executePiece should not have abortSignal or quiet
// Then: executePiece should not have abortSignal or taskPrefix
expect(mockExecutePiece).toHaveBeenCalledTimes(1);
const callArgs = mockExecutePiece.mock.calls[0];
const pieceOptions = callArgs?.[3];
expect(pieceOptions?.abortSignal).toBeUndefined();
expect(pieceOptions?.quiet).toBeFalsy();
expect(pieceOptions?.taskPrefix).toBeUndefined();
});
});
});

View File

@ -0,0 +1,53 @@
/**
* Tests for session key generation
*/
import { describe, it, expect } from 'vitest';
import { buildSessionKey } from '../core/piece/session-key.js';
import type { PieceMovement } from '../core/models/types.js';
function createMovement(overrides: Partial<PieceMovement> = {}): PieceMovement {
return {
name: 'test-movement',
personaDisplayName: 'test',
edit: false,
instructionTemplate: '',
passPreviousResponse: true,
...overrides,
};
}
describe('buildSessionKey', () => {
it('should use persona as base key when persona is set', () => {
const step = createMovement({ persona: 'coder', name: 'implement' });
expect(buildSessionKey(step)).toBe('coder');
});
it('should use name as base key when persona is not set', () => {
const step = createMovement({ persona: undefined, name: 'plan' });
expect(buildSessionKey(step)).toBe('plan');
});
it('should append provider when provider is specified', () => {
const step = createMovement({ persona: 'coder', provider: 'claude' });
expect(buildSessionKey(step)).toBe('coder:claude');
});
it('should use name with provider when persona is not set', () => {
const step = createMovement({ persona: undefined, name: 'review', provider: 'codex' });
expect(buildSessionKey(step)).toBe('review:codex');
});
it('should produce different keys for same persona with different providers', () => {
const claudeStep = createMovement({ persona: 'coder', provider: 'claude', name: 'claude-eye' });
const codexStep = createMovement({ persona: 'coder', provider: 'codex', name: 'codex-eye' });
expect(buildSessionKey(claudeStep)).not.toBe(buildSessionKey(codexStep));
expect(buildSessionKey(claudeStep)).toBe('coder:claude');
expect(buildSessionKey(codexStep)).toBe('coder:codex');
});
it('should not append provider when provider is undefined', () => {
const step = createMovement({ persona: 'coder', provider: undefined });
expect(buildSessionKey(step)).toBe('coder');
});
});

View File

@ -144,49 +144,114 @@ describe('TaskRunner', () => {
});
});
describe('getNextTasks', () => {
describe('claimNextTasks', () => {
it('should return empty array when no tasks', () => {
const tasks = runner.getNextTasks(3);
const tasks = runner.claimNextTasks(3);
expect(tasks).toEqual([]);
});
it('should return all tasks when count exceeds available tasks', () => {
it('should return tasks up to the requested count', () => {
const tasksDir = join(testDir, '.takt', 'tasks');
mkdirSync(tasksDir, { recursive: true });
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
writeFileSync(join(tasksDir, 'a-task.md'), 'A');
const tasks = runner.getNextTasks(5);
expect(tasks).toHaveLength(2);
expect(tasks[0]?.name).toBe('a-task');
expect(tasks[1]?.name).toBe('b-task');
});
it('should return only count tasks when more are available', () => {
const tasksDir = join(testDir, '.takt', 'tasks');
mkdirSync(tasksDir, { recursive: true });
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
writeFileSync(join(tasksDir, 'c-task.md'), 'C');
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
writeFileSync(join(tasksDir, 'a-task.md'), 'A');
const tasks = runner.getNextTasks(2);
const tasks = runner.claimNextTasks(2);
expect(tasks).toHaveLength(2);
expect(tasks[0]?.name).toBe('a-task');
expect(tasks[1]?.name).toBe('b-task');
});
it('should return tasks in same sort order as getNextTask', () => {
it('should not return already claimed tasks on subsequent calls', () => {
const tasksDir = join(testDir, '.takt', 'tasks');
mkdirSync(tasksDir, { recursive: true });
writeFileSync(join(tasksDir, '02-second.md'), 'Second');
writeFileSync(join(tasksDir, '01-first.md'), 'First');
writeFileSync(join(tasksDir, '03-third.md'), 'Third');
writeFileSync(join(tasksDir, 'a-task.md'), 'A');
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
writeFileSync(join(tasksDir, 'c-task.md'), 'C');
const nextTask = runner.getNextTask();
const nextTasks = runner.getNextTasks(1);
// Given: first call claims a-task
const first = runner.claimNextTasks(1);
expect(first).toHaveLength(1);
expect(first[0]?.name).toBe('a-task');
expect(nextTasks).toHaveLength(1);
expect(nextTasks[0]?.name).toBe(nextTask?.name);
// When: second call should skip a-task
const second = runner.claimNextTasks(1);
expect(second).toHaveLength(1);
expect(second[0]?.name).toBe('b-task');
// When: third call should skip a-task and b-task
const third = runner.claimNextTasks(1);
expect(third).toHaveLength(1);
expect(third[0]?.name).toBe('c-task');
// When: fourth call should return empty (all claimed)
const fourth = runner.claimNextTasks(1);
expect(fourth).toEqual([]);
});
it('should release claim after completeTask', () => {
const tasksDir = join(testDir, '.takt', 'tasks');
mkdirSync(tasksDir, { recursive: true });
writeFileSync(join(tasksDir, 'task-a.md'), 'Task A content');
// Given: claim the task
const claimed = runner.claimNextTasks(1);
expect(claimed).toHaveLength(1);
// When: complete the task (file is moved away)
runner.completeTask({
task: claimed[0]!,
success: true,
response: 'Done',
executionLog: [],
startedAt: '2024-01-01T00:00:00.000Z',
completedAt: '2024-01-01T00:01:00.000Z',
});
// Then: claim set no longer blocks (but file is moved, so no tasks anyway)
const next = runner.claimNextTasks(1);
expect(next).toEqual([]);
});
it('should release claim after failTask', () => {
const tasksDir = join(testDir, '.takt', 'tasks');
mkdirSync(tasksDir, { recursive: true });
writeFileSync(join(tasksDir, 'task-a.md'), 'Task A content');
// Given: claim the task
const claimed = runner.claimNextTasks(1);
expect(claimed).toHaveLength(1);
// When: fail the task (file is moved away)
runner.failTask({
task: claimed[0]!,
success: false,
response: 'Error',
executionLog: [],
startedAt: '2024-01-01T00:00:00.000Z',
completedAt: '2024-01-01T00:01:00.000Z',
});
// Then: claim set no longer blocks
const next = runner.claimNextTasks(1);
expect(next).toEqual([]);
});
it('should not affect getNextTask (unclaimed access)', () => {
const tasksDir = join(testDir, '.takt', 'tasks');
mkdirSync(tasksDir, { recursive: true });
writeFileSync(join(tasksDir, 'a-task.md'), 'A');
writeFileSync(join(tasksDir, 'b-task.md'), 'B');
// Given: claim a-task via claimNextTasks
runner.claimNextTasks(1);
// When: getNextTask is called (no claim filtering)
const task = runner.getNextTask();
// Then: getNextTask still returns first task (including claimed)
expect(task?.name).toBe('a-task');
});
});

View File

@ -0,0 +1,230 @@
/**
* Unit tests for runWithWorkerPool
*/
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { TaskInfo } from '../infra/task/index.js';
vi.mock('../shared/ui/index.js', () => ({
header: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
success: vi.fn(),
status: vi.fn(),
blankLine: vi.fn(),
}));
vi.mock('../shared/exitCodes.js', () => ({
EXIT_SIGINT: 130,
}));
vi.mock('../shared/i18n/index.js', () => ({
getLabel: vi.fn((key: string) => key),
}));
const mockExecuteAndCompleteTask = vi.fn();
vi.mock('../features/tasks/execute/taskExecution.js', () => ({
executeAndCompleteTask: (...args: unknown[]) => mockExecuteAndCompleteTask(...args),
}));
import { runWithWorkerPool } from '../features/tasks/execute/parallelExecution.js';
import { info } from '../shared/ui/index.js';
const mockInfo = vi.mocked(info);
function createTask(name: string): TaskInfo {
return {
name,
content: `Task: ${name}`,
filePath: `/tasks/${name}.yaml`,
};
}
function createMockTaskRunner(taskBatches: TaskInfo[][]) {
let batchIndex = 0;
return {
getNextTask: vi.fn(() => null),
claimNextTasks: vi.fn(() => {
const batch = taskBatches[batchIndex] ?? [];
batchIndex++;
return batch;
}),
completeTask: vi.fn(),
failTask: vi.fn(),
};
}
beforeEach(() => {
vi.clearAllMocks();
mockExecuteAndCompleteTask.mockResolvedValue(true);
});
describe('runWithWorkerPool', () => {
it('should return correct counts for all successful tasks', async () => {
// Given
const tasks = [createTask('a'), createTask('b')];
const runner = createMockTaskRunner([]);
// When
const result = await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default');
// Then
expect(result).toEqual({ success: 2, fail: 0 });
});
it('should return correct counts when some tasks fail', async () => {
// Given
const tasks = [createTask('pass'), createTask('fail'), createTask('pass2')];
let callIdx = 0;
mockExecuteAndCompleteTask.mockImplementation(() => {
callIdx++;
return Promise.resolve(callIdx !== 2);
});
const runner = createMockTaskRunner([]);
// When
const result = await runWithWorkerPool(runner as never, tasks, 3, '/cwd', 'default');
// Then
expect(result).toEqual({ success: 2, fail: 1 });
});
it('should display task name for each task', async () => {
// Given
const tasks = [createTask('alpha'), createTask('beta')];
const runner = createMockTaskRunner([]);
// When
await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default');
// Then
expect(mockInfo).toHaveBeenCalledWith('=== Task: alpha ===');
expect(mockInfo).toHaveBeenCalledWith('=== Task: beta ===');
});
it('should pass taskPrefix for parallel execution (concurrency > 1)', async () => {
// Given
const tasks = [createTask('my-task')];
const runner = createMockTaskRunner([]);
// When
await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default');
// Then
expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(1);
const parallelOpts = mockExecuteAndCompleteTask.mock.calls[0]?.[5];
expect(parallelOpts).toEqual({
abortSignal: expect.any(AbortSignal),
taskPrefix: 'my-task',
});
});
it('should not pass taskPrefix or abortSignal for sequential execution (concurrency = 1)', async () => {
// Given
const tasks = [createTask('seq-task')];
const runner = createMockTaskRunner([]);
// When
await runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default');
// Then
expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(1);
const parallelOpts = mockExecuteAndCompleteTask.mock.calls[0]?.[5];
expect(parallelOpts).toEqual({
abortSignal: undefined,
taskPrefix: undefined,
});
});
it('should fetch more tasks when slots become available', async () => {
// Given: 1 initial task, runner provides 1 more after
const task1 = createTask('first');
const task2 = createTask('second');
const runner = createMockTaskRunner([[task2]]);
// When
await runWithWorkerPool(runner as never, [task1], 2, '/cwd', 'default');
// Then
expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(2);
expect(runner.claimNextTasks).toHaveBeenCalled();
});
it('should respect concurrency limit', async () => {
// Given: 4 tasks, concurrency=2
const tasks = Array.from({ length: 4 }, (_, i) => createTask(`task-${i}`));
let activeCount = 0;
let maxActive = 0;
mockExecuteAndCompleteTask.mockImplementation(() => {
activeCount++;
maxActive = Math.max(maxActive, activeCount);
return new Promise((resolve) => {
setTimeout(() => {
activeCount--;
resolve(true);
}, 20);
});
});
const runner = createMockTaskRunner([]);
// When
await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default');
// Then: Never exceeded concurrency of 2
expect(maxActive).toBeLessThanOrEqual(2);
expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(4);
});
it('should pass abortSignal to all parallel tasks', async () => {
// Given: Multiple tasks in parallel mode
const tasks = [createTask('task-1'), createTask('task-2'), createTask('task-3')];
const runner = createMockTaskRunner([]);
const receivedSignals: (AbortSignal | undefined)[] = [];
mockExecuteAndCompleteTask.mockImplementation((_task, _runner, _cwd, _piece, _opts, parallelOpts) => {
receivedSignals.push(parallelOpts?.abortSignal);
return Promise.resolve(true);
});
// When
await runWithWorkerPool(runner as never, tasks, 3, '/cwd', 'default');
// Then: All tasks received the same AbortSignal
expect(receivedSignals).toHaveLength(3);
const firstSignal = receivedSignals[0];
expect(firstSignal).toBeInstanceOf(AbortSignal);
for (const signal of receivedSignals) {
expect(signal).toBe(firstSignal);
}
});
it('should handle empty initial tasks', async () => {
// Given: No tasks
const runner = createMockTaskRunner([]);
// When
const result = await runWithWorkerPool(runner as never, [], 2, '/cwd', 'default');
// Then
expect(result).toEqual({ success: 0, fail: 0 });
expect(mockExecuteAndCompleteTask).not.toHaveBeenCalled();
});
it('should handle task promise rejection gracefully', async () => {
// Given: Task that throws
const tasks = [createTask('throws')];
mockExecuteAndCompleteTask.mockRejectedValue(new Error('boom'));
const runner = createMockTaskRunner([]);
// When
const result = await runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default');
// Then: Treated as failure
expect(result).toEqual({ success: 0, fail: 1 });
});
});

View File

@ -19,6 +19,7 @@ import { runAgent } from '../../../agents/runner.js';
import { InstructionBuilder, isOutputContractItem } from '../instruction/InstructionBuilder.js';
import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js';
import { detectMatchedRule } from '../evaluation/index.js';
import { buildSessionKey } from '../session-key.js';
import { incrementMovementIteration, getPreviousOutput } from './state-manager.js';
import { createLogger } from '../../../shared/utils/index.js';
import type { OptionsBuilder } from './OptionsBuilder.js';
@ -100,7 +101,7 @@ export class MovementExecutor {
? state.movementIterations.get(step.name) ?? 1
: incrementMovementIteration(state, step.name);
const instruction = prebuiltInstruction ?? this.buildInstruction(step, movementIteration, state, task, maxIterations);
const sessionKey = step.persona ?? step.name;
const sessionKey = buildSessionKey(step);
log.debug('Running movement', {
movement: step.name,
persona: step.persona ?? '(none)',

View File

@ -10,6 +10,7 @@ import type { PieceMovement, PieceState, Language } from '../../models/types.js'
import type { RunAgentOptions } from '../../../agents/runner.js';
import type { PhaseRunnerContext } from '../phase-runner.js';
import type { PieceEngineOptions, PhaseName } from '../types.js';
import { buildSessionKey } from '../session-key.js';
export class OptionsBuilder {
constructor(
@ -66,7 +67,7 @@ export class OptionsBuilder {
return {
...this.buildBaseOptions(step),
sessionId: shouldResumeSession ? this.getSessionId(step.persona ?? step.name) : undefined,
sessionId: shouldResumeSession ? this.getSessionId(buildSessionKey(step)) : undefined,
allowedTools,
mcpServers: step.mcpServers,
};

View File

@ -15,7 +15,8 @@ import { ParallelLogger } from './parallel-logger.js';
import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js';
import { detectMatchedRule } from '../evaluation/index.js';
import { incrementMovementIteration } from './state-manager.js';
import { createLogger } from '../../../shared/utils/index.js';
import { createLogger, getErrorMessage } from '../../../shared/utils/index.js';
import { buildSessionKey } from '../session-key.js';
import type { OptionsBuilder } from './OptionsBuilder.js';
import type { MovementExecutor } from './MovementExecutor.js';
import type { PieceEngineOptions, PhaseName } from '../types.js';
@ -86,12 +87,17 @@ export class ParallelRunner {
callAiJudge: this.deps.callAiJudge,
};
// Run all sub-movements concurrently
const subResults = await Promise.all(
// Run all sub-movements concurrently (failures are captured, not thrown)
const settled = await Promise.allSettled(
subMovements.map(async (subMovement, index) => {
const subIteration = incrementMovementIteration(state, subMovement.name);
const subInstruction = this.deps.movementExecutor.buildInstruction(subMovement, subIteration, state, task, maxIterations);
// Session key uses buildSessionKey (persona:provider) — same as normal movements.
// This ensures sessions are shared across movements with the same persona+provider,
// while different providers (e.g., claude-eye vs codex-eye) get separate sessions.
const subSessionKey = buildSessionKey(subMovement);
// Phase 1: main execution (Write excluded if sub-movement has report)
const baseOptions = this.deps.optionsBuilder.buildAgentOptions(subMovement);
@ -100,13 +106,12 @@ export class ParallelRunner {
? { ...baseOptions, onStream: parallelLogger.createStreamHandler(subMovement.name, index) }
: baseOptions;
const subSessionKey = subMovement.persona ?? subMovement.name;
this.deps.onPhaseStart?.(subMovement, 1, 'execute', subInstruction);
const subResponse = await runAgent(subMovement.persona, subInstruction, agentOptions);
updatePersonaSession(subSessionKey, subResponse.sessionId);
this.deps.onPhaseComplete?.(subMovement, 1, 'execute', subResponse.content, subResponse.status, subResponse.error);
// Build phase context for this sub-movement with its lastResponse
// Phase 2/3 context — no overrides needed, phase-runner uses buildSessionKey internally
const phaseCtx = this.deps.optionsBuilder.buildPhaseRunnerContext(state, subResponse.content, updatePersonaSession, this.deps.onPhaseStart, this.deps.onPhaseComplete);
// Phase 2: report output for sub-movement
@ -132,6 +137,32 @@ export class ParallelRunner {
}),
);
// Map settled results: fulfilled → as-is, rejected → blocked AgentResponse
const subResults = settled.map((result, index) => {
if (result.status === 'fulfilled') {
return result.value;
}
const failedMovement = subMovements[index]!;
const errorMsg = getErrorMessage(result.reason);
log.error('Sub-movement failed', { movement: failedMovement.name, error: errorMsg });
const blockedResponse: AgentResponse = {
persona: failedMovement.name,
status: 'blocked',
content: '',
timestamp: new Date(),
error: errorMsg,
};
state.movementOutputs.set(failedMovement.name, blockedResponse);
return { subMovement: failedMovement, response: blockedResponse, instruction: '' };
});
// If all sub-movements failed (error-originated), throw
const allFailed = subResults.every(r => r.response.error != null);
if (allFailed) {
const errors = subResults.map(r => `${r.subMovement.name}: ${r.response.error}`).join('; ');
throw new Error(`All parallel sub-movements failed: ${errors}`);
}
// Print completion summary
if (parallelLogger) {
parallelLogger.printSummary(

View File

@ -14,6 +14,7 @@ import { ReportInstructionBuilder } from './instruction/ReportInstructionBuilder
import { hasTagBasedRules, getReportFiles } from './evaluation/rule-utils.js';
import { JudgmentStrategyFactory, type JudgmentContext } from './judgment/index.js';
import { createLogger } from '../../shared/utils/index.js';
import { buildSessionKey } from './session-key.js';
const log = createLogger('phase-runner');
@ -75,7 +76,7 @@ export async function runReportPhase(
movementIteration: number,
ctx: PhaseRunnerContext,
): Promise<void> {
const sessionKey = step.persona ?? step.name;
const sessionKey = buildSessionKey(step);
let currentSessionId = ctx.getSessionId(sessionKey);
if (!currentSessionId) {
throw new Error(`Report phase requires a session to resume, but no sessionId found for persona "${sessionKey}" in movement "${step.name}"`);
@ -159,7 +160,7 @@ export async function runStatusJudgmentPhase(
// フォールバック戦略を順次試行AutoSelectStrategy含む
const strategies = JudgmentStrategyFactory.createStrategies();
const sessionKey = step.persona ?? step.name;
const sessionKey = buildSessionKey(step);
const judgmentContext: JudgmentContext = {
step,
cwd: ctx.cwd,

View File

@ -0,0 +1,29 @@
/**
* Session key generation for persona sessions.
*
* When multiple movements share the same persona but use different providers
* (e.g., claude-eye uses Claude, codex-eye uses Codex, both with persona "coder"),
* sessions must be keyed by provider to prevent cross-provider contamination.
*
* Without provider in the key, a Codex session ID could overwrite a Claude session,
* causing Claude to attempt resuming a non-existent session file (exit code 1).
*/
import type { PieceMovement } from '../models/types.js';
/**
* Build a unique session key for a movement.
*
* - Base key: `step.persona ?? step.name`
* - If the movement specifies a provider, appends `:{provider}` to disambiguate
*
* Examples:
* - persona="coder", provider=undefined "coder"
* - persona="coder", provider="claude" "coder:claude"
* - persona="coder", provider="codex" "coder:codex"
* - persona=undefined, name="plan" "plan"
*/
export function buildSessionKey(step: PieceMovement): string {
const base = step.persona ?? step.name;
return step.provider ? `${base}:${step.provider}` : base;
}

View File

@ -8,6 +8,7 @@
*/
import * as readline from 'node:readline';
import { StringDecoder } from 'node:string_decoder';
import { stripAnsi, getDisplayWidth } from '../../shared/utils/text.js';
/** Escape sequences for terminal protocol control */
@ -418,9 +419,12 @@ export function readMultilineInput(prompt: string): Promise<string | null> {
// --- Input dispatch ---
const utf8Decoder = new StringDecoder('utf8');
function onData(data: Buffer): void {
try {
const str = data.toString('utf-8');
const str = utf8Decoder.write(data);
if (!str) return;
parseInputData(str, {
onPasteStart() { state = 'paste'; },

View File

@ -1,8 +1,10 @@
/**
* Parallel task execution strategy.
* Worker pool task execution strategy.
*
* Runs tasks in batches of up to `concurrency` tasks at a time.
* Uses a single AbortController shared across all tasks in all batches.
* Runs tasks using a fixed-size worker pool. Each worker picks up the next
* available task as soon as it finishes the current one, maximizing slot
* utilization. Works for both sequential (concurrency=1) and parallel
* (concurrency>1) execution through the same code path.
*/
import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js';
@ -11,61 +13,73 @@ import { executeAndCompleteTask } from './taskExecution.js';
import { installSigIntHandler } from './sigintHandler.js';
import type { TaskExecutionOptions } from './types.js';
interface BatchResult {
export interface WorkerPoolResult {
success: number;
fail: number;
}
/**
* Run tasks in parallel batches.
* Run tasks using a worker pool with the given concurrency.
*
* @returns Aggregated success/fail counts across all batches
* Algorithm:
* 1. Create a shared AbortController
* 2. Maintain a queue of pending tasks and a set of active promises
* 3. Fill available slots from the queue
* 4. Wait for any active task to complete (Promise.race)
* 5. Record result, fill freed slot from queue
* 6. Repeat until queue is empty and all active tasks complete
*/
export async function runParallel(
export async function runWithWorkerPool(
taskRunner: TaskRunner,
initialTasks: TaskInfo[],
concurrency: number,
cwd: string,
pieceName: string,
options?: TaskExecutionOptions,
): Promise<BatchResult> {
): Promise<WorkerPoolResult> {
const abortController = new AbortController();
const { cleanup } = installSigIntHandler(() => abortController.abort());
let successCount = 0;
let failCount = 0;
try {
let batch = initialTasks;
while (batch.length > 0) {
blankLine();
info(`=== Running batch of ${batch.length} task(s) ===`);
for (const task of batch) {
info(` - ${task.name}`);
}
blankLine();
const queue = [...initialTasks];
const active = new Map<Promise<boolean>, TaskInfo>();
const results = await Promise.all(
batch.map((task) =>
executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, {
abortSignal: abortController.signal,
}),
),
try {
while (queue.length > 0 || active.size > 0) {
if (abortController.signal.aborted) {
break;
}
fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController);
if (active.size === 0) {
break;
}
const settled = await Promise.race(
[...active.keys()].map((p) => p.then(
(result) => ({ promise: p, result }),
() => ({ promise: p, result: false }),
)),
);
for (const taskSuccess of results) {
if (taskSuccess) {
const task = active.get(settled.promise);
active.delete(settled.promise);
if (task) {
if (settled.result) {
successCount++;
} else {
failCount++;
}
}
if (abortController.signal.aborted) {
break;
if (!abortController.signal.aborted && queue.length === 0) {
const nextTasks = taskRunner.claimNextTasks(concurrency - active.size);
queue.push(...nextTasks);
}
batch = taskRunner.getNextTasks(concurrency);
}
} finally {
cleanup();
@ -73,3 +87,28 @@ export async function runParallel(
return { success: successCount, fail: failCount };
}
function fillSlots(
queue: TaskInfo[],
active: Map<Promise<boolean>, TaskInfo>,
concurrency: number,
taskRunner: TaskRunner,
cwd: string,
pieceName: string,
options: TaskExecutionOptions | undefined,
abortController: AbortController,
): void {
while (active.size < concurrency && queue.length > 0) {
const task = queue.shift()!;
const isParallel = concurrency > 1;
blankLine();
info(`=== Task: ${task.name} ===`);
const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, {
abortSignal: isParallel ? abortController.signal : undefined,
taskPrefix: isParallel ? task.name : undefined,
});
active.set(promise, task);
}
}

View File

@ -322,9 +322,10 @@ export async function executePiece(
const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name);
const totalMovements = pieceConfig.movements.length;
// Use quiet mode: forced quiet in parallel execution, or CLI/config setting
const quiet = options.quiet === true || isQuietMode();
displayRef.current = new StreamDisplay(step.personaDisplayName, quiet, {
const quiet = isQuietMode();
const prefix = options.taskPrefix;
const agentLabel = prefix ? `${prefix}:${step.personaDisplayName}` : step.personaDisplayName;
displayRef.current = new StreamDisplay(agentLabel, quiet, {
iteration,
maxIterations: pieceConfig.maxIterations,
movementIndex: movementIndex >= 0 ? movementIndex : 0,

View File

@ -17,7 +17,7 @@ import { executePiece } from './pieceExecution.js';
import { DEFAULT_PIECE_NAME } from '../../../shared/constants.js';
import type { TaskExecutionOptions, ExecuteTaskOptions } from './types.js';
import { createPullRequest, buildPrBody, pushBranch } from '../../../infra/github/index.js';
import { runParallel } from './parallelExecution.js';
import { runWithWorkerPool } from './parallelExecution.js';
import { resolveTaskExecution } from './resolveTask.js';
export type { TaskExecutionOptions, ExecuteTaskOptions };
@ -28,7 +28,7 @@ const log = createLogger('task');
* Execute a single task with piece.
*/
export async function executeTask(options: ExecuteTaskOptions): Promise<boolean> {
const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, quiet } = options;
const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, taskPrefix } = options;
const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd);
if (!pieceConfig) {
@ -58,7 +58,7 @@ export async function executeTask(options: ExecuteTaskOptions): Promise<boolean>
startMovement,
retryNote,
abortSignal,
quiet,
taskPrefix,
});
return result.success;
}
@ -77,7 +77,7 @@ export async function executeAndCompleteTask(
cwd: string,
pieceName: string,
options?: TaskExecutionOptions,
parallelOptions?: { abortSignal: AbortSignal },
parallelOptions?: { abortSignal?: AbortSignal; taskPrefix?: string },
): Promise<boolean> {
const startedAt = new Date().toISOString();
const executionLog: string[] = [];
@ -95,7 +95,7 @@ export async function executeAndCompleteTask(
startMovement,
retryNote,
abortSignal: parallelOptions?.abortSignal,
quiet: parallelOptions !== undefined,
taskPrefix: parallelOptions?.taskPrefix,
});
const completedAt = new Date().toISOString();
@ -167,44 +167,11 @@ export async function executeAndCompleteTask(
}
}
/**
* Run tasks sequentially, fetching one at a time.
*/
async function runSequential(
taskRunner: TaskRunner,
initialTask: TaskInfo,
cwd: string,
pieceName: string,
options?: TaskExecutionOptions,
): Promise<{ success: number; fail: number }> {
let successCount = 0;
let failCount = 0;
let task: TaskInfo | undefined = initialTask;
while (task) {
blankLine();
info(`=== Task: ${task.name} ===`);
blankLine();
const taskSuccess = await executeAndCompleteTask(task, taskRunner, cwd, pieceName, options);
if (taskSuccess) {
successCount++;
} else {
failCount++;
}
task = taskRunner.getNextTask() ?? undefined;
}
return { success: successCount, fail: failCount };
}
/**
* Run all pending tasks from .takt/tasks/
*
* concurrency=1: 逐次実行
* concurrency=N (N>1): N個のタスクをバッチ並列実行
* Uses a worker pool for both sequential (concurrency=1) and parallel
* (concurrency>1) execution through the same code path.
*/
export async function runAllTasks(
cwd: string,
@ -215,7 +182,7 @@ export async function runAllTasks(
const globalConfig = loadGlobalConfig();
const concurrency = globalConfig.concurrency;
const initialTasks = taskRunner.getNextTasks(concurrency);
const initialTasks = taskRunner.claimNextTasks(concurrency);
if (initialTasks.length === 0) {
info('No pending tasks in .takt/tasks/');
@ -228,10 +195,7 @@ export async function runAllTasks(
info(`Concurrency: ${concurrency}`);
}
// initialTasks is guaranteed non-empty at this point (early return above)
const result = concurrency <= 1
? await runSequential(taskRunner, initialTasks[0]!, cwd, pieceName, options)
: await runParallel(taskRunner, initialTasks, concurrency, cwd, pieceName, options);
const result = await runWithWorkerPool(taskRunner, initialTasks, concurrency, cwd, pieceName, options);
const totalCount = result.success + result.fail;
blankLine();

View File

@ -40,8 +40,8 @@ export interface PieceExecutionOptions {
retryNote?: string;
/** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */
abortSignal?: AbortSignal;
/** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */
quiet?: boolean;
/** Task name prefix for parallel execution output (e.g. "[task-name] output...") */
taskPrefix?: string;
}
export interface TaskExecutionOptions {
@ -70,8 +70,8 @@ export interface ExecuteTaskOptions {
retryNote?: string;
/** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */
abortSignal?: AbortSignal;
/** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */
quiet?: boolean;
/** Task name prefix for parallel execution output (e.g. "[task-name] output...") */
taskPrefix?: string;
}
export interface PipelineExecutionOptions {

View File

@ -35,10 +35,38 @@ const log = createLogger('claude-sdk');
export class QueryExecutor {
/**
* Execute a Claude query.
* If session resume fails with a process exit error, retries without resume.
*/
async execute(
prompt: string,
options: ClaudeSpawnOptions,
): Promise<ClaudeResult> {
const result = await this.executeOnce(prompt, options);
// Retry without session resume if it appears to be a session resume failure
if (
result.error
&& options.sessionId
&& result.error.includes('exited with code')
&& !result.content
) {
log.info('Session resume may have failed, retrying without resume', {
sessionId: options.sessionId,
error: result.error,
});
const retryOptions: ClaudeSpawnOptions = { ...options, sessionId: undefined };
return this.executeOnce(prompt, retryOptions);
}
return result;
}
/**
* Execute a single Claude query attempt.
*/
private async executeOnce(
prompt: string,
options: ClaudeSpawnOptions,
): Promise<ClaudeResult> {
const queryId = generateQueryId();
@ -50,7 +78,16 @@ export class QueryExecutor {
allowedTools: options.allowedTools,
});
const sdkOptions = new SdkOptionsBuilder(options).build();
const stderrChunks: string[] = [];
const optionsWithStderr: ClaudeSpawnOptions = {
...options,
onStderr: (data: string) => {
stderrChunks.push(data);
log.debug('Claude stderr', { queryId, data: data.trimEnd() });
options.onStderr?.(data);
},
};
const sdkOptions = new SdkOptionsBuilder(optionsWithStderr).build();
let sessionId: string | undefined;
let success = false;
@ -115,7 +152,7 @@ export class QueryExecutor {
};
} catch (error) {
unregisterQuery(queryId);
return QueryExecutor.handleQueryError(error, queryId, sessionId, hasResultMessage, success, resultContent);
return QueryExecutor.handleQueryError(error, queryId, sessionId, hasResultMessage, success, resultContent, stderrChunks);
}
}
@ -130,6 +167,7 @@ export class QueryExecutor {
hasResultMessage: boolean,
success: boolean,
resultContent: string | undefined,
stderrChunks: string[],
): ClaudeResult {
if (error instanceof AbortError) {
log.info('Claude query was interrupted', { queryId });
@ -170,7 +208,11 @@ export class QueryExecutor {
return { success: false, content: '', error: 'Request timed out. Please try again.' };
}
return { success: false, content: '', error: errorMessage };
const stderrOutput = stderrChunks.join('').trim();
const errorWithStderr = stderrOutput
? `${errorMessage}\nstderr: ${stderrOutput}`
: errorMessage;
return { success: false, content: '', error: errorWithStderr };
}
}

View File

@ -85,6 +85,10 @@ export class SdkOptionsBuilder {
sdkOptions.continue = false;
}
if (this.options.onStderr) {
sdkOptions.stderr = this.options.onStderr;
}
return sdkOptions;
}

View File

@ -166,4 +166,6 @@ export interface ClaudeSpawnOptions {
bypassPermissions?: boolean;
/** Anthropic API key to inject via env (bypasses CLI auth) */
anthropicApiKey?: string;
/** Callback for stderr output from the Claude Code process */
onStderr?: (data: string) => void;
}

View File

@ -31,6 +31,7 @@ export class TaskRunner {
private tasksDir: string;
private completedDir: string;
private failedDir: string;
private claimedPaths = new Set<string>();
constructor(projectDir: string) {
this.projectDir = projectDir;
@ -106,11 +107,19 @@ export class TaskRunner {
}
/**
*
*
*
* claimed claimed
*
*/
getNextTasks(count: number): TaskInfo[] {
const tasks = this.listTasks();
return tasks.slice(0, count);
claimNextTasks(count: number): TaskInfo[] {
const allTasks = this.listTasks();
const unclaimed = allTasks.filter((t) => !this.claimedPaths.has(t.filePath));
const claimed = unclaimed.slice(0, count);
for (const task of claimed) {
this.claimedPaths.add(task.filePath);
}
return claimed;
}
/**
@ -318,6 +327,8 @@ export class TaskRunner {
const movedTaskFile = path.join(taskTargetDir, `${result.task.name}${originalExt}`);
fs.renameSync(result.task.filePath, movedTaskFile);
this.claimedPaths.delete(result.task.filePath);
// レポートを生成
const reportFile = path.join(taskTargetDir, 'report.md');
const reportContent = this.generateReport(result);