diff --git a/builtins/en/piece-categories.yaml b/builtins/en/piece-categories.yaml index a7b6d6a..54bd219 100644 --- a/builtins/en/piece-categories.yaml +++ b/builtins/en/piece-categories.yaml @@ -5,9 +5,11 @@ piece_categories: - passthrough - coding - minimal - 🔍 Review & Fix: + - compound-eye + 🔍 Review: pieces: - review-fix-minimal + - review-only 🎨 Frontend: {} ⚙️ Backend: {} 🔧 Expert: @@ -33,6 +35,5 @@ piece_categories: pieces: - research - magi - - review-only show_others_category: true others_category_name: Others diff --git a/builtins/en/pieces/compound-eye.yaml b/builtins/en/pieces/compound-eye.yaml new file mode 100644 index 0000000..f0c5e69 --- /dev/null +++ b/builtins/en/pieces/compound-eye.yaml @@ -0,0 +1,110 @@ +name: compound-eye +description: Multi-model review - send the same instruction to Claude and Codex simultaneously, synthesize both responses +max_iterations: 10 +knowledge: + architecture: ../knowledge/architecture.md +personas: + coder: ../personas/coder.md + supervisor: ../personas/supervisor.md +initial_movement: evaluate +movements: + - name: evaluate + parallel: + - name: claude-eye + edit: false + persona: coder + provider: claude + session: refresh + knowledge: architecture + allowed_tools: + - Read + - Glob + - Grep + - Bash + - WebSearch + - WebFetch + rules: + - condition: done + - condition: failed + output_contracts: + report: + - name: 01-claude.md + - name: codex-eye + edit: false + persona: coder + provider: codex + session: refresh + knowledge: architecture + allowed_tools: + - Read + - Glob + - Grep + - Bash + - WebSearch + - WebFetch + rules: + - condition: done + - condition: failed + output_contracts: + report: + - name: 02-codex.md + rules: + - condition: any("done") + next: synthesize + + - name: synthesize + edit: false + persona: supervisor + allowed_tools: + - Read + - Glob + - Grep + rules: + - condition: synthesis complete + next: COMPLETE + instruction_template: | + Two models (Claude / Codex) independently answered the same instruction. + Synthesize their responses. + + **Tasks:** + 1. Read reports in the Report Directory + - `01-claude.md` (Claude's response) + - `02-codex.md` (Codex's response) + Note: If one report is missing (model failed), synthesize from the available report only + 2. If both reports exist, compare and clarify: + - Points of agreement + - Points of disagreement + - Points mentioned by only one model + 3. Produce a synthesized conclusion + + **Output format:** + ```markdown + # Multi-Model Review Synthesis + + ## Conclusion + {Synthesized conclusion} + + ## Response Status + | Model | Status | + |-------|--------| + | Claude | ✅ / ❌ | + | Codex | ✅ / ❌ | + + ## Agreements + - {Points where both models agree} + + ## Disagreements + | Topic | Claude | Codex | + |-------|--------|-------| + | {topic} | {Claude's view} | {Codex's view} | + + ## Unique Findings + - **Claude only:** {Points only Claude mentioned} + - **Codex only:** {Points only Codex mentioned} + + ## Overall Assessment + {Overall assessment considering both responses} + ``` + output_contracts: + report: + - Summary: 03-synthesis.md diff --git a/builtins/ja/piece-categories.yaml b/builtins/ja/piece-categories.yaml index 77ccee2..94263bb 100644 --- a/builtins/ja/piece-categories.yaml +++ b/builtins/ja/piece-categories.yaml @@ -5,9 +5,11 @@ piece_categories: - passthrough - coding - minimal - 🔍 レビュー&修正: + - compound-eye + 🔍 レビュー: pieces: - review-fix-minimal + - review-only 🎨 フロントエンド: {} ⚙️ バックエンド: {} 🔧 フルスタック: @@ -32,6 +34,5 @@ piece_categories: pieces: - research - magi - - review-only show_others_category: true others_category_name: その他 diff --git a/builtins/ja/pieces/compound-eye.yaml b/builtins/ja/pieces/compound-eye.yaml new file mode 100644 index 0000000..d5a1962 --- /dev/null +++ b/builtins/ja/pieces/compound-eye.yaml @@ -0,0 +1,110 @@ +name: compound-eye +description: 複眼レビュー - 同じ指示を Claude と Codex に同時に投げ、両者の回答を統合する +max_iterations: 10 +knowledge: + architecture: ../knowledge/architecture.md +personas: + coder: ../personas/coder.md + supervisor: ../personas/supervisor.md +initial_movement: evaluate + +movements: + - name: evaluate + parallel: + - name: claude-eye + edit: false + persona: coder + provider: claude + knowledge: architecture + allowed_tools: + - Read + - Glob + - Grep + - Bash + - WebSearch + - WebFetch + rules: + - condition: done + - condition: failed + output_contracts: + report: + - name: 01-claude.md + + - name: codex-eye + edit: false + persona: coder + provider: codex + knowledge: architecture + allowed_tools: + - Read + - Glob + - Grep + - Bash + - WebSearch + - WebFetch + rules: + - condition: done + - condition: failed + output_contracts: + report: + - name: 02-codex.md + rules: + - condition: any("done") + next: synthesize + + - name: synthesize + edit: false + persona: supervisor + allowed_tools: + - Read + - Glob + - Grep + rules: + - condition: 統合完了 + next: COMPLETE + instruction_template: | + 2つのモデル(Claude / Codex)が同じ指示に対して独立に回答しました。 + 両者の回答を統合してください。 + + **やること:** + 1. Report Directory 内のレポートを読む + - `01-claude.md`(Claude の回答) + - `02-codex.md`(Codex の回答) + ※ 片方が存在しない場合(エラーで失敗した場合)、存在するレポートのみで統合する + 2. 両方のレポートがある場合は比較し、以下を明示する + - 一致している点 + - 相違している点 + - 片方だけが指摘・言及している点 + 3. 統合した結論を出す + + **出力フォーマット:** + ```markdown + # 複眼レビュー統合 + + ## 結論 + {統合した結論} + + ## 回答状況 + | モデル | 状態 | + |--------|------| + | Claude | ✅ / ❌ | + | Codex | ✅ / ❌ | + + ## 一致点 + - {両モデルが同じ見解を示した点} + + ## 相違点 + | 論点 | Claude | Codex | + |------|--------|-------| + | {論点} | {Claudeの見解} | {Codexの見解} | + + ## 片方のみの指摘 + - **Claude のみ:** {Claudeだけが言及した点} + - **Codex のみ:** {Codexだけが言及した点} + + ## 総合評価 + {両者の回答を踏まえた総合的な評価} + ``` + output_contracts: + report: + - Summary: 03-synthesis.md diff --git a/package-lock.json b/package-lock.json index f2620e6..da99bef 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "0.8.0", "license": "MIT", "dependencies": { - "@anthropic-ai/claude-agent-sdk": "^0.2.34", + "@anthropic-ai/claude-agent-sdk": "^0.2.37", "@openai/codex-sdk": "^0.98.0", "chalk": "^5.3.0", "commander": "^12.1.0", @@ -39,9 +39,9 @@ } }, "node_modules/@anthropic-ai/claude-agent-sdk": { - "version": "0.2.34", - "resolved": "https://registry.npmjs.org/@anthropic-ai/claude-agent-sdk/-/claude-agent-sdk-0.2.34.tgz", - "integrity": "sha512-QLHd3Nt7bGU7/YH71fXFaztM9fNxGGruzTMrTYJkbm5gYJl5ZyU2zGyoE5VpWC0e1QU0yYdNdBVgqSYDcJGufg==", + "version": "0.2.37", + "resolved": "https://registry.npmjs.org/@anthropic-ai/claude-agent-sdk/-/claude-agent-sdk-0.2.37.tgz", + "integrity": "sha512-0TCAUuGXiWYV2JK+j2SiakGzPA7aoR5DNRxZ0EA571loGIqN3FRfiO1kipeBpEc+cRQ03a/4Kt5YAjMx0KBW+A==", "license": "SEE LICENSE IN README.md", "engines": { "node": ">=18.0.0" diff --git a/package.json b/package.json index 4a01299..fb6b109 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,7 @@ "builtins/" ], "dependencies": { - "@anthropic-ai/claude-agent-sdk": "^0.2.34", + "@anthropic-ai/claude-agent-sdk": "^0.2.37", "@openai/codex-sdk": "^0.98.0", "chalk": "^5.3.0", "commander": "^12.1.0", diff --git a/src/__tests__/engine-parallel-failure.test.ts b/src/__tests__/engine-parallel-failure.test.ts new file mode 100644 index 0000000..a60dc9d --- /dev/null +++ b/src/__tests__/engine-parallel-failure.test.ts @@ -0,0 +1,202 @@ +/** + * PieceEngine integration tests: parallel movement partial failure handling. + * + * Covers: + * - One sub-movement fails while another succeeds → piece continues + * - All sub-movements fail → piece aborts + * - Failed sub-movement is recorded as blocked with error + */ + +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { existsSync, rmSync } from 'node:fs'; + +// --- Mock setup (must be before imports that use these modules) --- + +vi.mock('../agents/runner.js', () => ({ + runAgent: vi.fn(), +})); + +vi.mock('../core/piece/evaluation/index.js', () => ({ + detectMatchedRule: vi.fn(), +})); + +vi.mock('../core/piece/phase-runner.js', () => ({ + needsStatusJudgmentPhase: vi.fn().mockReturnValue(false), + runReportPhase: vi.fn().mockResolvedValue(undefined), + runStatusJudgmentPhase: vi.fn().mockResolvedValue(''), +})); + +vi.mock('../shared/utils/index.js', async (importOriginal) => ({ + ...(await importOriginal>()), + generateReportDir: vi.fn().mockReturnValue('test-report-dir'), +})); + +// --- Imports (after mocks) --- + +import { PieceEngine } from '../core/piece/index.js'; +import { runAgent } from '../agents/runner.js'; +import { detectMatchedRule } from '../core/piece/index.js'; +import { + makeResponse, + makeMovement, + makeRule, + mockDetectMatchedRuleSequence, + createTestTmpDir, + applyDefaultMocks, +} from './engine-test-helpers.js'; +import type { PieceConfig } from '../core/models/index.js'; + +/** + * Build a piece config that goes directly to a parallel step: + * parallel-step (arch-review + security-review) → done + */ +function buildParallelOnlyConfig(): PieceConfig { + return { + name: 'test-parallel-failure', + description: 'Test parallel failure handling', + maxIterations: 10, + initialMovement: 'reviewers', + movements: [ + makeMovement('reviewers', { + parallel: [ + makeMovement('arch-review', { + rules: [ + makeRule('done', 'COMPLETE'), + makeRule('needs_fix', 'fix'), + ], + }), + makeMovement('security-review', { + rules: [ + makeRule('done', 'COMPLETE'), + makeRule('needs_fix', 'fix'), + ], + }), + ], + rules: [ + makeRule('any("done")', 'done', { + isAggregateCondition: true, + aggregateType: 'any', + aggregateConditionText: 'done', + }), + makeRule('all("needs_fix")', 'fix', { + isAggregateCondition: true, + aggregateType: 'all', + aggregateConditionText: 'needs_fix', + }), + ], + }), + makeMovement('done', { + rules: [ + makeRule('completed', 'COMPLETE'), + ], + }), + makeMovement('fix', { + rules: [ + makeRule('fixed', 'reviewers'), + ], + }), + ], + }; +} + +describe('PieceEngine Integration: Parallel Movement Partial Failure', () => { + let tmpDir: string; + + beforeEach(() => { + vi.resetAllMocks(); + applyDefaultMocks(); + tmpDir = createTestTmpDir(); + }); + + afterEach(() => { + if (existsSync(tmpDir)) { + rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it('should continue when one sub-movement fails but another succeeds', async () => { + const config = buildParallelOnlyConfig(); + const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir }); + + const mock = vi.mocked(runAgent); + // arch-review fails (exit code 1) + mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1')); + // security-review succeeds + mock.mockResolvedValueOnce( + makeResponse({ persona: 'security-review', content: 'Security review passed' }), + ); + // done step + mock.mockResolvedValueOnce( + makeResponse({ persona: 'done', content: 'Completed' }), + ); + + mockDetectMatchedRuleSequence([ + // security-review sub-movement rule match (arch-review has no match — it failed) + { index: 0, method: 'phase1_tag' }, // security-review → done + { index: 0, method: 'aggregate' }, // reviewers → any("done") matches + { index: 0, method: 'phase1_tag' }, // done → COMPLETE + ]); + + const state = await engine.run(); + + expect(state.status).toBe('completed'); + + // arch-review should be recorded as blocked + const archReviewOutput = state.movementOutputs.get('arch-review'); + expect(archReviewOutput).toBeDefined(); + expect(archReviewOutput!.status).toBe('blocked'); + expect(archReviewOutput!.error).toContain('exit'); + + // security-review should be recorded as done + const securityReviewOutput = state.movementOutputs.get('security-review'); + expect(securityReviewOutput).toBeDefined(); + expect(securityReviewOutput!.status).toBe('done'); + }); + + it('should abort when all sub-movements fail', async () => { + const config = buildParallelOnlyConfig(); + const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir }); + + const mock = vi.mocked(runAgent); + // Both fail + mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1')); + mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1')); + + const abortFn = vi.fn(); + engine.on('piece:abort', abortFn); + + const state = await engine.run(); + + expect(state.status).toBe('aborted'); + expect(abortFn).toHaveBeenCalledOnce(); + const reason = abortFn.mock.calls[0]![1] as string; + expect(reason).toContain('All parallel sub-movements failed'); + }); + + it('should record failed sub-movement error message in movementOutputs', async () => { + const config = buildParallelOnlyConfig(); + const engine = new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir }); + + const mock = vi.mocked(runAgent); + mock.mockRejectedValueOnce(new Error('Session resume failed')); + mock.mockResolvedValueOnce( + makeResponse({ persona: 'security-review', content: 'OK' }), + ); + mock.mockResolvedValueOnce( + makeResponse({ persona: 'done', content: 'Done' }), + ); + + mockDetectMatchedRuleSequence([ + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'aggregate' }, + { index: 0, method: 'phase1_tag' }, + ]); + + const state = await engine.run(); + + const archReviewOutput = state.movementOutputs.get('arch-review'); + expect(archReviewOutput).toBeDefined(); + expect(archReviewOutput!.error).toBe('Session resume failed'); + expect(archReviewOutput!.content).toBe(''); + }); +}); diff --git a/src/__tests__/executor-stderr.test.ts b/src/__tests__/executor-stderr.test.ts new file mode 100644 index 0000000..7d875a3 --- /dev/null +++ b/src/__tests__/executor-stderr.test.ts @@ -0,0 +1,29 @@ +/** + * Tests for QueryExecutor stderr capture and SdkOptionsBuilder stderr passthrough. + */ + +import { describe, it, expect } from 'vitest'; +import { buildSdkOptions } from '../infra/claude/options-builder.js'; +import type { ClaudeSpawnOptions } from '../infra/claude/types.js'; + +describe('SdkOptionsBuilder.build() — stderr', () => { + it('should include stderr callback in SDK options when onStderr is provided', () => { + const stderrHandler = (_data: string): void => {}; + const spawnOptions: ClaudeSpawnOptions = { + cwd: '/tmp/test', + onStderr: stderrHandler, + }; + + const sdkOptions = buildSdkOptions(spawnOptions); + expect(sdkOptions.stderr).toBe(stderrHandler); + }); + + it('should not include stderr in SDK options when onStderr is not provided', () => { + const spawnOptions: ClaudeSpawnOptions = { + cwd: '/tmp/test', + }; + + const sdkOptions = buildSdkOptions(spawnOptions); + expect(sdkOptions).not.toHaveProperty('stderr'); + }); +}); diff --git a/src/__tests__/runAllTasks-concurrency.test.ts b/src/__tests__/runAllTasks-concurrency.test.ts index 3ff767a..258890b 100644 --- a/src/__tests__/runAllTasks-concurrency.test.ts +++ b/src/__tests__/runAllTasks-concurrency.test.ts @@ -1,5 +1,5 @@ /** - * Tests for runAllTasks concurrency support + * Tests for runAllTasks concurrency support (worker pool) */ import { describe, it, expect, vi, beforeEach } from 'vitest'; @@ -21,7 +21,7 @@ import { loadGlobalConfig } from '../infra/config/index.js'; const mockLoadGlobalConfig = vi.mocked(loadGlobalConfig); const mockGetNextTask = vi.fn(); -const mockGetNextTasks = vi.fn(); +const mockClaimNextTasks = vi.fn(); const mockCompleteTask = vi.fn(); const mockFailTask = vi.fn(); @@ -29,7 +29,7 @@ vi.mock('../infra/task/index.js', async (importOriginal) => ({ ...(await importOriginal>()), TaskRunner: vi.fn().mockImplementation(() => ({ getNextTask: mockGetNextTask, - getNextTasks: mockGetNextTasks, + claimNextTasks: mockClaimNextTasks, completeTask: mockCompleteTask, failTask: mockFailTask, })), @@ -147,7 +147,7 @@ describe('runAllTasks concurrency', () => { it('should show no-tasks message when no tasks exist', async () => { // Given: No pending tasks - mockGetNextTasks.mockReturnValue([]); + mockClaimNextTasks.mockReturnValue([]); // When await runAllTasks('/project'); @@ -156,21 +156,21 @@ describe('runAllTasks concurrency', () => { expect(mockInfo).toHaveBeenCalledWith('No pending tasks in .takt/tasks/'); }); - it('should execute tasks sequentially when concurrency is 1', async () => { + it('should execute tasks sequentially via worker pool when concurrency is 1', async () => { // Given: Two tasks available sequentially const task1 = createTask('task-1'); const task2 = createTask('task-2'); - mockGetNextTasks.mockReturnValueOnce([task1]); - mockGetNextTask - .mockReturnValueOnce(task2) - .mockReturnValueOnce(null); + mockClaimNextTasks + .mockReturnValueOnce([task1]) + .mockReturnValueOnce([task2]) + .mockReturnValueOnce([]); // When await runAllTasks('/project'); - // Then: Sequential execution uses getNextTask in the while loop - expect(mockGetNextTask).toHaveBeenCalled(); + // Then: Worker pool uses claimNextTasks for fetching more tasks + expect(mockClaimNextTasks).toHaveBeenCalled(); expect(mockStatus).toHaveBeenCalledWith('Total', '2'); }); }); @@ -188,7 +188,7 @@ describe('runAllTasks concurrency', () => { it('should display concurrency info when concurrency > 1', async () => { // Given: Tasks available const task1 = createTask('task-1'); - mockGetNextTasks + mockClaimNextTasks .mockReturnValueOnce([task1]) .mockReturnValueOnce([]); @@ -199,29 +199,32 @@ describe('runAllTasks concurrency', () => { expect(mockInfo).toHaveBeenCalledWith('Concurrency: 3'); }); - it('should execute tasks in batch when concurrency > 1', async () => { - // Given: 3 tasks available in first batch + it('should execute tasks using worker pool when concurrency > 1', async () => { + // Given: 3 tasks available const task1 = createTask('task-1'); const task2 = createTask('task-2'); const task3 = createTask('task-3'); - mockGetNextTasks + mockClaimNextTasks .mockReturnValueOnce([task1, task2, task3]) .mockReturnValueOnce([]); // When await runAllTasks('/project'); - // Then: Batch info shown - expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ==='); + // Then: Task names displayed + expect(mockInfo).toHaveBeenCalledWith('=== Task: task-1 ==='); + expect(mockInfo).toHaveBeenCalledWith('=== Task: task-2 ==='); + expect(mockInfo).toHaveBeenCalledWith('=== Task: task-3 ==='); expect(mockStatus).toHaveBeenCalledWith('Total', '3'); }); - it('should process multiple batches', async () => { - // Given: 5 tasks, concurrency=3 → batch1 (3 tasks), batch2 (2 tasks) + it('should fill slots as tasks complete (worker pool behavior)', async () => { + // Given: 5 tasks, concurrency=3 + // Worker pool should start 3, then fill slots as tasks complete const tasks = Array.from({ length: 5 }, (_, i) => createTask(`task-${i + 1}`)); - mockGetNextTasks + mockClaimNextTasks .mockReturnValueOnce(tasks.slice(0, 3)) .mockReturnValueOnce(tasks.slice(3, 5)) .mockReturnValueOnce([]); @@ -229,42 +232,9 @@ describe('runAllTasks concurrency', () => { // When await runAllTasks('/project'); - // Then: Both batches shown - expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 3 task(s) ==='); - expect(mockInfo).toHaveBeenCalledWith('=== Running batch of 2 task(s) ==='); + // Then: All 5 tasks executed expect(mockStatus).toHaveBeenCalledWith('Total', '5'); }); - - it('should not use getNextTask in parallel mode', async () => { - // Given: Tasks in parallel mode - const task1 = createTask('task-1'); - mockGetNextTasks - .mockReturnValueOnce([task1]) - .mockReturnValueOnce([]); - - // When - await runAllTasks('/project'); - - // Then: getNextTask should not be called (parallel uses getNextTasks) - expect(mockGetNextTask).not.toHaveBeenCalled(); - }); - - it('should list task names in batch output', async () => { - // Given: Tasks with specific names - const task1 = createTask('auth-feature'); - const task2 = createTask('db-migration'); - - mockGetNextTasks - .mockReturnValueOnce([task1, task2]) - .mockReturnValueOnce([]); - - // When - await runAllTasks('/project'); - - // Then - expect(mockInfo).toHaveBeenCalledWith(' - auth-feature'); - expect(mockInfo).toHaveBeenCalledWith(' - db-migration'); - }); }); describe('default concurrency', () => { @@ -278,8 +248,9 @@ describe('runAllTasks concurrency', () => { }); const task1 = createTask('task-1'); - mockGetNextTasks.mockReturnValueOnce([task1]); - mockGetNextTask.mockReturnValueOnce(null); + mockClaimNextTasks + .mockReturnValueOnce([task1]) + .mockReturnValueOnce([]); // When await runAllTasks('/project'); @@ -311,7 +282,7 @@ describe('runAllTasks concurrency', () => { mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never); }); - it('should run batch tasks concurrently, not sequentially', async () => { + it('should run tasks concurrently, not sequentially', async () => { // Given: 2 tasks with delayed execution to verify concurrency const task1 = createTask('slow-1'); const task2 = createTask('slow-2'); @@ -329,7 +300,7 @@ describe('runAllTasks concurrency', () => { }); }); - mockGetNextTasks + mockClaimNextTasks .mockReturnValueOnce([task1, task2]) .mockReturnValueOnce([]); @@ -345,7 +316,48 @@ describe('runAllTasks concurrency', () => { expect(elapsed).toBeLessThan(150); }); - it('should count partial failures correctly in a batch', async () => { + it('should fill slots immediately when a task completes (no batch waiting)', async () => { + // Given: 3 tasks, concurrency=2, task1 finishes quickly, task2 takes longer + mockLoadGlobalConfig.mockReturnValue({ + language: 'en', + defaultPiece: 'default', + logLevel: 'info', + concurrency: 2, + }); + + const task1 = createTask('fast'); + const task2 = createTask('slow'); + const task3 = createTask('after-fast'); + + const executionOrder: string[] = []; + + mockExecutePiece.mockImplementation((_config, task) => { + executionOrder.push(`start:${task}`); + const delay = (task as string).includes('slow') ? 80 : 20; + return new Promise((resolve) => { + setTimeout(() => { + executionOrder.push(`end:${task}`); + resolve({ success: true }); + }, delay); + }); + }); + + mockClaimNextTasks + .mockReturnValueOnce([task1, task2]) + .mockReturnValueOnce([task3]) + .mockReturnValueOnce([]); + + // When + await runAllTasks('/project'); + + // Then: task3 starts before task2 finishes (slot filled immediately) + const task3StartIdx = executionOrder.indexOf('start:Task: after-fast'); + const task2EndIdx = executionOrder.indexOf('end:Task: slow'); + expect(task3StartIdx).toBeLessThan(task2EndIdx); + expect(mockStatus).toHaveBeenCalledWith('Total', '3'); + }); + + it('should count partial failures correctly', async () => { // Given: 3 tasks, 1 fails, 2 succeed const task1 = createTask('pass-1'); const task2 = createTask('fail-1'); @@ -358,7 +370,7 @@ describe('runAllTasks concurrency', () => { return Promise.resolve({ success: callIndex !== 2 }); }); - mockGetNextTasks + mockClaimNextTasks .mockReturnValueOnce([task1, task2, task3]) .mockReturnValueOnce([]); @@ -371,29 +383,29 @@ describe('runAllTasks concurrency', () => { expect(mockStatus).toHaveBeenCalledWith('Failed', '1', 'red'); }); - it('should pass abortSignal and quiet=true to executePiece in parallel mode', async () => { + it('should pass abortSignal and taskPrefix to executePiece in parallel mode', async () => { // Given: One task in parallel mode const task1 = createTask('parallel-task'); mockExecutePiece.mockResolvedValue({ success: true }); - mockGetNextTasks + mockClaimNextTasks .mockReturnValueOnce([task1]) .mockReturnValueOnce([]); // When await runAllTasks('/project'); - // Then: executePiece received abortSignal and quiet options + // Then: executePiece received abortSignal and taskPrefix options expect(mockExecutePiece).toHaveBeenCalledTimes(1); const callArgs = mockExecutePiece.mock.calls[0]; const pieceOptions = callArgs?.[3]; // 4th argument is options expect(pieceOptions).toHaveProperty('abortSignal'); expect(pieceOptions?.abortSignal).toBeInstanceOf(AbortSignal); - expect(pieceOptions).toHaveProperty('quiet', true); + expect(pieceOptions).toHaveProperty('taskPrefix', 'parallel-task'); }); - it('should not pass abortSignal or quiet in sequential mode', async () => { + it('should not pass abortSignal or taskPrefix in sequential mode', async () => { // Given: Sequential mode mockLoadGlobalConfig.mockReturnValue({ language: 'en', @@ -406,18 +418,19 @@ describe('runAllTasks concurrency', () => { mockExecutePiece.mockResolvedValue({ success: true }); mockLoadPieceByIdentifier.mockReturnValue(fakePieceConfig as never); - mockGetNextTasks.mockReturnValueOnce([task1]); - mockGetNextTask.mockReturnValueOnce(null); + mockClaimNextTasks + .mockReturnValueOnce([task1]) + .mockReturnValueOnce([]); // When await runAllTasks('/project'); - // Then: executePiece should not have abortSignal or quiet + // Then: executePiece should not have abortSignal or taskPrefix expect(mockExecutePiece).toHaveBeenCalledTimes(1); const callArgs = mockExecutePiece.mock.calls[0]; const pieceOptions = callArgs?.[3]; expect(pieceOptions?.abortSignal).toBeUndefined(); - expect(pieceOptions?.quiet).toBeFalsy(); + expect(pieceOptions?.taskPrefix).toBeUndefined(); }); }); }); diff --git a/src/__tests__/session-key.test.ts b/src/__tests__/session-key.test.ts new file mode 100644 index 0000000..be4ccc1 --- /dev/null +++ b/src/__tests__/session-key.test.ts @@ -0,0 +1,53 @@ +/** + * Tests for session key generation + */ + +import { describe, it, expect } from 'vitest'; +import { buildSessionKey } from '../core/piece/session-key.js'; +import type { PieceMovement } from '../core/models/types.js'; + +function createMovement(overrides: Partial = {}): PieceMovement { + return { + name: 'test-movement', + personaDisplayName: 'test', + edit: false, + instructionTemplate: '', + passPreviousResponse: true, + ...overrides, + }; +} + +describe('buildSessionKey', () => { + it('should use persona as base key when persona is set', () => { + const step = createMovement({ persona: 'coder', name: 'implement' }); + expect(buildSessionKey(step)).toBe('coder'); + }); + + it('should use name as base key when persona is not set', () => { + const step = createMovement({ persona: undefined, name: 'plan' }); + expect(buildSessionKey(step)).toBe('plan'); + }); + + it('should append provider when provider is specified', () => { + const step = createMovement({ persona: 'coder', provider: 'claude' }); + expect(buildSessionKey(step)).toBe('coder:claude'); + }); + + it('should use name with provider when persona is not set', () => { + const step = createMovement({ persona: undefined, name: 'review', provider: 'codex' }); + expect(buildSessionKey(step)).toBe('review:codex'); + }); + + it('should produce different keys for same persona with different providers', () => { + const claudeStep = createMovement({ persona: 'coder', provider: 'claude', name: 'claude-eye' }); + const codexStep = createMovement({ persona: 'coder', provider: 'codex', name: 'codex-eye' }); + expect(buildSessionKey(claudeStep)).not.toBe(buildSessionKey(codexStep)); + expect(buildSessionKey(claudeStep)).toBe('coder:claude'); + expect(buildSessionKey(codexStep)).toBe('coder:codex'); + }); + + it('should not append provider when provider is undefined', () => { + const step = createMovement({ persona: 'coder', provider: undefined }); + expect(buildSessionKey(step)).toBe('coder'); + }); +}); diff --git a/src/__tests__/task.test.ts b/src/__tests__/task.test.ts index 4b16813..6e9af16 100644 --- a/src/__tests__/task.test.ts +++ b/src/__tests__/task.test.ts @@ -144,49 +144,114 @@ describe('TaskRunner', () => { }); }); - describe('getNextTasks', () => { + describe('claimNextTasks', () => { it('should return empty array when no tasks', () => { - const tasks = runner.getNextTasks(3); + const tasks = runner.claimNextTasks(3); expect(tasks).toEqual([]); }); - it('should return all tasks when count exceeds available tasks', () => { + it('should return tasks up to the requested count', () => { const tasksDir = join(testDir, '.takt', 'tasks'); mkdirSync(tasksDir, { recursive: true }); - writeFileSync(join(tasksDir, 'b-task.md'), 'B'); writeFileSync(join(tasksDir, 'a-task.md'), 'A'); - - const tasks = runner.getNextTasks(5); - expect(tasks).toHaveLength(2); - expect(tasks[0]?.name).toBe('a-task'); - expect(tasks[1]?.name).toBe('b-task'); - }); - - it('should return only count tasks when more are available', () => { - const tasksDir = join(testDir, '.takt', 'tasks'); - mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, 'b-task.md'), 'B'); writeFileSync(join(tasksDir, 'c-task.md'), 'C'); - writeFileSync(join(tasksDir, 'b-task.md'), 'B'); - writeFileSync(join(tasksDir, 'a-task.md'), 'A'); - const tasks = runner.getNextTasks(2); + const tasks = runner.claimNextTasks(2); expect(tasks).toHaveLength(2); expect(tasks[0]?.name).toBe('a-task'); expect(tasks[1]?.name).toBe('b-task'); }); - it('should return tasks in same sort order as getNextTask', () => { + it('should not return already claimed tasks on subsequent calls', () => { const tasksDir = join(testDir, '.takt', 'tasks'); mkdirSync(tasksDir, { recursive: true }); - writeFileSync(join(tasksDir, '02-second.md'), 'Second'); - writeFileSync(join(tasksDir, '01-first.md'), 'First'); - writeFileSync(join(tasksDir, '03-third.md'), 'Third'); + writeFileSync(join(tasksDir, 'a-task.md'), 'A'); + writeFileSync(join(tasksDir, 'b-task.md'), 'B'); + writeFileSync(join(tasksDir, 'c-task.md'), 'C'); - const nextTask = runner.getNextTask(); - const nextTasks = runner.getNextTasks(1); + // Given: first call claims a-task + const first = runner.claimNextTasks(1); + expect(first).toHaveLength(1); + expect(first[0]?.name).toBe('a-task'); - expect(nextTasks).toHaveLength(1); - expect(nextTasks[0]?.name).toBe(nextTask?.name); + // When: second call should skip a-task + const second = runner.claimNextTasks(1); + expect(second).toHaveLength(1); + expect(second[0]?.name).toBe('b-task'); + + // When: third call should skip a-task and b-task + const third = runner.claimNextTasks(1); + expect(third).toHaveLength(1); + expect(third[0]?.name).toBe('c-task'); + + // When: fourth call should return empty (all claimed) + const fourth = runner.claimNextTasks(1); + expect(fourth).toEqual([]); + }); + + it('should release claim after completeTask', () => { + const tasksDir = join(testDir, '.takt', 'tasks'); + mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, 'task-a.md'), 'Task A content'); + + // Given: claim the task + const claimed = runner.claimNextTasks(1); + expect(claimed).toHaveLength(1); + + // When: complete the task (file is moved away) + runner.completeTask({ + task: claimed[0]!, + success: true, + response: 'Done', + executionLog: [], + startedAt: '2024-01-01T00:00:00.000Z', + completedAt: '2024-01-01T00:01:00.000Z', + }); + + // Then: claim set no longer blocks (but file is moved, so no tasks anyway) + const next = runner.claimNextTasks(1); + expect(next).toEqual([]); + }); + + it('should release claim after failTask', () => { + const tasksDir = join(testDir, '.takt', 'tasks'); + mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, 'task-a.md'), 'Task A content'); + + // Given: claim the task + const claimed = runner.claimNextTasks(1); + expect(claimed).toHaveLength(1); + + // When: fail the task (file is moved away) + runner.failTask({ + task: claimed[0]!, + success: false, + response: 'Error', + executionLog: [], + startedAt: '2024-01-01T00:00:00.000Z', + completedAt: '2024-01-01T00:01:00.000Z', + }); + + // Then: claim set no longer blocks + const next = runner.claimNextTasks(1); + expect(next).toEqual([]); + }); + + it('should not affect getNextTask (unclaimed access)', () => { + const tasksDir = join(testDir, '.takt', 'tasks'); + mkdirSync(tasksDir, { recursive: true }); + writeFileSync(join(tasksDir, 'a-task.md'), 'A'); + writeFileSync(join(tasksDir, 'b-task.md'), 'B'); + + // Given: claim a-task via claimNextTasks + runner.claimNextTasks(1); + + // When: getNextTask is called (no claim filtering) + const task = runner.getNextTask(); + + // Then: getNextTask still returns first task (including claimed) + expect(task?.name).toBe('a-task'); }); }); diff --git a/src/__tests__/workerPool.test.ts b/src/__tests__/workerPool.test.ts new file mode 100644 index 0000000..4624d18 --- /dev/null +++ b/src/__tests__/workerPool.test.ts @@ -0,0 +1,230 @@ +/** + * Unit tests for runWithWorkerPool + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import type { TaskInfo } from '../infra/task/index.js'; + +vi.mock('../shared/ui/index.js', () => ({ + header: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + success: vi.fn(), + status: vi.fn(), + blankLine: vi.fn(), +})); + +vi.mock('../shared/exitCodes.js', () => ({ + EXIT_SIGINT: 130, +})); + +vi.mock('../shared/i18n/index.js', () => ({ + getLabel: vi.fn((key: string) => key), +})); + +const mockExecuteAndCompleteTask = vi.fn(); + +vi.mock('../features/tasks/execute/taskExecution.js', () => ({ + executeAndCompleteTask: (...args: unknown[]) => mockExecuteAndCompleteTask(...args), +})); + +import { runWithWorkerPool } from '../features/tasks/execute/parallelExecution.js'; +import { info } from '../shared/ui/index.js'; + +const mockInfo = vi.mocked(info); + +function createTask(name: string): TaskInfo { + return { + name, + content: `Task: ${name}`, + filePath: `/tasks/${name}.yaml`, + }; +} + +function createMockTaskRunner(taskBatches: TaskInfo[][]) { + let batchIndex = 0; + return { + getNextTask: vi.fn(() => null), + claimNextTasks: vi.fn(() => { + const batch = taskBatches[batchIndex] ?? []; + batchIndex++; + return batch; + }), + completeTask: vi.fn(), + failTask: vi.fn(), + }; +} + +beforeEach(() => { + vi.clearAllMocks(); + mockExecuteAndCompleteTask.mockResolvedValue(true); +}); + +describe('runWithWorkerPool', () => { + it('should return correct counts for all successful tasks', async () => { + // Given + const tasks = [createTask('a'), createTask('b')]; + const runner = createMockTaskRunner([]); + + // When + const result = await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default'); + + // Then + expect(result).toEqual({ success: 2, fail: 0 }); + }); + + it('should return correct counts when some tasks fail', async () => { + // Given + const tasks = [createTask('pass'), createTask('fail'), createTask('pass2')]; + let callIdx = 0; + mockExecuteAndCompleteTask.mockImplementation(() => { + callIdx++; + return Promise.resolve(callIdx !== 2); + }); + const runner = createMockTaskRunner([]); + + // When + const result = await runWithWorkerPool(runner as never, tasks, 3, '/cwd', 'default'); + + // Then + expect(result).toEqual({ success: 2, fail: 1 }); + }); + + it('should display task name for each task', async () => { + // Given + const tasks = [createTask('alpha'), createTask('beta')]; + const runner = createMockTaskRunner([]); + + // When + await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default'); + + // Then + expect(mockInfo).toHaveBeenCalledWith('=== Task: alpha ==='); + expect(mockInfo).toHaveBeenCalledWith('=== Task: beta ==='); + }); + + it('should pass taskPrefix for parallel execution (concurrency > 1)', async () => { + // Given + const tasks = [createTask('my-task')]; + const runner = createMockTaskRunner([]); + + // When + await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default'); + + // Then + expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(1); + const parallelOpts = mockExecuteAndCompleteTask.mock.calls[0]?.[5]; + expect(parallelOpts).toEqual({ + abortSignal: expect.any(AbortSignal), + taskPrefix: 'my-task', + }); + }); + + it('should not pass taskPrefix or abortSignal for sequential execution (concurrency = 1)', async () => { + // Given + const tasks = [createTask('seq-task')]; + const runner = createMockTaskRunner([]); + + // When + await runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default'); + + // Then + expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(1); + const parallelOpts = mockExecuteAndCompleteTask.mock.calls[0]?.[5]; + expect(parallelOpts).toEqual({ + abortSignal: undefined, + taskPrefix: undefined, + }); + }); + + it('should fetch more tasks when slots become available', async () => { + // Given: 1 initial task, runner provides 1 more after + const task1 = createTask('first'); + const task2 = createTask('second'); + const runner = createMockTaskRunner([[task2]]); + + // When + await runWithWorkerPool(runner as never, [task1], 2, '/cwd', 'default'); + + // Then + expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(2); + expect(runner.claimNextTasks).toHaveBeenCalled(); + }); + + it('should respect concurrency limit', async () => { + // Given: 4 tasks, concurrency=2 + const tasks = Array.from({ length: 4 }, (_, i) => createTask(`task-${i}`)); + + let activeCount = 0; + let maxActive = 0; + + mockExecuteAndCompleteTask.mockImplementation(() => { + activeCount++; + maxActive = Math.max(maxActive, activeCount); + return new Promise((resolve) => { + setTimeout(() => { + activeCount--; + resolve(true); + }, 20); + }); + }); + + const runner = createMockTaskRunner([]); + + // When + await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default'); + + // Then: Never exceeded concurrency of 2 + expect(maxActive).toBeLessThanOrEqual(2); + expect(mockExecuteAndCompleteTask).toHaveBeenCalledTimes(4); + }); + + it('should pass abortSignal to all parallel tasks', async () => { + // Given: Multiple tasks in parallel mode + const tasks = [createTask('task-1'), createTask('task-2'), createTask('task-3')]; + const runner = createMockTaskRunner([]); + + const receivedSignals: (AbortSignal | undefined)[] = []; + mockExecuteAndCompleteTask.mockImplementation((_task, _runner, _cwd, _piece, _opts, parallelOpts) => { + receivedSignals.push(parallelOpts?.abortSignal); + return Promise.resolve(true); + }); + + // When + await runWithWorkerPool(runner as never, tasks, 3, '/cwd', 'default'); + + // Then: All tasks received the same AbortSignal + expect(receivedSignals).toHaveLength(3); + const firstSignal = receivedSignals[0]; + expect(firstSignal).toBeInstanceOf(AbortSignal); + for (const signal of receivedSignals) { + expect(signal).toBe(firstSignal); + } + }); + + it('should handle empty initial tasks', async () => { + // Given: No tasks + const runner = createMockTaskRunner([]); + + // When + const result = await runWithWorkerPool(runner as never, [], 2, '/cwd', 'default'); + + // Then + expect(result).toEqual({ success: 0, fail: 0 }); + expect(mockExecuteAndCompleteTask).not.toHaveBeenCalled(); + }); + + it('should handle task promise rejection gracefully', async () => { + // Given: Task that throws + const tasks = [createTask('throws')]; + mockExecuteAndCompleteTask.mockRejectedValue(new Error('boom')); + const runner = createMockTaskRunner([]); + + // When + const result = await runWithWorkerPool(runner as never, tasks, 1, '/cwd', 'default'); + + // Then: Treated as failure + expect(result).toEqual({ success: 0, fail: 1 }); + }); +}); diff --git a/src/core/piece/engine/MovementExecutor.ts b/src/core/piece/engine/MovementExecutor.ts index 0918504..f8c1b66 100644 --- a/src/core/piece/engine/MovementExecutor.ts +++ b/src/core/piece/engine/MovementExecutor.ts @@ -19,6 +19,7 @@ import { runAgent } from '../../../agents/runner.js'; import { InstructionBuilder, isOutputContractItem } from '../instruction/InstructionBuilder.js'; import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js'; import { detectMatchedRule } from '../evaluation/index.js'; +import { buildSessionKey } from '../session-key.js'; import { incrementMovementIteration, getPreviousOutput } from './state-manager.js'; import { createLogger } from '../../../shared/utils/index.js'; import type { OptionsBuilder } from './OptionsBuilder.js'; @@ -100,7 +101,7 @@ export class MovementExecutor { ? state.movementIterations.get(step.name) ?? 1 : incrementMovementIteration(state, step.name); const instruction = prebuiltInstruction ?? this.buildInstruction(step, movementIteration, state, task, maxIterations); - const sessionKey = step.persona ?? step.name; + const sessionKey = buildSessionKey(step); log.debug('Running movement', { movement: step.name, persona: step.persona ?? '(none)', diff --git a/src/core/piece/engine/OptionsBuilder.ts b/src/core/piece/engine/OptionsBuilder.ts index 8faee3b..ffdcb34 100644 --- a/src/core/piece/engine/OptionsBuilder.ts +++ b/src/core/piece/engine/OptionsBuilder.ts @@ -10,6 +10,7 @@ import type { PieceMovement, PieceState, Language } from '../../models/types.js' import type { RunAgentOptions } from '../../../agents/runner.js'; import type { PhaseRunnerContext } from '../phase-runner.js'; import type { PieceEngineOptions, PhaseName } from '../types.js'; +import { buildSessionKey } from '../session-key.js'; export class OptionsBuilder { constructor( @@ -66,7 +67,7 @@ export class OptionsBuilder { return { ...this.buildBaseOptions(step), - sessionId: shouldResumeSession ? this.getSessionId(step.persona ?? step.name) : undefined, + sessionId: shouldResumeSession ? this.getSessionId(buildSessionKey(step)) : undefined, allowedTools, mcpServers: step.mcpServers, }; diff --git a/src/core/piece/engine/ParallelRunner.ts b/src/core/piece/engine/ParallelRunner.ts index 9d80b7b..6cdf8f1 100644 --- a/src/core/piece/engine/ParallelRunner.ts +++ b/src/core/piece/engine/ParallelRunner.ts @@ -15,7 +15,8 @@ import { ParallelLogger } from './parallel-logger.js'; import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js'; import { detectMatchedRule } from '../evaluation/index.js'; import { incrementMovementIteration } from './state-manager.js'; -import { createLogger } from '../../../shared/utils/index.js'; +import { createLogger, getErrorMessage } from '../../../shared/utils/index.js'; +import { buildSessionKey } from '../session-key.js'; import type { OptionsBuilder } from './OptionsBuilder.js'; import type { MovementExecutor } from './MovementExecutor.js'; import type { PieceEngineOptions, PhaseName } from '../types.js'; @@ -86,12 +87,17 @@ export class ParallelRunner { callAiJudge: this.deps.callAiJudge, }; - // Run all sub-movements concurrently - const subResults = await Promise.all( + // Run all sub-movements concurrently (failures are captured, not thrown) + const settled = await Promise.allSettled( subMovements.map(async (subMovement, index) => { const subIteration = incrementMovementIteration(state, subMovement.name); const subInstruction = this.deps.movementExecutor.buildInstruction(subMovement, subIteration, state, task, maxIterations); + // Session key uses buildSessionKey (persona:provider) — same as normal movements. + // This ensures sessions are shared across movements with the same persona+provider, + // while different providers (e.g., claude-eye vs codex-eye) get separate sessions. + const subSessionKey = buildSessionKey(subMovement); + // Phase 1: main execution (Write excluded if sub-movement has report) const baseOptions = this.deps.optionsBuilder.buildAgentOptions(subMovement); @@ -100,13 +106,12 @@ export class ParallelRunner { ? { ...baseOptions, onStream: parallelLogger.createStreamHandler(subMovement.name, index) } : baseOptions; - const subSessionKey = subMovement.persona ?? subMovement.name; this.deps.onPhaseStart?.(subMovement, 1, 'execute', subInstruction); const subResponse = await runAgent(subMovement.persona, subInstruction, agentOptions); updatePersonaSession(subSessionKey, subResponse.sessionId); this.deps.onPhaseComplete?.(subMovement, 1, 'execute', subResponse.content, subResponse.status, subResponse.error); - // Build phase context for this sub-movement with its lastResponse + // Phase 2/3 context — no overrides needed, phase-runner uses buildSessionKey internally const phaseCtx = this.deps.optionsBuilder.buildPhaseRunnerContext(state, subResponse.content, updatePersonaSession, this.deps.onPhaseStart, this.deps.onPhaseComplete); // Phase 2: report output for sub-movement @@ -132,6 +137,32 @@ export class ParallelRunner { }), ); + // Map settled results: fulfilled → as-is, rejected → blocked AgentResponse + const subResults = settled.map((result, index) => { + if (result.status === 'fulfilled') { + return result.value; + } + const failedMovement = subMovements[index]!; + const errorMsg = getErrorMessage(result.reason); + log.error('Sub-movement failed', { movement: failedMovement.name, error: errorMsg }); + const blockedResponse: AgentResponse = { + persona: failedMovement.name, + status: 'blocked', + content: '', + timestamp: new Date(), + error: errorMsg, + }; + state.movementOutputs.set(failedMovement.name, blockedResponse); + return { subMovement: failedMovement, response: blockedResponse, instruction: '' }; + }); + + // If all sub-movements failed (error-originated), throw + const allFailed = subResults.every(r => r.response.error != null); + if (allFailed) { + const errors = subResults.map(r => `${r.subMovement.name}: ${r.response.error}`).join('; '); + throw new Error(`All parallel sub-movements failed: ${errors}`); + } + // Print completion summary if (parallelLogger) { parallelLogger.printSummary( diff --git a/src/core/piece/phase-runner.ts b/src/core/piece/phase-runner.ts index 9c0932a..21b2e62 100644 --- a/src/core/piece/phase-runner.ts +++ b/src/core/piece/phase-runner.ts @@ -14,6 +14,7 @@ import { ReportInstructionBuilder } from './instruction/ReportInstructionBuilder import { hasTagBasedRules, getReportFiles } from './evaluation/rule-utils.js'; import { JudgmentStrategyFactory, type JudgmentContext } from './judgment/index.js'; import { createLogger } from '../../shared/utils/index.js'; +import { buildSessionKey } from './session-key.js'; const log = createLogger('phase-runner'); @@ -75,7 +76,7 @@ export async function runReportPhase( movementIteration: number, ctx: PhaseRunnerContext, ): Promise { - const sessionKey = step.persona ?? step.name; + const sessionKey = buildSessionKey(step); let currentSessionId = ctx.getSessionId(sessionKey); if (!currentSessionId) { throw new Error(`Report phase requires a session to resume, but no sessionId found for persona "${sessionKey}" in movement "${step.name}"`); @@ -159,7 +160,7 @@ export async function runStatusJudgmentPhase( // フォールバック戦略を順次試行(AutoSelectStrategy含む) const strategies = JudgmentStrategyFactory.createStrategies(); - const sessionKey = step.persona ?? step.name; + const sessionKey = buildSessionKey(step); const judgmentContext: JudgmentContext = { step, cwd: ctx.cwd, diff --git a/src/core/piece/session-key.ts b/src/core/piece/session-key.ts new file mode 100644 index 0000000..4fb9116 --- /dev/null +++ b/src/core/piece/session-key.ts @@ -0,0 +1,29 @@ +/** + * Session key generation for persona sessions. + * + * When multiple movements share the same persona but use different providers + * (e.g., claude-eye uses Claude, codex-eye uses Codex, both with persona "coder"), + * sessions must be keyed by provider to prevent cross-provider contamination. + * + * Without provider in the key, a Codex session ID could overwrite a Claude session, + * causing Claude to attempt resuming a non-existent session file (exit code 1). + */ + +import type { PieceMovement } from '../models/types.js'; + +/** + * Build a unique session key for a movement. + * + * - Base key: `step.persona ?? step.name` + * - If the movement specifies a provider, appends `:{provider}` to disambiguate + * + * Examples: + * - persona="coder", provider=undefined → "coder" + * - persona="coder", provider="claude" → "coder:claude" + * - persona="coder", provider="codex" → "coder:codex" + * - persona=undefined, name="plan" → "plan" + */ +export function buildSessionKey(step: PieceMovement): string { + const base = step.persona ?? step.name; + return step.provider ? `${base}:${step.provider}` : base; +} diff --git a/src/features/interactive/lineEditor.ts b/src/features/interactive/lineEditor.ts index adfc042..d622493 100644 --- a/src/features/interactive/lineEditor.ts +++ b/src/features/interactive/lineEditor.ts @@ -8,6 +8,7 @@ */ import * as readline from 'node:readline'; +import { StringDecoder } from 'node:string_decoder'; import { stripAnsi, getDisplayWidth } from '../../shared/utils/text.js'; /** Escape sequences for terminal protocol control */ @@ -418,9 +419,12 @@ export function readMultilineInput(prompt: string): Promise { // --- Input dispatch --- + const utf8Decoder = new StringDecoder('utf8'); + function onData(data: Buffer): void { try { - const str = data.toString('utf-8'); + const str = utf8Decoder.write(data); + if (!str) return; parseInputData(str, { onPasteStart() { state = 'paste'; }, diff --git a/src/features/tasks/execute/parallelExecution.ts b/src/features/tasks/execute/parallelExecution.ts index bd2a984..67130ef 100644 --- a/src/features/tasks/execute/parallelExecution.ts +++ b/src/features/tasks/execute/parallelExecution.ts @@ -1,8 +1,10 @@ /** - * Parallel task execution strategy. + * Worker pool task execution strategy. * - * Runs tasks in batches of up to `concurrency` tasks at a time. - * Uses a single AbortController shared across all tasks in all batches. + * Runs tasks using a fixed-size worker pool. Each worker picks up the next + * available task as soon as it finishes the current one, maximizing slot + * utilization. Works for both sequential (concurrency=1) and parallel + * (concurrency>1) execution through the same code path. */ import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js'; @@ -11,61 +13,73 @@ import { executeAndCompleteTask } from './taskExecution.js'; import { installSigIntHandler } from './sigintHandler.js'; import type { TaskExecutionOptions } from './types.js'; -interface BatchResult { +export interface WorkerPoolResult { success: number; fail: number; } /** - * Run tasks in parallel batches. + * Run tasks using a worker pool with the given concurrency. * - * @returns Aggregated success/fail counts across all batches + * Algorithm: + * 1. Create a shared AbortController + * 2. Maintain a queue of pending tasks and a set of active promises + * 3. Fill available slots from the queue + * 4. Wait for any active task to complete (Promise.race) + * 5. Record result, fill freed slot from queue + * 6. Repeat until queue is empty and all active tasks complete */ -export async function runParallel( +export async function runWithWorkerPool( taskRunner: TaskRunner, initialTasks: TaskInfo[], concurrency: number, cwd: string, pieceName: string, options?: TaskExecutionOptions, -): Promise { +): Promise { const abortController = new AbortController(); const { cleanup } = installSigIntHandler(() => abortController.abort()); let successCount = 0; let failCount = 0; - try { - let batch = initialTasks; - while (batch.length > 0) { - blankLine(); - info(`=== Running batch of ${batch.length} task(s) ===`); - for (const task of batch) { - info(` - ${task.name}`); - } - blankLine(); + const queue = [...initialTasks]; + const active = new Map, TaskInfo>(); - const results = await Promise.all( - batch.map((task) => - executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, { - abortSignal: abortController.signal, - }), - ), + try { + while (queue.length > 0 || active.size > 0) { + if (abortController.signal.aborted) { + break; + } + + fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController); + + if (active.size === 0) { + break; + } + + const settled = await Promise.race( + [...active.keys()].map((p) => p.then( + (result) => ({ promise: p, result }), + () => ({ promise: p, result: false }), + )), ); - for (const taskSuccess of results) { - if (taskSuccess) { + const task = active.get(settled.promise); + active.delete(settled.promise); + + if (task) { + if (settled.result) { successCount++; } else { failCount++; } } - if (abortController.signal.aborted) { - break; + if (!abortController.signal.aborted && queue.length === 0) { + const nextTasks = taskRunner.claimNextTasks(concurrency - active.size); + queue.push(...nextTasks); } - - batch = taskRunner.getNextTasks(concurrency); } } finally { cleanup(); @@ -73,3 +87,28 @@ export async function runParallel( return { success: successCount, fail: failCount }; } + +function fillSlots( + queue: TaskInfo[], + active: Map, TaskInfo>, + concurrency: number, + taskRunner: TaskRunner, + cwd: string, + pieceName: string, + options: TaskExecutionOptions | undefined, + abortController: AbortController, +): void { + while (active.size < concurrency && queue.length > 0) { + const task = queue.shift()!; + const isParallel = concurrency > 1; + + blankLine(); + info(`=== Task: ${task.name} ===`); + + const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, { + abortSignal: isParallel ? abortController.signal : undefined, + taskPrefix: isParallel ? task.name : undefined, + }); + active.set(promise, task); + } +} diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index b777ece..f6f5156 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -322,9 +322,10 @@ export async function executePiece( const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name); const totalMovements = pieceConfig.movements.length; - // Use quiet mode: forced quiet in parallel execution, or CLI/config setting - const quiet = options.quiet === true || isQuietMode(); - displayRef.current = new StreamDisplay(step.personaDisplayName, quiet, { + const quiet = isQuietMode(); + const prefix = options.taskPrefix; + const agentLabel = prefix ? `${prefix}:${step.personaDisplayName}` : step.personaDisplayName; + displayRef.current = new StreamDisplay(agentLabel, quiet, { iteration, maxIterations: pieceConfig.maxIterations, movementIndex: movementIndex >= 0 ? movementIndex : 0, diff --git a/src/features/tasks/execute/taskExecution.ts b/src/features/tasks/execute/taskExecution.ts index 74cc71a..1dcd2bd 100644 --- a/src/features/tasks/execute/taskExecution.ts +++ b/src/features/tasks/execute/taskExecution.ts @@ -17,7 +17,7 @@ import { executePiece } from './pieceExecution.js'; import { DEFAULT_PIECE_NAME } from '../../../shared/constants.js'; import type { TaskExecutionOptions, ExecuteTaskOptions } from './types.js'; import { createPullRequest, buildPrBody, pushBranch } from '../../../infra/github/index.js'; -import { runParallel } from './parallelExecution.js'; +import { runWithWorkerPool } from './parallelExecution.js'; import { resolveTaskExecution } from './resolveTask.js'; export type { TaskExecutionOptions, ExecuteTaskOptions }; @@ -28,7 +28,7 @@ const log = createLogger('task'); * Execute a single task with piece. */ export async function executeTask(options: ExecuteTaskOptions): Promise { - const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, quiet } = options; + const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, taskPrefix } = options; const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd); if (!pieceConfig) { @@ -58,7 +58,7 @@ export async function executeTask(options: ExecuteTaskOptions): Promise startMovement, retryNote, abortSignal, - quiet, + taskPrefix, }); return result.success; } @@ -77,7 +77,7 @@ export async function executeAndCompleteTask( cwd: string, pieceName: string, options?: TaskExecutionOptions, - parallelOptions?: { abortSignal: AbortSignal }, + parallelOptions?: { abortSignal?: AbortSignal; taskPrefix?: string }, ): Promise { const startedAt = new Date().toISOString(); const executionLog: string[] = []; @@ -95,7 +95,7 @@ export async function executeAndCompleteTask( startMovement, retryNote, abortSignal: parallelOptions?.abortSignal, - quiet: parallelOptions !== undefined, + taskPrefix: parallelOptions?.taskPrefix, }); const completedAt = new Date().toISOString(); @@ -167,44 +167,11 @@ export async function executeAndCompleteTask( } } -/** - * Run tasks sequentially, fetching one at a time. - */ -async function runSequential( - taskRunner: TaskRunner, - initialTask: TaskInfo, - cwd: string, - pieceName: string, - options?: TaskExecutionOptions, -): Promise<{ success: number; fail: number }> { - let successCount = 0; - let failCount = 0; - - let task: TaskInfo | undefined = initialTask; - while (task) { - blankLine(); - info(`=== Task: ${task.name} ===`); - blankLine(); - - const taskSuccess = await executeAndCompleteTask(task, taskRunner, cwd, pieceName, options); - - if (taskSuccess) { - successCount++; - } else { - failCount++; - } - - task = taskRunner.getNextTask() ?? undefined; - } - - return { success: successCount, fail: failCount }; -} - /** * Run all pending tasks from .takt/tasks/ * - * concurrency=1: 逐次実行(従来動作) - * concurrency=N (N>1): 最大N個のタスクをバッチ並列実行 + * Uses a worker pool for both sequential (concurrency=1) and parallel + * (concurrency>1) execution through the same code path. */ export async function runAllTasks( cwd: string, @@ -215,7 +182,7 @@ export async function runAllTasks( const globalConfig = loadGlobalConfig(); const concurrency = globalConfig.concurrency; - const initialTasks = taskRunner.getNextTasks(concurrency); + const initialTasks = taskRunner.claimNextTasks(concurrency); if (initialTasks.length === 0) { info('No pending tasks in .takt/tasks/'); @@ -228,10 +195,7 @@ export async function runAllTasks( info(`Concurrency: ${concurrency}`); } - // initialTasks is guaranteed non-empty at this point (early return above) - const result = concurrency <= 1 - ? await runSequential(taskRunner, initialTasks[0]!, cwd, pieceName, options) - : await runParallel(taskRunner, initialTasks, concurrency, cwd, pieceName, options); + const result = await runWithWorkerPool(taskRunner, initialTasks, concurrency, cwd, pieceName, options); const totalCount = result.success + result.fail; blankLine(); diff --git a/src/features/tasks/execute/types.ts b/src/features/tasks/execute/types.ts index 198a8fa..d450607 100644 --- a/src/features/tasks/execute/types.ts +++ b/src/features/tasks/execute/types.ts @@ -40,8 +40,8 @@ export interface PieceExecutionOptions { retryNote?: string; /** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */ abortSignal?: AbortSignal; - /** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */ - quiet?: boolean; + /** Task name prefix for parallel execution output (e.g. "[task-name] output...") */ + taskPrefix?: string; } export interface TaskExecutionOptions { @@ -70,8 +70,8 @@ export interface ExecuteTaskOptions { retryNote?: string; /** External abort signal for parallel execution — when provided, SIGINT handling is delegated to caller */ abortSignal?: AbortSignal; - /** Force quiet mode for streaming output (used in parallel execution to prevent interleaving) */ - quiet?: boolean; + /** Task name prefix for parallel execution output (e.g. "[task-name] output...") */ + taskPrefix?: string; } export interface PipelineExecutionOptions { diff --git a/src/infra/claude/executor.ts b/src/infra/claude/executor.ts index 233f3aa..254749c 100644 --- a/src/infra/claude/executor.ts +++ b/src/infra/claude/executor.ts @@ -35,10 +35,38 @@ const log = createLogger('claude-sdk'); export class QueryExecutor { /** * Execute a Claude query. + * If session resume fails with a process exit error, retries without resume. */ async execute( prompt: string, options: ClaudeSpawnOptions, + ): Promise { + const result = await this.executeOnce(prompt, options); + + // Retry without session resume if it appears to be a session resume failure + if ( + result.error + && options.sessionId + && result.error.includes('exited with code') + && !result.content + ) { + log.info('Session resume may have failed, retrying without resume', { + sessionId: options.sessionId, + error: result.error, + }); + const retryOptions: ClaudeSpawnOptions = { ...options, sessionId: undefined }; + return this.executeOnce(prompt, retryOptions); + } + + return result; + } + + /** + * Execute a single Claude query attempt. + */ + private async executeOnce( + prompt: string, + options: ClaudeSpawnOptions, ): Promise { const queryId = generateQueryId(); @@ -50,7 +78,16 @@ export class QueryExecutor { allowedTools: options.allowedTools, }); - const sdkOptions = new SdkOptionsBuilder(options).build(); + const stderrChunks: string[] = []; + const optionsWithStderr: ClaudeSpawnOptions = { + ...options, + onStderr: (data: string) => { + stderrChunks.push(data); + log.debug('Claude stderr', { queryId, data: data.trimEnd() }); + options.onStderr?.(data); + }, + }; + const sdkOptions = new SdkOptionsBuilder(optionsWithStderr).build(); let sessionId: string | undefined; let success = false; @@ -115,7 +152,7 @@ export class QueryExecutor { }; } catch (error) { unregisterQuery(queryId); - return QueryExecutor.handleQueryError(error, queryId, sessionId, hasResultMessage, success, resultContent); + return QueryExecutor.handleQueryError(error, queryId, sessionId, hasResultMessage, success, resultContent, stderrChunks); } } @@ -130,6 +167,7 @@ export class QueryExecutor { hasResultMessage: boolean, success: boolean, resultContent: string | undefined, + stderrChunks: string[], ): ClaudeResult { if (error instanceof AbortError) { log.info('Claude query was interrupted', { queryId }); @@ -170,7 +208,11 @@ export class QueryExecutor { return { success: false, content: '', error: 'Request timed out. Please try again.' }; } - return { success: false, content: '', error: errorMessage }; + const stderrOutput = stderrChunks.join('').trim(); + const errorWithStderr = stderrOutput + ? `${errorMessage}\nstderr: ${stderrOutput}` + : errorMessage; + return { success: false, content: '', error: errorWithStderr }; } } diff --git a/src/infra/claude/options-builder.ts b/src/infra/claude/options-builder.ts index 32bbdf8..86e0871 100644 --- a/src/infra/claude/options-builder.ts +++ b/src/infra/claude/options-builder.ts @@ -85,6 +85,10 @@ export class SdkOptionsBuilder { sdkOptions.continue = false; } + if (this.options.onStderr) { + sdkOptions.stderr = this.options.onStderr; + } + return sdkOptions; } diff --git a/src/infra/claude/types.ts b/src/infra/claude/types.ts index 782e9f2..cf33daa 100644 --- a/src/infra/claude/types.ts +++ b/src/infra/claude/types.ts @@ -166,4 +166,6 @@ export interface ClaudeSpawnOptions { bypassPermissions?: boolean; /** Anthropic API key to inject via env (bypasses CLI auth) */ anthropicApiKey?: string; + /** Callback for stderr output from the Claude Code process */ + onStderr?: (data: string) => void; } diff --git a/src/infra/task/runner.ts b/src/infra/task/runner.ts index 9b4cf63..0eb58b7 100644 --- a/src/infra/task/runner.ts +++ b/src/infra/task/runner.ts @@ -31,6 +31,7 @@ export class TaskRunner { private tasksDir: string; private completedDir: string; private failedDir: string; + private claimedPaths = new Set(); constructor(projectDir: string) { this.projectDir = projectDir; @@ -106,11 +107,19 @@ export class TaskRunner { } /** - * 次に実行すべきタスクを指定数分取得 + * 予約付きタスク取得 + * + * claimed 済みのタスクを除外して返し、返したタスクを claimed に追加する。 + * 並列実行時に同一タスクが複数ワーカーに返されることを防ぐ。 */ - getNextTasks(count: number): TaskInfo[] { - const tasks = this.listTasks(); - return tasks.slice(0, count); + claimNextTasks(count: number): TaskInfo[] { + const allTasks = this.listTasks(); + const unclaimed = allTasks.filter((t) => !this.claimedPaths.has(t.filePath)); + const claimed = unclaimed.slice(0, count); + for (const task of claimed) { + this.claimedPaths.add(task.filePath); + } + return claimed; } /** @@ -318,6 +327,8 @@ export class TaskRunner { const movedTaskFile = path.join(taskTargetDir, `${result.task.name}${originalExt}`); fs.renameSync(result.task.filePath, movedTaskFile); + this.claimedPaths.delete(result.task.filePath); + // レポートを生成 const reportFile = path.join(taskTargetDir, 'report.md'); const reportContent = this.generateReport(result);