From 47612d9dcc34cfd52ec98b83f1ae7272db6397a9 Mon Sep 17 00:00:00 2001 From: nrslib <38722970+nrslib@users.noreply.github.com> Date: Mon, 2 Mar 2026 21:20:50 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20agent-usecases=20/=20schema-loader?= =?UTF-8?q?=20=E3=81=AE=E7=A7=BB=E5=8B=95=E3=81=A8=20pieceExecution=20?= =?UTF-8?q?=E3=81=AE=E8=B2=AC=E5=8B=99=E5=88=86=E5=89=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - agent-usecases.ts を core/piece/ → agents/ へ移動 - schema-loader.ts を core/piece/ → infra/resources/ へ移動 - interactive-summary-types.ts を分離、shared/types/ ディレクトリを追加 - pieceExecution.ts を abortHandler / analyticsEmitter / iterationLimitHandler / outputFns / runMeta / sessionLogger に分割 - buildMergeFn を async → sync に変更(custom merge の file 戦略を削除) - cleanupOrphanedClone にパストラバーサル保護を追加 - review-fix / frontend-review-fix ピースの IT テストを追加 --- src/__tests__/agent-usecases.test.ts | 4 +- src/__tests__/arpeggio-merge.test.ts | 54 +- .../cli-routing-issue-resolve.test.ts | 18 - src/__tests__/clone.test.ts | 49 +- src/__tests__/engine-arpeggio.test.ts | 28 - src/__tests__/it-piece-patterns.test.ts | 302 +++++++ src/__tests__/models.test.ts | 16 - src/__tests__/pieceLoader.test.ts | 74 ++ src/__tests__/schema-loader.test.ts | 10 +- src/__tests__/session.test.ts | 19 - src/{core/piece => agents}/agent-usecases.ts | 14 +- src/agents/ai-judge.ts | 2 +- src/app/cli/program.ts | 4 +- src/app/cli/routing.ts | 8 - src/core/models/piece-types.ts | 10 +- src/core/models/schemas.ts | 6 +- src/core/piece/arpeggio/merge.ts | 96 +- src/core/piece/engine/ArpeggioRunner.ts | 4 +- src/core/piece/engine/MovementExecutor.ts | 2 +- src/core/piece/engine/ParallelRunner.ts | 2 +- src/core/piece/engine/TeamLeaderRunner.ts | 2 +- .../piece/engine/team-leader-execution.ts | 2 +- src/core/piece/index.ts | 10 - .../piece/instruction/InstructionBuilder.ts | 8 +- src/core/piece/phase-runner.ts | 2 +- src/core/piece/status-judgment-phase.ts | 2 +- src/core/piece/types.ts | 76 +- .../interactive/interactive-summary-types.ts | 80 ++ .../interactive/interactive-summary.ts | 110 +-- src/features/interactive/retryMode.ts | 24 +- src/features/repertoire/constants.ts | 5 +- src/features/tasks/execute/abortHandler.ts | 91 ++ .../tasks/execute/analyticsEmitter.ts | 92 ++ .../tasks/execute/iterationLimitHandler.ts | 64 ++ src/features/tasks/execute/outputFns.ts | 76 ++ src/features/tasks/execute/pieceExecution.ts | 850 +++--------------- src/features/tasks/execute/runMeta.ts | 60 ++ src/features/tasks/execute/sessionLogger.ts | 178 ++++ src/index.ts | 18 +- src/infra/config/constants.ts | 10 + src/infra/config/loaders/pieceParser.ts | 4 +- src/infra/config/paths.ts | 2 +- src/infra/config/project/sessionStore.ts | 168 ++-- src/infra/fs/session.ts | 12 +- .../resources}/schema-loader.ts | 2 +- src/infra/task/clone.ts | 29 +- src/shared/types/provider.ts | 64 ++ src/shared/ui/StreamDisplay.ts | 6 +- src/shared/utils/providerEventLogger.ts | 2 +- 49 files changed, 1558 insertions(+), 1213 deletions(-) rename src/{core/piece => agents}/agent-usecases.ts (96%) create mode 100644 src/features/interactive/interactive-summary-types.ts create mode 100644 src/features/tasks/execute/abortHandler.ts create mode 100644 src/features/tasks/execute/analyticsEmitter.ts create mode 100644 src/features/tasks/execute/iterationLimitHandler.ts create mode 100644 src/features/tasks/execute/outputFns.ts create mode 100644 src/features/tasks/execute/runMeta.ts create mode 100644 src/features/tasks/execute/sessionLogger.ts create mode 100644 src/infra/config/constants.ts rename src/{core/piece => infra/resources}/schema-loader.ts (97%) create mode 100644 src/shared/types/provider.ts diff --git a/src/__tests__/agent-usecases.test.ts b/src/__tests__/agent-usecases.test.ts index fce3a8e..a02e099 100644 --- a/src/__tests__/agent-usecases.test.ts +++ b/src/__tests__/agent-usecases.test.ts @@ -10,13 +10,13 @@ import { judgeStatus, decomposeTask, requestMoreParts, -} from '../core/piece/agent-usecases.js'; +} from '../agents/agent-usecases.js'; vi.mock('../agents/runner.js', () => ({ runAgent: vi.fn(), })); -vi.mock('../core/piece/schema-loader.js', () => ({ +vi.mock('../infra/resources/schema-loader.js', () => ({ loadJudgmentSchema: vi.fn(() => ({ type: 'judgment' })), loadEvaluationSchema: vi.fn(() => ({ type: 'evaluation' })), loadDecompositionSchema: vi.fn((maxParts: number) => ({ type: 'decomposition', maxParts })), diff --git a/src/__tests__/arpeggio-merge.test.ts b/src/__tests__/arpeggio-merge.test.ts index a27f8c5..0c2654e 100644 --- a/src/__tests__/arpeggio-merge.test.ts +++ b/src/__tests__/arpeggio-merge.test.ts @@ -17,9 +17,9 @@ function makeFailedResult(batchIndex: number, error: string): BatchResult { describe('buildMergeFn', () => { describe('concat strategy', () => { - it('should concatenate results with default separator (newline)', async () => { + it('should concatenate results with default separator (newline)', () => { const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; - const mergeFn = await buildMergeFn(config); + const mergeFn = buildMergeFn(config); const results = [ makeResult(0, 'Result A'), makeResult(1, 'Result B'), @@ -28,9 +28,9 @@ describe('buildMergeFn', () => { expect(mergeFn(results)).toBe('Result A\nResult B\nResult C'); }); - it('should concatenate results with custom separator', async () => { + it('should concatenate results with custom separator', () => { const config: ArpeggioMergeMovementConfig = { strategy: 'concat', separator: '\n---\n' }; - const mergeFn = await buildMergeFn(config); + const mergeFn = buildMergeFn(config); const results = [ makeResult(0, 'A'), makeResult(1, 'B'), @@ -38,9 +38,9 @@ describe('buildMergeFn', () => { expect(mergeFn(results)).toBe('A\n---\nB'); }); - it('should sort results by batch index', async () => { + it('should sort results by batch index', () => { const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; - const mergeFn = await buildMergeFn(config); + const mergeFn = buildMergeFn(config); const results = [ makeResult(2, 'C'), makeResult(0, 'A'), @@ -49,9 +49,9 @@ describe('buildMergeFn', () => { expect(mergeFn(results)).toBe('A\nB\nC'); }); - it('should filter out failed results', async () => { + it('should filter out failed results', () => { const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; - const mergeFn = await buildMergeFn(config); + const mergeFn = buildMergeFn(config); const results = [ makeResult(0, 'A'), makeFailedResult(1, 'oops'), @@ -60,9 +60,9 @@ describe('buildMergeFn', () => { expect(mergeFn(results)).toBe('A\nC'); }); - it('should return empty string when all results failed', async () => { + it('should return empty string when all results failed', () => { const config: ArpeggioMergeMovementConfig = { strategy: 'concat' }; - const mergeFn = await buildMergeFn(config); + const mergeFn = buildMergeFn(config); const results = [ makeFailedResult(0, 'error1'), makeFailedResult(1, 'error2'), @@ -71,38 +71,18 @@ describe('buildMergeFn', () => { }); }); - describe('custom strategy with inline_js', () => { - it('should execute inline JS merge function', async () => { + describe('custom strategy', () => { + it('should execute inline_js merge function', () => { const config: ArpeggioMergeMovementConfig = { strategy: 'custom', - inlineJs: 'return results.filter(r => r.success).map(r => r.content.toUpperCase()).join(", ");', + inlineJs: 'return results.filter((r) => r.success).map((r) => r.content).reverse().join("|");', }; - const mergeFn = await buildMergeFn(config); + const mergeFn = buildMergeFn(config); const results = [ - makeResult(0, 'hello'), - makeResult(1, 'world'), + makeResult(1, 'B'), + makeResult(0, 'A'), ]; - expect(mergeFn(results)).toBe('HELLO, WORLD'); - }); - - it('should throw when inline JS returns non-string', async () => { - const config: ArpeggioMergeMovementConfig = { - strategy: 'custom', - inlineJs: 'return 42;', - }; - const mergeFn = await buildMergeFn(config); - expect(() => mergeFn([makeResult(0, 'test')])).toThrow( - 'Inline JS merge function must return a string, got number' - ); - }); - }); - - describe('custom strategy validation', () => { - it('should throw when custom strategy has neither inline_js nor file', async () => { - const config: ArpeggioMergeMovementConfig = { strategy: 'custom' }; - await expect(buildMergeFn(config)).rejects.toThrow( - 'Custom merge strategy requires either inline_js or file path' - ); + expect(mergeFn(results)).toBe('B|A'); }); }); }); diff --git a/src/__tests__/cli-routing-issue-resolve.test.ts b/src/__tests__/cli-routing-issue-resolve.test.ts index d1f034f..f61f035 100644 --- a/src/__tests__/cli-routing-issue-resolve.test.ts +++ b/src/__tests__/cli-routing-issue-resolve.test.ts @@ -209,24 +209,6 @@ describe('Issue resolution in routing', () => { mockExit.mockRestore(); }); - it('should show migration error and exit when deprecated --create-worktree is used', async () => { - mockOpts.createWorktree = 'yes'; - - const mockExit = vi.spyOn(process, 'exit').mockImplementation(() => { - throw new Error('process.exit called'); - }); - - await expect(executeDefaultAction()).rejects.toThrow('process.exit called'); - - expect(mockError).toHaveBeenCalledWith( - '--create-worktree has been removed. execute now always runs in-place. Use "takt add" (save_task) + "takt run" for worktree-based execution.' - ); - expect(mockExit).toHaveBeenCalledWith(1); - expect(mockInteractiveMode).not.toHaveBeenCalled(); - expect(mockSelectAndExecuteTask).not.toHaveBeenCalled(); - - mockExit.mockRestore(); - }); describe('--issue option', () => { it('should resolve issue and pass to interactive mode when --issue is specified', async () => { diff --git a/src/__tests__/clone.test.ts b/src/__tests__/clone.test.ts index 77e8f92..172ad96 100644 --- a/src/__tests__/clone.test.ts +++ b/src/__tests__/clone.test.ts @@ -19,14 +19,18 @@ vi.mock('node:fs', () => ({ mkdirSync: vi.fn(), mkdtempSync: vi.fn(), writeFileSync: vi.fn(), + readFileSync: vi.fn(), existsSync: vi.fn(), rmSync: vi.fn(), + unlinkSync: vi.fn(), }, mkdirSync: vi.fn(), mkdtempSync: vi.fn(), writeFileSync: vi.fn(), + readFileSync: vi.fn(), existsSync: vi.fn(), rmSync: vi.fn(), + unlinkSync: vi.fn(), })); vi.mock('../shared/utils/index.js', async (importOriginal) => ({ @@ -49,9 +53,10 @@ vi.mock('../infra/config/project/projectConfig.js', async (importOriginal) => ({ })); import { execFileSync } from 'node:child_process'; +import * as fs from 'node:fs'; import { loadGlobalConfig } from '../infra/config/global/globalConfig.js'; import { loadProjectConfig } from '../infra/config/project/projectConfig.js'; -import { createSharedClone, createTempCloneForBranch } from '../infra/task/clone.js'; +import { createSharedClone, createTempCloneForBranch, cleanupOrphanedClone } from '../infra/task/clone.js'; const mockExecFileSync = vi.mocked(execFileSync); const mockLoadProjectConfig = vi.mocked(loadProjectConfig); @@ -946,3 +951,45 @@ describe('shallow clone fallback', () => { expect(cloneCalls[0]).toContain('--dissociate'); }); }); + +describe('cleanupOrphanedClone path traversal protection', () => { + // projectDir = '/project' → resolveCloneBaseDir → path.join('/project', '..', 'takt-worktrees') = '/takt-worktrees' + const PROJECT_DIR = '/project'; + const BRANCH = 'my-branch'; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('should refuse to remove clone path outside clone base directory', () => { + // clonePath points above the clone base directory (path traversal attempt) + vi.mocked(fs.readFileSync).mockReturnValueOnce( + JSON.stringify({ clonePath: '/etc/malicious' }) + ); + + cleanupOrphanedClone(PROJECT_DIR, BRANCH); + + expect(mockLogError).toHaveBeenCalledWith( + 'Refusing to remove clone outside of clone base directory', + expect.objectContaining({ branch: BRANCH }) + ); + expect(vi.mocked(fs.rmSync)).not.toHaveBeenCalled(); + }); + + it('should remove clone when path is within clone base directory', () => { + // resolveCloneBaseDir('/project') = path.resolve('/project/../takt-worktrees') = '/takt-worktrees' + const validClonePath = '/takt-worktrees/20260101T0000-my-task'; + vi.mocked(fs.readFileSync).mockReturnValueOnce( + JSON.stringify({ clonePath: validClonePath }) + ); + vi.mocked(fs.existsSync).mockReturnValueOnce(true); + + cleanupOrphanedClone(PROJECT_DIR, BRANCH); + + expect(mockLogError).not.toHaveBeenCalled(); + expect(vi.mocked(fs.rmSync)).toHaveBeenCalledWith( + validClonePath, + expect.objectContaining({ recursive: true }) + ); + }); +}); diff --git a/src/__tests__/engine-arpeggio.test.ts b/src/__tests__/engine-arpeggio.test.ts index 35f55b2..0038d0c 100644 --- a/src/__tests__/engine-arpeggio.test.ts +++ b/src/__tests__/engine-arpeggio.test.ts @@ -251,32 +251,4 @@ describe('ArpeggioRunner integration', () => { expect(mockAgent).toHaveBeenCalledTimes(3); }); - it('should use custom merge function when configured', async () => { - const { tmpDir, csvPath, templatePath } = createArpeggioTestDir(); - const arpeggioConfig = createArpeggioConfig(csvPath, templatePath, { - merge: { - strategy: 'custom', - inlineJs: 'return results.filter(r => r.success).map(r => r.content).join(" | ");', - }, - }); - const config = buildArpeggioPieceConfig(arpeggioConfig, tmpDir); - - const mockAgent = vi.mocked(runAgent); - mockAgent - .mockResolvedValueOnce(makeResponse({ content: 'X' })) - .mockResolvedValueOnce(makeResponse({ content: 'Y' })) - .mockResolvedValueOnce(makeResponse({ content: 'Z' })); - - vi.mocked(detectMatchedRule).mockResolvedValueOnce({ - index: 0, - method: 'phase1_tag', - }); - - engine = new PieceEngine(config, tmpDir, 'test task', createEngineOptions(tmpDir)); - const state = await engine.run(); - - expect(state.status).toBe('completed'); - const output = state.movementOutputs.get('process'); - expect(output!.content).toBe('X | Y | Z'); - }); }); diff --git a/src/__tests__/it-piece-patterns.test.ts b/src/__tests__/it-piece-patterns.test.ts index ad984c1..2de14bc 100644 --- a/src/__tests__/it-piece-patterns.test.ts +++ b/src/__tests__/it-piece-patterns.test.ts @@ -438,3 +438,305 @@ describe('Piece Patterns IT: dual piece (4 parallel reviewers)', () => { expect(state.status).toBe('completed'); }); }); + +describe('Piece Patterns IT: review-fix piece', () => { + let testDir: string; + + beforeEach(() => { + vi.clearAllMocks(); + testDir = createTestDir(); + }); + + afterEach(() => { + resetScenario(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it('happy path: gather → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 5 parallel reviewers: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + { persona: 'testing-reviewer', status: 'done', content: 'approved' }, + { persona: 'requirements-reviewer', status: 'done', content: 'approved' }, + // Supervisor: ready to merge + { persona: 'supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review PR #1'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); + + it('fix loop: reviewers any("needs_fix") → fix → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 5 parallel reviewers: security needs_fix + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'needs_fix' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + { persona: 'testing-reviewer', status: 'done', content: 'approved' }, + { persona: 'requirements-reviewer', status: 'done', content: 'approved' }, + // Fix + { persona: 'coder', status: 'done', content: '[FIX:1]\n\nFixes complete.' }, + // Re-review: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + { persona: 'testing-reviewer', status: 'done', content: 'approved' }, + { persona: 'requirements-reviewer', status: 'done', content: 'approved' }, + // Supervisor + { persona: 'supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review PR #2'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); + + it('fix_supervisor path: supervise detects issues → fix_supervisor → supervise → COMPLETE', async () => { + const config = loadPiece('review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 5 parallel reviewers: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + { persona: 'testing-reviewer', status: 'done', content: 'approved' }, + { persona: 'requirements-reviewer', status: 'done', content: 'approved' }, + // Supervisor: issues detected → fix_supervisor + { persona: 'supervisor', status: 'done', content: '[SUPERVISE:2]\n\nIssues detected.' }, + // fix_supervisor: fixes complete → back to supervise + { persona: 'coder', status: 'done', content: '[FIX_SUPERVISOR:1]\n\nFixes for supervisor findings complete.' }, + // Supervisor: ready to merge + { persona: 'supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review PR #3'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); +}); + +describe('Piece Patterns IT: frontend-review-fix piece (fix loop)', () => { + let testDir: string; + + beforeEach(() => { + vi.clearAllMocks(); + testDir = createTestDir(); + }); + + afterEach(() => { + resetScenario(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it('fix loop: reviewers any("needs_fix") → fix → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('frontend-review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 4 parallel reviewers: frontend needs_fix + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'frontend-reviewer', status: 'done', content: 'needs_fix' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Fix + { persona: 'coder', status: 'done', content: '[FIX:1]\n\nFixes complete.' }, + // Re-review: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'frontend-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Supervisor + { persona: 'dual-supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review frontend PR'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); +}); + +describe('Piece Patterns IT: backend-review-fix piece (fix loop)', () => { + let testDir: string; + + beforeEach(() => { + vi.clearAllMocks(); + testDir = createTestDir(); + }); + + afterEach(() => { + resetScenario(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it('fix loop: reviewers any("needs_fix") → fix → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('backend-review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 3 parallel reviewers: security needs_fix + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'needs_fix' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Fix + { persona: 'coder', status: 'done', content: '[FIX:1]\n\nFixes complete.' }, + // Re-review: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Supervisor + { persona: 'dual-supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review backend PR'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); +}); + +describe('Piece Patterns IT: dual-review-fix piece (fix loop)', () => { + let testDir: string; + + beforeEach(() => { + vi.clearAllMocks(); + testDir = createTestDir(); + }); + + afterEach(() => { + resetScenario(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it('fix loop: reviewers any("needs_fix") → fix → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('dual-review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 4 parallel reviewers: qa needs_fix + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'frontend-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'needs_fix' }, + // Fix + { persona: 'coder', status: 'done', content: '[FIX:1]\n\nFixes complete.' }, + // Re-review: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'frontend-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Supervisor + { persona: 'dual-supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review dual PR'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); +}); + +describe('Piece Patterns IT: dual-cqrs-review-fix piece (fix loop)', () => { + let testDir: string; + + beforeEach(() => { + vi.clearAllMocks(); + testDir = createTestDir(); + }); + + afterEach(() => { + resetScenario(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it('fix loop: reviewers any("needs_fix") → fix → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('dual-cqrs-review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 5 parallel reviewers: cqrs-es needs_fix + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'cqrs-es-reviewer', status: 'done', content: 'needs_fix' }, + { persona: 'frontend-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Fix + { persona: 'coder', status: 'done', content: '[FIX:1]\n\nFixes complete.' }, + // Re-review: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'cqrs-es-reviewer', status: 'done', content: 'approved' }, + { persona: 'frontend-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Supervisor + { persona: 'dual-supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review CQRS dual PR'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); +}); + +describe('Piece Patterns IT: backend-cqrs-review-fix piece (fix loop)', () => { + let testDir: string; + + beforeEach(() => { + vi.clearAllMocks(); + testDir = createTestDir(); + }); + + afterEach(() => { + resetScenario(); + rmSync(testDir, { recursive: true, force: true }); + }); + + it('fix loop: reviewers any("needs_fix") → fix → reviewers (all approved) → supervise → COMPLETE', async () => { + const config = loadPiece('backend-cqrs-review-fix', testDir); + expect(config).not.toBeNull(); + + setMockScenario([ + { persona: 'planner', status: 'done', content: '[GATHER:1]\n\nReview target gathered.' }, + // 4 parallel reviewers: cqrs-es needs_fix + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'cqrs-es-reviewer', status: 'done', content: 'needs_fix' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Fix + { persona: 'coder', status: 'done', content: '[FIX:1]\n\nFixes complete.' }, + // Re-review: all approved + { persona: 'architecture-reviewer', status: 'done', content: 'approved' }, + { persona: 'cqrs-es-reviewer', status: 'done', content: 'approved' }, + { persona: 'security-reviewer', status: 'done', content: 'approved' }, + { persona: 'qa-reviewer', status: 'done', content: 'approved' }, + // Supervisor + { persona: 'dual-supervisor', status: 'done', content: '[SUPERVISE:1]\n\nAll validations complete, ready to merge.' }, + ]); + + const engine = createEngine(config!, testDir, 'Review backend CQRS PR'); + const state = await engine.run(); + + expect(state.status).toBe('completed'); + }); +}); diff --git a/src/__tests__/models.test.ts b/src/__tests__/models.test.ts index 0d04ff1..4b99b6e 100644 --- a/src/__tests__/models.test.ts +++ b/src/__tests__/models.test.ts @@ -209,22 +209,6 @@ describe('PieceConfigRawSchema', () => { expect(() => PieceConfigRawSchema.parse(config)).toThrow(); }); - it('should reject legacy permission_mode', () => { - const config = { - name: 'test-piece', - movements: [ - { - name: 'step1', - persona: 'coder', - permission_mode: 'edit', - instruction: '{task}', - }, - ], - }; - - expect(() => PieceConfigRawSchema.parse(config)).toThrow(); - }); - it('should require at least one movement', () => { const config = { name: 'empty-piece', diff --git a/src/__tests__/pieceLoader.test.ts b/src/__tests__/pieceLoader.test.ts index 677be57..b09d69d 100644 --- a/src/__tests__/pieceLoader.test.ts +++ b/src/__tests__/pieceLoader.test.ts @@ -283,3 +283,77 @@ describe('loadAllPiecesWithSources with repertoire pieces', () => { expect(repertoirePieces).toHaveLength(0); }); }); + +describe('normalizeArpeggio: strategy coercion via loadPieceByIdentifier', () => { + let tempDir: string; + + beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), 'takt-arpeggio-coerce-')); + // Dummy files required by normalizeArpeggio (resolved relative to piece dir) + writeFileSync(join(tempDir, 'template.md'), '{line:1}'); + writeFileSync(join(tempDir, 'data.csv'), 'col\nval'); + }); + + afterEach(() => { + rmSync(tempDir, { recursive: true, force: true }); + }); + + it('should preserve strategy:"custom" when loading arpeggio piece YAML', () => { + const pieceYaml = `name: arpeggio-coerce-test +initial_movement: process +max_movements: 5 +movements: + - name: process + persona: coder + arpeggio: + source: csv + source_path: ./data.csv + template: ./template.md + merge: + strategy: custom + inline_js: 'return results.map(r => r.content).join(", ");' + rules: + - condition: All processed + next: COMPLETE +`; + const piecePath = join(tempDir, 'piece.yaml'); + writeFileSync(piecePath, pieceYaml); + + const config = loadPieceByIdentifier(piecePath, tempDir); + + expect(config).not.toBeNull(); + const movement = config!.movements[0]!; + expect(movement.arpeggio).toBeDefined(); + expect(movement.arpeggio!.merge.strategy).toBe('custom'); + expect(movement.arpeggio!.merge.inlineJs).toContain('map'); + }); + + it('should preserve concat strategy and separator when loading arpeggio piece YAML', () => { + const pieceYaml = `name: arpeggio-concat-test +initial_movement: process +max_movements: 5 +movements: + - name: process + persona: coder + arpeggio: + source: csv + source_path: ./data.csv + template: ./template.md + merge: + strategy: concat + separator: "\\n---\\n" + rules: + - condition: All processed + next: COMPLETE +`; + const piecePath = join(tempDir, 'piece.yaml'); + writeFileSync(piecePath, pieceYaml); + + const config = loadPieceByIdentifier(piecePath, tempDir); + + expect(config).not.toBeNull(); + const movement = config!.movements[0]!; + expect(movement.arpeggio!.merge.strategy).toBe('concat'); + expect(movement.arpeggio!.merge.separator).toBe('\n---\n'); + }); +}); diff --git a/src/__tests__/schema-loader.test.ts b/src/__tests__/schema-loader.test.ts index 9521b8a..54499ff 100644 --- a/src/__tests__/schema-loader.test.ts +++ b/src/__tests__/schema-loader.test.ts @@ -63,7 +63,7 @@ describe('schema-loader', () => { }); it('同じスキーマを複数回ロードしても readFileSync は1回だけ', async () => { - const { loadJudgmentSchema } = await import('../core/piece/schema-loader.js'); + const { loadJudgmentSchema } = await import('../infra/resources/schema-loader.js'); const first = loadJudgmentSchema(); const second = loadJudgmentSchema(); @@ -74,7 +74,7 @@ describe('schema-loader', () => { }); it('loadDecompositionSchema は maxItems を注入し、呼び出しごとに独立したオブジェクトを返す', async () => { - const { loadDecompositionSchema } = await import('../core/piece/schema-loader.js'); + const { loadDecompositionSchema } = await import('../infra/resources/schema-loader.js'); const first = loadDecompositionSchema(2); const second = loadDecompositionSchema(5); @@ -88,14 +88,14 @@ describe('schema-loader', () => { }); it('loadDecompositionSchema は不正な maxParts を拒否する', async () => { - const { loadDecompositionSchema } = await import('../core/piece/schema-loader.js'); + const { loadDecompositionSchema } = await import('../infra/resources/schema-loader.js'); expect(() => loadDecompositionSchema(0)).toThrow('maxParts must be a positive integer: 0'); expect(() => loadDecompositionSchema(-1)).toThrow('maxParts must be a positive integer: -1'); }); it('loadMorePartsSchema は maxItems を注入し、呼び出しごとに独立したオブジェクトを返す', async () => { - const { loadMorePartsSchema } = await import('../core/piece/schema-loader.js'); + const { loadMorePartsSchema } = await import('../infra/resources/schema-loader.js'); const first = loadMorePartsSchema(1); const second = loadMorePartsSchema(4); @@ -109,7 +109,7 @@ describe('schema-loader', () => { }); it('loadMorePartsSchema は不正な maxAdditionalParts を拒否する', async () => { - const { loadMorePartsSchema } = await import('../core/piece/schema-loader.js'); + const { loadMorePartsSchema } = await import('../infra/resources/schema-loader.js'); expect(() => loadMorePartsSchema(0)).toThrow('maxAdditionalParts must be a positive integer: 0'); expect(() => loadMorePartsSchema(-1)).toThrow('maxAdditionalParts must be a positive integer: -1'); diff --git a/src/__tests__/session.test.ts b/src/__tests__/session.test.ts index cd3093d..2a72fc1 100644 --- a/src/__tests__/session.test.ts +++ b/src/__tests__/session.test.ts @@ -277,25 +277,6 @@ describe('NDJSON log', () => { expect(log!.status).toBe('completed'); }); - it('should still load legacy .json files', () => { - const logsDir = join(projectDir, '.takt', 'logs'); - mkdirSync(logsDir, { recursive: true }); - const legacyPath = join(logsDir, 'legacy-001.json'); - const legacyLog: SessionLog = { - task: 'legacy task', - projectDir, - pieceName: 'wf', - iterations: 0, - startTime: new Date().toISOString(), - status: 'running', - history: [], - }; - writeFileSync(legacyPath, JSON.stringify(legacyLog, null, 2), 'utf-8'); - - const log = loadSessionLog(legacyPath); - expect(log).not.toBeNull(); - expect(log!.task).toBe('legacy task'); - }); }); describe('appendNdjsonLine real-time characteristics', () => { diff --git a/src/core/piece/agent-usecases.ts b/src/agents/agent-usecases.ts similarity index 96% rename from src/core/piece/agent-usecases.ts rename to src/agents/agent-usecases.ts index 8f42623..14d6478 100644 --- a/src/core/piece/agent-usecases.ts +++ b/src/agents/agent-usecases.ts @@ -1,10 +1,10 @@ -import type { AgentResponse, PartDefinition, PieceRule, RuleMatchMethod, Language } from '../models/types.js'; -import { runAgent, type RunAgentOptions, type StreamCallback } from '../../agents/runner.js'; -import { detectJudgeIndex, buildJudgePrompt } from '../../agents/judge-utils.js'; -import { parseParts } from './engine/task-decomposer.js'; -import { loadJudgmentSchema, loadEvaluationSchema, loadDecompositionSchema, loadMorePartsSchema } from './schema-loader.js'; -import { detectRuleIndex } from '../../shared/utils/ruleIndex.js'; -import { ensureUniquePartIds, parsePartDefinitionEntry } from './part-definition-validator.js'; +import type { AgentResponse, PartDefinition, PieceRule, RuleMatchMethod, Language } from '../core/models/types.js'; +import { runAgent, type RunAgentOptions, type StreamCallback } from './runner.js'; +import { detectJudgeIndex, buildJudgePrompt } from './judge-utils.js'; +import { parseParts } from '../core/piece/engine/task-decomposer.js'; +import { loadJudgmentSchema, loadEvaluationSchema, loadDecompositionSchema, loadMorePartsSchema } from '../infra/resources/schema-loader.js'; +import { detectRuleIndex } from '../shared/utils/ruleIndex.js'; +import { ensureUniquePartIds, parsePartDefinitionEntry } from '../core/piece/part-definition-validator.js'; export interface JudgeStatusOptions { cwd: string; diff --git a/src/agents/ai-judge.ts b/src/agents/ai-judge.ts index 004b3d9..7ba297d 100644 --- a/src/agents/ai-judge.ts +++ b/src/agents/ai-judge.ts @@ -7,7 +7,7 @@ import type { AiJudgeCaller, AiJudgeCondition } from '../core/piece/types.js'; import { createLogger } from '../shared/utils/index.js'; -import { evaluateCondition } from '../core/piece/agent-usecases.js'; +import { evaluateCondition } from './agent-usecases.js'; const log = createLogger('ai-judge'); diff --git a/src/app/cli/program.ts b/src/app/cli/program.ts index 9fe0108..7a0e10a 100644 --- a/src/app/cli/program.ts +++ b/src/app/cli/program.ts @@ -6,7 +6,7 @@ */ import { createRequire } from 'node:module'; -import { Command, Option } from 'commander'; +import { Command } from 'commander'; import { resolve } from 'node:path'; import { initGlobalDirs, @@ -52,8 +52,6 @@ program .option('-t, --task ', 'Task content (as alternative to GitHub issue)') .option('--pipeline', 'Pipeline mode: non-interactive, no worktree, direct branch creation') .option('--skip-git', 'Skip branch creation, commit, and push (pipeline mode)') - // Deprecated compatibility option: keep parsing to show migration guidance. - .addOption(new Option('--create-worktree ').hideHelp()) .option('-q, --quiet', 'Minimal output mode: suppress AI output (for CI)') .option('-c, --continue', 'Continue from the last assistant session'); diff --git a/src/app/cli/routing.ts b/src/app/cli/routing.ts index d52de5d..c222c34 100644 --- a/src/app/cli/routing.ts +++ b/src/app/cli/routing.ts @@ -110,14 +110,6 @@ async function resolvePrInput( */ export async function executeDefaultAction(task?: string): Promise { const opts = program.opts(); - if (opts.createWorktree !== undefined) { - logError( - '--create-worktree has been removed. ' + - 'execute now always runs in-place. ' + - 'Use "takt add" (save_task) + "takt run" for worktree-based execution.' - ); - process.exit(1); - } if (!pipelineMode && (opts.autoPr === true || opts.draft === true)) { logError('--auto-pr/--draft are supported only in --pipeline mode'); process.exit(1); diff --git a/src/core/models/piece-types.ts b/src/core/models/piece-types.ts index 665b736..112d1ef 100644 --- a/src/core/models/piece-types.ts +++ b/src/core/models/piece-types.ts @@ -166,14 +166,14 @@ export interface PieceMovement { /** Merge configuration for arpeggio results */ export interface ArpeggioMergeMovementConfig { - /** Merge strategy: 'concat' (default), 'custom' */ + /** Merge strategy */ readonly strategy: 'concat' | 'custom'; - /** Inline JS merge function body (for custom strategy) */ - readonly inlineJs?: string; - /** Path to external JS merge file (for custom strategy, resolved to absolute) */ - readonly filePath?: string; /** Separator for concat strategy (default: '\n') */ readonly separator?: string; + /** Inline JS function body for custom merge strategy */ + readonly inlineJs?: string; + /** External JS module path for custom merge strategy */ + readonly file?: string; } /** Arpeggio configuration for data-driven batch processing movements */ diff --git a/src/core/models/schemas.ts b/src/core/models/schemas.ts index dce0501..6da2b26 100644 --- a/src/core/models/schemas.ts +++ b/src/core/models/schemas.ts @@ -249,7 +249,7 @@ export const ParallelSubMovementRawSchema = z.object({ mcp_servers: McpServersSchema, provider: z.enum(['claude', 'codex', 'opencode', 'cursor', 'copilot', 'mock']).optional(), model: z.string().optional(), - /** Removed legacy field (no backward compatibility) */ + /** Deprecated alias */ permission_mode: z.never().optional(), required_permission_mode: PermissionModeSchema.optional(), provider_options: MovementProviderOptionsSchema, @@ -282,7 +282,7 @@ export const PieceMovementRawSchema = z.object({ mcp_servers: McpServersSchema, provider: z.enum(['claude', 'codex', 'opencode', 'cursor', 'copilot', 'mock']).optional(), model: z.string().optional(), - /** Removed legacy field (no backward compatibility) */ + /** Deprecated alias */ permission_mode: z.never().optional(), /** Required minimum permission mode for tool execution in this movement */ required_permission_mode: PermissionModeSchema.optional(), @@ -349,6 +349,8 @@ export const PieceConfigRawSchema = z.object({ name: z.string().min(1), description: z.string().optional(), piece_config: PieceProviderOptionsSchema, + /** Deprecated alias */ + permission_mode: z.never().optional(), /** Piece-level persona definitions — map of name to .md file path or inline content */ personas: z.record(z.string(), z.string()).optional(), /** Piece-level policy definitions — map of name to .md file path or inline content */ diff --git a/src/core/piece/arpeggio/merge.ts b/src/core/piece/arpeggio/merge.ts index 7ea25d3..603f22d 100644 --- a/src/core/piece/arpeggio/merge.ts +++ b/src/core/piece/arpeggio/merge.ts @@ -3,73 +3,71 @@ * * Supports two merge strategies: * - 'concat': Simple concatenation with configurable separator - * - 'custom': User-provided merge function (inline JS or external file) + * - 'custom': User-provided merge function (inline_js or file) */ import { writeFileSync } from 'node:fs'; +import { createRequire } from 'node:module'; import type { ArpeggioMergeMovementConfig, MergeFn } from './types.js'; +import type { BatchResult } from './types.js'; + +const require = createRequire(import.meta.url); + +function sortByBatchIndex(results: readonly BatchResult[]): readonly BatchResult[] { + return results.slice().sort((a, b) => a.batchIndex - b.batchIndex); +} + +/** Create a merge function from inline JS source */ +function createCustomMergeFromInlineJs(inlineJs: string): MergeFn { + const mergeImpl = new Function('results', inlineJs) as (results: readonly BatchResult[]) => unknown; + + return (results) => { + const orderedResults = sortByBatchIndex(results); + const value = mergeImpl(orderedResults); + return typeof value === 'string' ? value : JSON.stringify(value); + }; +} + +/** Create a merge function from external JS module */ +function createCustomMergeFromFile(path: string): MergeFn { + const moduleExports = require(path); + const mergeImpl = moduleExports?.default ?? moduleExports; + + if (typeof mergeImpl !== 'function') { + throw new Error(`Custom merge module must export a function: ${path}`); + } + + return (results) => { + const orderedResults = sortByBatchIndex(results); + const value = mergeImpl(orderedResults); + return typeof value === 'string' ? value : JSON.stringify(value); + }; +} /** Create a concat merge function with the given separator */ function createConcatMerge(separator: string): MergeFn { return (results) => - results + sortByBatchIndex(results) .filter((r) => r.success) - .sort((a, b) => a.batchIndex - b.batchIndex) .map((r) => r.content) .join(separator); } -/** - * Create a merge function from inline JavaScript. - * - * The inline JS receives `results` as the function parameter (readonly BatchResult[]). - * It must return a string. - */ -function createInlineJsMerge(jsBody: string): MergeFn { - const fn = new Function('results', jsBody) as MergeFn; - return (results) => { - const output = fn(results); - if (typeof output !== 'string') { - throw new Error(`Inline JS merge function must return a string, got ${typeof output}`); - } - return output; - }; -} - -/** - * Create a merge function from an external JS file. - * - * The file must export a default function: (results: BatchResult[]) => string - */ -async function createFileMerge(filePath: string): Promise { - const module = await import(filePath) as { default?: MergeFn }; - if (typeof module.default !== 'function') { - throw new Error(`Merge file "${filePath}" must export a default function`); - } - return module.default; -} - /** * Build a merge function from the arpeggio merge configuration. - * - * For 'concat' strategy: returns a simple join function. - * For 'custom' strategy: loads from inline JS or external file. */ -export async function buildMergeFn(config: ArpeggioMergeMovementConfig): Promise { - if (config.strategy === 'concat') { - return createConcatMerge(config.separator ?? '\n'); +export function buildMergeFn(config: ArpeggioMergeMovementConfig): MergeFn { + if (config.strategy === 'custom') { + if (config.inlineJs) { + return createCustomMergeFromInlineJs(config.inlineJs); + } + if (config.file) { + return createCustomMergeFromFile(config.file); + } + throw new Error('Custom merge strategy requires inline_js or file'); } - // Custom strategy - if (config.inlineJs) { - return createInlineJsMerge(config.inlineJs); - } - - if (config.filePath) { - return createFileMerge(config.filePath); - } - - throw new Error('Custom merge strategy requires either inline_js or file path'); + return createConcatMerge(config.separator ?? '\n'); } /** Write merged output to a file if output_path is configured */ diff --git a/src/core/piece/engine/ArpeggioRunner.ts b/src/core/piece/engine/ArpeggioRunner.ts index 24adc45..e5a8bdb 100644 --- a/src/core/piece/engine/ArpeggioRunner.ts +++ b/src/core/piece/engine/ArpeggioRunner.ts @@ -16,7 +16,7 @@ import { createDataSource } from '../arpeggio/data-source-factory.js'; import { loadTemplate, expandTemplate } from '../arpeggio/template.js'; import { buildMergeFn, writeMergedOutput } from '../arpeggio/merge.js'; import type { RunAgentOptions } from '../../../agents/runner.js'; -import { executeAgent } from '../agent-usecases.js'; +import { executeAgent } from '../../../agents/agent-usecases.js'; import { detectMatchedRule } from '../evaluation/index.js'; import { incrementMovementIteration } from './state-manager.js'; import { createLogger } from '../../../shared/utils/index.js'; @@ -200,7 +200,7 @@ export class ArpeggioRunner { ); } - const mergeFn = await buildMergeFn(arpeggioConfig.merge); + const mergeFn = buildMergeFn(arpeggioConfig.merge); const mergedContent = mergeFn(results); if (arpeggioConfig.outputPath) { diff --git a/src/core/piece/engine/MovementExecutor.ts b/src/core/piece/engine/MovementExecutor.ts index 0f36b7a..eaf937f 100644 --- a/src/core/piece/engine/MovementExecutor.ts +++ b/src/core/piece/engine/MovementExecutor.ts @@ -15,7 +15,7 @@ import type { Language, } from '../../models/types.js'; import type { PhaseName } from '../types.js'; -import { executeAgent } from '../agent-usecases.js'; +import { executeAgent } from '../../../agents/agent-usecases.js'; import { InstructionBuilder } from '../instruction/InstructionBuilder.js'; import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js'; import { detectMatchedRule } from '../evaluation/index.js'; diff --git a/src/core/piece/engine/ParallelRunner.ts b/src/core/piece/engine/ParallelRunner.ts index 1a5d865..7159d86 100644 --- a/src/core/piece/engine/ParallelRunner.ts +++ b/src/core/piece/engine/ParallelRunner.ts @@ -10,7 +10,7 @@ import type { PieceState, AgentResponse, } from '../../models/types.js'; -import { executeAgent } from '../agent-usecases.js'; +import { executeAgent } from '../../../agents/agent-usecases.js'; import { ParallelLogger } from './parallel-logger.js'; import { needsStatusJudgmentPhase, runReportPhase, runStatusJudgmentPhase } from '../phase-runner.js'; import { detectMatchedRule } from '../evaluation/index.js'; diff --git a/src/core/piece/engine/TeamLeaderRunner.ts b/src/core/piece/engine/TeamLeaderRunner.ts index b6b4432..9ea7ceb 100644 --- a/src/core/piece/engine/TeamLeaderRunner.ts +++ b/src/core/piece/engine/TeamLeaderRunner.ts @@ -5,7 +5,7 @@ import type { PartDefinition, PartResult, } from '../../models/types.js'; -import { decomposeTask, executeAgent, requestMoreParts } from '../agent-usecases.js'; +import { decomposeTask, executeAgent, requestMoreParts } from '../../../agents/agent-usecases.js'; import { buildSessionKey } from '../session-key.js'; import { ParallelLogger } from './parallel-logger.js'; import { incrementMovementIteration } from './state-manager.js'; diff --git a/src/core/piece/engine/team-leader-execution.ts b/src/core/piece/engine/team-leader-execution.ts index 977c7f9..3b68b48 100644 --- a/src/core/piece/engine/team-leader-execution.ts +++ b/src/core/piece/engine/team-leader-execution.ts @@ -1,4 +1,4 @@ -import type { MorePartsResponse } from '../agent-usecases.js'; +import type { MorePartsResponse } from '../../../agents/agent-usecases.js'; import type { PartDefinition, PartResult } from '../../models/types.js'; const DEFAULT_MAX_TOTAL_PARTS = 20; diff --git a/src/core/piece/index.ts b/src/core/piece/index.ts index f52d653..8bf6d3c 100644 --- a/src/core/piece/index.ts +++ b/src/core/piece/index.ts @@ -70,13 +70,3 @@ export { AggregateEvaluator } from './evaluation/AggregateEvaluator.js'; // Phase runner export { needsStatusJudgmentPhase, type ReportPhaseBlockedResult } from './phase-runner.js'; -// Agent usecases -export { - executeAgent, - generateReport, - executePart, - judgeStatus, - evaluateCondition, - decomposeTask, - type JudgeStatusResult, -} from './agent-usecases.js'; diff --git a/src/core/piece/instruction/InstructionBuilder.ts b/src/core/piece/instruction/InstructionBuilder.ts index 4a9bd06..2d9d5e6 100644 --- a/src/core/piece/instruction/InstructionBuilder.ts +++ b/src/core/piece/instruction/InstructionBuilder.ts @@ -142,7 +142,7 @@ export class InstructionBuilder { // Policy injection (top + bottom reminder per "Lost in the Middle" research) const policyContents = this.context.policyContents ?? this.step.policyContents; const hasPolicy = !!(policyContents && policyContents.length > 0); - const policyJoined = hasPolicy ? policyContents!.join('\n\n---\n\n') : ''; + const policyJoined = hasPolicy && policyContents ? policyContents.join('\n\n---\n\n') : ''; const policyContent = hasPolicy ? preparePolicyContent(policyJoined, this.context.policySourcePath) : ''; @@ -150,15 +150,15 @@ export class InstructionBuilder { // Knowledge injection (domain-specific knowledge, no reminder needed) const knowledgeContents = this.context.knowledgeContents ?? this.step.knowledgeContents; const hasKnowledge = !!(knowledgeContents && knowledgeContents.length > 0); - const knowledgeJoined = hasKnowledge ? knowledgeContents!.join('\n\n---\n\n') : ''; + const knowledgeJoined = hasKnowledge && knowledgeContents ? knowledgeContents.join('\n\n---\n\n') : ''; const knowledgeContent = hasKnowledge ? prepareKnowledgeContent(knowledgeJoined, this.context.knowledgeSourcePath) : ''; // Quality gates injection (AI directives for movement completion) const hasQualityGates = !!(this.step.qualityGates && this.step.qualityGates.length > 0); - const qualityGatesContent = hasQualityGates - ? this.step.qualityGates!.map(gate => `- ${gate}`).join('\n') + const qualityGatesContent = hasQualityGates && this.step.qualityGates + ? this.step.qualityGates.map(gate => `- ${gate}`).join('\n') : ''; return loadTemplate('perform_phase1_message', language, { diff --git a/src/core/piece/phase-runner.ts b/src/core/piece/phase-runner.ts index 4a210cd..03112a4 100644 --- a/src/core/piece/phase-runner.ts +++ b/src/core/piece/phase-runner.ts @@ -12,7 +12,7 @@ import type { PhaseName } from './types.js'; import type { RunAgentOptions } from '../../agents/runner.js'; import { ReportInstructionBuilder } from './instruction/ReportInstructionBuilder.js'; import { hasTagBasedRules, getReportFiles } from './evaluation/rule-utils.js'; -import { executeAgent } from './agent-usecases.js'; +import { executeAgent } from '../../agents/agent-usecases.js'; import { createLogger } from '../../shared/utils/index.js'; import { buildSessionKey } from './session-key.js'; export { runStatusJudgmentPhase, type StatusJudgmentPhaseResult } from './status-judgment-phase.js'; diff --git a/src/core/piece/status-judgment-phase.ts b/src/core/piece/status-judgment-phase.ts index 7b1ed8b..22515cc 100644 --- a/src/core/piece/status-judgment-phase.ts +++ b/src/core/piece/status-judgment-phase.ts @@ -1,7 +1,7 @@ import { existsSync, readFileSync } from 'node:fs'; import { resolve } from 'node:path'; import type { PieceMovement, RuleMatchMethod } from '../models/types.js'; -import { judgeStatus } from './agent-usecases.js'; +import { judgeStatus } from '../../agents/agent-usecases.js'; import { StatusJudgmentBuilder, type StatusJudgmentContext } from './instruction/StatusJudgmentBuilder.js'; import { getJudgmentReportFiles } from './evaluation/rule-utils.js'; import { createLogger } from '../../shared/utils/index.js'; diff --git a/src/core/piece/types.ts b/src/core/piece/types.ts index b479b18..5451298 100644 --- a/src/core/piece/types.ts +++ b/src/core/piece/types.ts @@ -11,62 +11,30 @@ import type { PersonaProviderEntry } from '../models/persisted-global-config.js' import type { ProviderPermissionProfiles } from '../models/provider-profiles.js'; import type { MovementProviderOptions } from '../models/piece-types.js'; -export type ProviderType = 'claude' | 'codex' | 'opencode' | 'cursor' | 'copilot' | 'mock'; +// Re-export shared provider protocol types to maintain backward compatibility. +// The canonical definitions live in shared/types/provider.ts so that shared-layer +// modules (StreamDisplay, providerEventLogger) can import them without creating +// an upward shared → core dependency. +import type { + ProviderType, + StreamCallback, +} from '../../shared/types/provider.js'; +export type { + ProviderType, + StreamEvent, + StreamCallback, + StreamInitEventData, + StreamToolUseEventData, + StreamToolResultEventData, + StreamToolOutputEventData, + StreamTextEventData, + StreamThinkingEventData, + StreamResultEventData, + StreamErrorEventData, +} from '../../shared/types/provider.js'; + export type ProviderOptionsSource = 'env' | 'project' | 'global' | 'default'; -export interface StreamInitEventData { - model: string; - sessionId: string; -} - -export interface StreamToolUseEventData { - tool: string; - input: Record; - id: string; -} - -export interface StreamToolResultEventData { - content: string; - isError: boolean; -} - -export interface StreamToolOutputEventData { - tool: string; - output: string; -} - -export interface StreamTextEventData { - text: string; -} - -export interface StreamThinkingEventData { - thinking: string; -} - -export interface StreamResultEventData { - result: string; - sessionId: string; - success: boolean; - error?: string; -} - -export interface StreamErrorEventData { - message: string; - raw?: string; -} - -export type StreamEvent = - | { type: 'init'; data: StreamInitEventData } - | { type: 'tool_use'; data: StreamToolUseEventData } - | { type: 'tool_result'; data: StreamToolResultEventData } - | { type: 'tool_output'; data: StreamToolOutputEventData } - | { type: 'text'; data: StreamTextEventData } - | { type: 'thinking'; data: StreamThinkingEventData } - | { type: 'result'; data: StreamResultEventData } - | { type: 'error'; data: StreamErrorEventData }; - -export type StreamCallback = (event: StreamEvent) => void; - export interface PermissionRequest { toolName: string; input: Record; diff --git a/src/features/interactive/interactive-summary-types.ts b/src/features/interactive/interactive-summary-types.ts new file mode 100644 index 0000000..def4d5b --- /dev/null +++ b/src/features/interactive/interactive-summary-types.ts @@ -0,0 +1,80 @@ +/** + * Type definitions for interactive summary. + */ + +import type { MovementPreview } from '../../infra/config/index.js'; + +export type TaskHistoryLocale = 'en' | 'ja'; + +export interface ConversationMessage { + role: 'user' | 'assistant'; + content: string; +} + +export interface TaskHistorySummaryItem { + worktreeId: string; + status: 'completed' | 'failed' | 'interrupted'; + startedAt: string; + completedAt: string; + finalResult: string; + failureSummary: string | undefined; + logKey: string; +} + +export interface PieceContext { + /** Piece name (e.g. "minimal") */ + name: string; + /** Piece description */ + description: string; + /** Piece structure (numbered list of movements) */ + pieceStructure: string; + /** Movement previews (persona + instruction content for first N movements) */ + movementPreviews?: MovementPreview[]; + /** Recent task history for conversation context */ + taskHistory?: TaskHistorySummaryItem[]; +} + +export type InteractiveModeAction = 'execute' | 'save_task' | 'create_issue' | 'cancel'; + +export type PostSummaryAction = InteractiveModeAction | 'continue'; + +export type SummaryActionValue = 'execute' | 'create_issue' | 'save_task' | 'continue'; + +export interface SummaryActionOption { + label: string; + value: SummaryActionValue; +} + +export type SummaryActionLabels = { + execute: string; + createIssue?: string; + saveTask: string; + continue: string; +}; + +export const BASE_SUMMARY_ACTIONS: readonly SummaryActionValue[] = [ + 'execute', + 'save_task', + 'continue', +]; + +export interface InteractiveSummaryUIText { + actionPrompt: string; + actions: { + execute: string; + createIssue: string; + saveTask: string; + continue: string; + }; +} + +/** UI labels required by createSelectActionWithoutExecute */ +export interface ActionWithoutExecuteUIText { + proposed: string; + actionPrompt: string; + actions: { + execute: string; + saveTask: string; + continue: string; + }; +} diff --git a/src/features/interactive/interactive-summary.ts b/src/features/interactive/interactive-summary.ts index 68bf5e1..cfa79cb 100644 --- a/src/features/interactive/interactive-summary.ts +++ b/src/features/interactive/interactive-summary.ts @@ -6,23 +6,34 @@ import { loadTemplate } from '../../shared/prompts/index.js'; import { type MovementPreview } from '../../infra/config/index.js'; import { selectOption } from '../../shared/prompt/index.js'; import { blankLine, info } from '../../shared/ui/index.js'; +import { + type TaskHistoryLocale, + type ConversationMessage, + type TaskHistorySummaryItem, + type PieceContext, + type InteractiveModeAction, + type PostSummaryAction, + type SummaryActionValue, + type SummaryActionOption, + type SummaryActionLabels, + BASE_SUMMARY_ACTIONS, + type InteractiveSummaryUIText, + type ActionWithoutExecuteUIText, +} from './interactive-summary-types.js'; -type TaskHistoryLocale = 'en' | 'ja'; - -export interface ConversationMessage { - role: 'user' | 'assistant'; - content: string; -} - -export interface TaskHistorySummaryItem { - worktreeId: string; - status: 'completed' | 'failed' | 'interrupted'; - startedAt: string; - completedAt: string; - finalResult: string; - failureSummary: string | undefined; - logKey: string; -} +export type { + ConversationMessage, + TaskHistorySummaryItem, + PieceContext, + PostSummaryAction, + SummaryActionValue, + SummaryActionOption, + SummaryActionLabels, + InteractiveModeAction, + InteractiveSummaryUIText, + ActionWithoutExecuteUIText, +} from './interactive-summary-types.js'; +export { BASE_SUMMARY_ACTIONS } from './interactive-summary-types.js'; export function formatMovementPreviews(previews: MovementPreview[], lang: TaskHistoryLocale): string { return previews.map((p, i) => { @@ -110,19 +121,6 @@ function buildTaskFromHistory(history: ConversationMessage[]): string { .join('\n\n'); } -export interface PieceContext { - /** Piece name (e.g. "minimal") */ - name: string; - /** Piece description */ - description: string; - /** Piece structure (numbered list of movements) */ - pieceStructure: string; - /** Movement previews (persona + instruction content for first N movements) */ - movementPreviews?: MovementPreview[]; - /** Recent task history for conversation context */ - taskHistory?: TaskHistorySummaryItem[]; -} - export function buildSummaryPrompt( history: ConversationMessage[], hasSession: boolean, @@ -143,9 +141,10 @@ export function buildSummaryPrompt( const hasPiece = !!pieceContext; const hasPreview = !!pieceContext?.movementPreviews?.length; - const summaryMovementDetails = hasPreview - ? `\n### ${lang === 'ja' ? '処理するエージェント' : 'Processing Agents'}\n${formatMovementPreviews(pieceContext!.movementPreviews!, lang)}` - : ''; + const summaryMovementDetails = + hasPreview && pieceContext?.movementPreviews + ? `\n### ${lang === 'ja' ? '処理するエージェント' : 'Processing Agents'}\n${formatMovementPreviews(pieceContext.movementPreviews, lang)}` + : ''; const summaryTaskHistory = pieceContext?.taskHistory?.length ? formatTaskHistorySummary(pieceContext.taskHistory, lang) : ''; @@ -160,40 +159,6 @@ export function buildSummaryPrompt( }); } -export type PostSummaryAction = InteractiveModeAction | 'continue'; - -export type SummaryActionValue = 'execute' | 'create_issue' | 'save_task' | 'continue'; - -export interface SummaryActionOption { - label: string; - value: SummaryActionValue; -} - -export type SummaryActionLabels = { - execute: string; - createIssue?: string; - saveTask: string; - continue: string; -}; - -export const BASE_SUMMARY_ACTIONS: readonly SummaryActionValue[] = [ - 'execute', - 'save_task', - 'continue', -]; - -export type InteractiveModeAction = 'execute' | 'save_task' | 'create_issue' | 'cancel'; - -export interface InteractiveSummaryUIText { - actionPrompt: string; - actions: { - execute: string; - createIssue: string; - saveTask: string; - continue: string; - }; -} - export function buildSummaryActionOptions( labels: SummaryActionLabels, append: readonly SummaryActionValue[] = [], @@ -238,7 +203,7 @@ export function selectSummaryAction( ): Promise { blankLine(); info(proposedLabel); - console.log(task); + info(task); return selectOption(actionPrompt, options); } @@ -276,17 +241,6 @@ export function buildReplayHint(lang: 'en' | 'ja', hasPreviousOrder: boolean): s : ', /replay (resubmit previous order)'; } -/** UI labels required by createSelectActionWithoutExecute */ -export interface ActionWithoutExecuteUIText { - proposed: string; - actionPrompt: string; - actions: { - execute: string; - saveTask: string; - continue: string; - }; -} - /** * Create an action selector that excludes the 'execute' option. * diff --git a/src/features/interactive/retryMode.ts b/src/features/interactive/retryMode.ts index b233376..7ae52b2 100644 --- a/src/features/interactive/retryMode.ts +++ b/src/features/interactive/retryMode.ts @@ -61,11 +61,13 @@ const RETRY_TOOLS = ['Read', 'Glob', 'Grep', 'Bash', 'WebSearch', 'WebFetch']; */ export function buildRetryTemplateVars(ctx: RetryContext, lang: 'en' | 'ja', previousOrderContent: string | null = null): Record { const hasPiecePreview = !!ctx.pieceContext.movementPreviews?.length; - const movementDetails = hasPiecePreview - ? formatMovementPreviews(ctx.pieceContext.movementPreviews!, lang) - : ''; + const movementDetails = + hasPiecePreview && ctx.pieceContext.movementPreviews + ? formatMovementPreviews(ctx.pieceContext.movementPreviews, lang) + : ''; - const hasRun = ctx.run !== null; + const run = ctx.run; + const hasRun = run !== null; return { taskName: ctx.failure.taskName, taskContent: ctx.failure.taskContent, @@ -79,13 +81,13 @@ export function buildRetryTemplateVars(ctx: RetryContext, lang: 'en' | 'ja', pre pieceStructure: ctx.pieceContext.pieceStructure, movementDetails, hasRun, - runLogsDir: hasRun ? ctx.run!.logsDir : '', - runReportsDir: hasRun ? ctx.run!.reportsDir : '', - runTask: hasRun ? ctx.run!.task : '', - runPiece: hasRun ? ctx.run!.piece : '', - runStatus: hasRun ? ctx.run!.status : '', - runMovementLogs: hasRun ? ctx.run!.movementLogs : '', - runReports: hasRun ? ctx.run!.reports : '', + runLogsDir: run !== null ? run.logsDir : '', + runReportsDir: run !== null ? run.reportsDir : '', + runTask: run !== null ? run.task : '', + runPiece: run !== null ? run.piece : '', + runStatus: run !== null ? run.status : '', + runMovementLogs: run !== null ? run.movementLogs : '', + runReports: run !== null ? run.reports : '', hasOrderContent: previousOrderContent !== null, orderContent: previousOrderContent ?? '', }; diff --git a/src/features/repertoire/constants.ts b/src/features/repertoire/constants.ts index e472025..a6bb908 100644 --- a/src/features/repertoire/constants.ts +++ b/src/features/repertoire/constants.ts @@ -2,8 +2,9 @@ * Shared constants for repertoire package manifest handling. */ -/** Directory name for the repertoire packages dir (~/.takt/repertoire). */ -export const REPERTOIRE_DIR_NAME = 'repertoire'; +// REPERTOIRE_DIR_NAME is defined in infra/config/constants to avoid an +// upward infra → features dependency from paths.ts. +export { REPERTOIRE_DIR_NAME } from '../../infra/config/constants.js'; /** Manifest filename inside a package repository and installed package directory. */ export const TAKT_REPERTOIRE_MANIFEST_FILENAME = 'takt-repertoire.yaml'; diff --git a/src/features/tasks/execute/abortHandler.ts b/src/features/tasks/execute/abortHandler.ts new file mode 100644 index 0000000..deb748c --- /dev/null +++ b/src/features/tasks/execute/abortHandler.ts @@ -0,0 +1,91 @@ +/** + * AbortHandler — abortSignal 監視と割り込み処理専用モジュール + * + * 外部 abortSignal(並列実行モード)または ShutdownManager(シングル実行モード)の + * どちらかを使い、エンジンの中断処理を担う。 + * EPIPE エラーの抑制も担当する。 + */ + +import { interruptAllQueries } from '../../../infra/claude/query-manager.js'; +import { ShutdownManager } from './shutdownManager.js'; +import { EXIT_SIGINT } from '../../../shared/exitCodes.js'; +import type { PieceEngine } from '../../../core/piece/engine/PieceEngine.js'; + +export interface AbortHandlerOptions { + /** 外部から渡された AbortSignal(並列実行モード) */ + externalSignal?: AbortSignal; + /** 外部シグナルがない場合に使う内部 AbortController */ + internalController: AbortController; + /** 中断時に呼び出す PieceEngine インスタンス(遅延参照) */ + getEngine: () => PieceEngine | null; +} + +export class AbortHandler { + private readonly options: AbortHandlerOptions; + private shutdownManager: ShutdownManager | undefined; + private onAbortSignal: (() => void) | undefined; + private onEpipe: ((err: NodeJS.ErrnoException) => void) | undefined; + + constructor(options: AbortHandlerOptions) { + this.options = options; + } + + /** + * 中断ハンドラをインストールする。 + * エンジン生成後に呼ぶこと。 + */ + install(): void { + const { externalSignal, internalController, getEngine } = this.options; + + this.onEpipe = (err: NodeJS.ErrnoException) => { + if (err.code === 'EPIPE') return; + throw err; + }; + + const abortEngine = () => { + const engine = getEngine(); + if (!engine || !this.onEpipe) { + throw new Error('Abort handler invoked before PieceEngine initialization'); + } + if (!internalController.signal.aborted) { + internalController.abort(); + } + process.on('uncaughtException', this.onEpipe); + interruptAllQueries(); + engine.abort(); + }; + + if (externalSignal) { + // 並列実行モード: 外部シグナルへ委譲 + this.onAbortSignal = abortEngine; + if (externalSignal.aborted) { + abortEngine(); + } else { + externalSignal.addEventListener('abort', this.onAbortSignal, { once: true }); + } + } else { + // シングル実行モード: SIGINT を自前でハンドリング + this.shutdownManager = new ShutdownManager({ + callbacks: { + onGraceful: abortEngine, + onForceKill: () => process.exit(EXIT_SIGINT), + }, + }); + this.shutdownManager.install(); + } + } + + /** + * ハンドラをクリーンアップする。 + * finally ブロックで必ず呼ぶこと。 + */ + cleanup(): void { + this.shutdownManager?.cleanup(); + if (this.onAbortSignal && this.options.externalSignal) { + this.options.externalSignal.removeEventListener('abort', this.onAbortSignal); + } + if (this.onEpipe) { + process.removeListener('uncaughtException', this.onEpipe); + } + } +} diff --git a/src/features/tasks/execute/analyticsEmitter.ts b/src/features/tasks/execute/analyticsEmitter.ts new file mode 100644 index 0000000..51c862f --- /dev/null +++ b/src/features/tasks/execute/analyticsEmitter.ts @@ -0,0 +1,92 @@ +/** + * AnalyticsEmitter — analytics イベント発行専用モジュール + * + * PieceEngine のイベントを受け取り、analytics イベントへ変換して書き出す責務を担う。 + * NDJSON ログや UI 出力は担当しない。 + */ + +import { readFileSync } from 'node:fs'; +import { + writeAnalyticsEvent, + parseFindingsFromReport, + extractDecisionFromReport, + inferSeverity, + emitFixActionEvents, + emitRebuttalEvents, +} from '../../analytics/index.js'; +import type { MovementResultEvent, ReviewFindingEvent } from '../../analytics/index.js'; +import type { PieceMovement, AgentResponse } from '../../../core/models/index.js'; + +export class AnalyticsEmitter { + private readonly runSlug: string; + private currentIteration = 0; + private currentProvider: string; + private currentModel: string; + + constructor(runSlug: string, initialProvider: string, initialModel: string) { + this.runSlug = runSlug; + this.currentProvider = initialProvider; + this.currentModel = initialModel; + } + + /** movement:start 時にプロバイダ/モデル情報を更新する */ + updateProviderInfo(iteration: number, provider: string, model: string): void { + this.currentIteration = iteration; + this.currentProvider = provider; + this.currentModel = model; + } + + /** movement:complete 時に MovementResultEvent と FixAction/Rebuttal を発行する */ + onMovementComplete(step: PieceMovement, response: AgentResponse): void { + const decisionTag = (response.matchedRuleIndex != null && step.rules) + ? (step.rules[response.matchedRuleIndex]?.condition ?? response.status) + : response.status; + + const movementResultEvent: MovementResultEvent = { + type: 'movement_result', + movement: step.name, + provider: this.currentProvider, + model: this.currentModel, + decisionTag, + iteration: this.currentIteration, + runId: this.runSlug, + timestamp: response.timestamp.toISOString(), + }; + writeAnalyticsEvent(movementResultEvent); + + if (step.edit === true && step.name.includes('fix')) { + emitFixActionEvents(response.content, this.currentIteration, this.runSlug, response.timestamp); + } + + if (step.name.includes('no_fix')) { + emitRebuttalEvents(response.content, this.currentIteration, this.runSlug, response.timestamp); + } + } + + /** movement:report 時に ReviewFindingEvent を発行する */ + onMovementReport(step: PieceMovement, filePath: string): void { + if (step.edit !== false) return; + + const content = readFileSync(filePath, 'utf-8'); + const decision = extractDecisionFromReport(content); + if (!decision) return; + + const findings = parseFindingsFromReport(content); + for (const finding of findings) { + const event: ReviewFindingEvent = { + type: 'review_finding', + findingId: finding.findingId, + status: finding.status, + ruleId: finding.ruleId, + severity: inferSeverity(finding.findingId), + decision, + file: finding.file, + line: finding.line, + iteration: this.currentIteration, + runId: this.runSlug, + timestamp: new Date().toISOString(), + }; + writeAnalyticsEvent(event); + } + } +} diff --git a/src/features/tasks/execute/iterationLimitHandler.ts b/src/features/tasks/execute/iterationLimitHandler.ts new file mode 100644 index 0000000..2a6d747 --- /dev/null +++ b/src/features/tasks/execute/iterationLimitHandler.ts @@ -0,0 +1,64 @@ +/** + * iterationLimitHandler — イテレーション上限到達時、およびユーザー入力のインタラクション処理 + * + * ユーザーに続行/停止を確認し、追加イテレーション数を返す。 + * ユーザー入力待ちハンドラも提供する。 + */ + +import type { IterationLimitRequest, UserInputRequest } from '../../../core/piece/index.js'; +import { playWarningSound } from '../../../shared/utils/index.js'; +import { selectOption, promptInput } from '../../../shared/prompt/index.js'; +import { getLabel } from '../../../shared/i18n/index.js'; +import { enterInputWait, leaveInputWait } from './inputWait.js'; +import type { OutputFns } from './outputFns.js'; +import type { StreamDisplay } from '../../../shared/ui/index.js'; + +export function createIterationLimitHandler( + out: OutputFns, + displayRef: { current: StreamDisplay | null }, + shouldNotify: boolean, +): (request: IterationLimitRequest) => Promise { + return async (request: IterationLimitRequest): Promise => { + if (displayRef.current) { displayRef.current.flush(); displayRef.current = null; } + out.blankLine(); + out.warn(getLabel('piece.iterationLimit.maxReached', undefined, { + currentIteration: String(request.currentIteration), + maxMovements: String(request.maxMovements), + })); + out.info(getLabel('piece.iterationLimit.currentMovement', undefined, { currentMovement: request.currentMovement })); + if (shouldNotify) playWarningSound(); + enterInputWait(); + try { + const action = await selectOption(getLabel('piece.iterationLimit.continueQuestion'), [ + { label: getLabel('piece.iterationLimit.continueLabel'), value: 'continue', description: getLabel('piece.iterationLimit.continueDescription') }, + { label: getLabel('piece.iterationLimit.stopLabel'), value: 'stop' }, + ]); + if (action !== 'continue') return null; + while (true) { + const input = await promptInput(getLabel('piece.iterationLimit.inputPrompt')); + if (!input) return null; + const n = Number.parseInt(input, 10); + if (Number.isInteger(n) && n > 0) return n; + out.warn(getLabel('piece.iterationLimit.invalidInput')); + } + } finally { + leaveInputWait(); + } + }; +} + +/** + * ユーザー入力ハンドラを作成する(interactiveUserInput が有効な場合のみ使用)。 + */ +export function createUserInputHandler( + out: OutputFns, + displayRef: { current: StreamDisplay | null }, +): (request: UserInputRequest) => Promise { + return async (request: UserInputRequest): Promise => { + if (displayRef.current) { displayRef.current.flush(); displayRef.current = null; } + out.blankLine(); + out.info(request.prompt.trim()); + const input = await promptInput(getLabel('piece.iterationLimit.userInputPrompt')); + return input && input.trim() ? input.trim() : null; + }; +} diff --git a/src/features/tasks/execute/outputFns.ts b/src/features/tasks/execute/outputFns.ts new file mode 100644 index 0000000..39208a4 --- /dev/null +++ b/src/features/tasks/execute/outputFns.ts @@ -0,0 +1,76 @@ +/** + * OutputFns — UI 出力ファサード + * + * TaskPrefixWriter が有効なとき(並列実行モード)はそちら経由で出力し、 + * 無効なとき(シングル実行モード)は shared/ui のモジュール関数に委譲する。 + */ + +import { + header as rawHeader, + info as rawInfo, + warn as rawWarn, + error as rawError, + success as rawSuccess, + status as rawStatus, + blankLine as rawBlankLine, + StreamDisplay, +} from '../../../shared/ui/index.js'; +import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js'; + +export interface OutputFns { + header: (title: string) => void; + info: (message: string) => void; + warn: (message: string) => void; + error: (message: string) => void; + success: (message: string) => void; + status: (label: string, value: string, color?: 'green' | 'yellow' | 'red') => void; + blankLine: () => void; + logLine: (text: string) => void; +} + +export function createOutputFns(prefixWriter: TaskPrefixWriter | undefined): OutputFns { + if (!prefixWriter) { + return { + header: rawHeader, + info: rawInfo, + warn: rawWarn, + error: rawError, + success: rawSuccess, + status: rawStatus, + blankLine: rawBlankLine, + logLine: (text: string) => rawInfo(text), + }; + } + return { + header: (title: string) => prefixWriter.writeLine(`=== ${title} ===`), + info: (message: string) => prefixWriter.writeLine(`[INFO] ${message}`), + warn: (message: string) => prefixWriter.writeLine(`[WARN] ${message}`), + error: (message: string) => prefixWriter.writeLine(`[ERROR] ${message}`), + success: (message: string) => prefixWriter.writeLine(message), + status: (label: string, value: string) => prefixWriter.writeLine(`${label}: ${value}`), + blankLine: () => prefixWriter.writeLine(''), + logLine: (text: string) => prefixWriter.writeLine(text), + }; +} + +/** + * TaskPrefixWriter 経由でストリームイベントを行バッファリング出力するハンドラを作成する。 + */ +export function createPrefixedStreamHandler( + writer: TaskPrefixWriter, +): (event: Parameters>[0]) => void { + return (event) => { + switch (event.type) { + case 'text': writer.writeChunk(event.data.text); break; + case 'tool_use': writer.writeLine(`[tool] ${event.data.tool}`); break; + case 'tool_result': { + const label = event.data.isError ? '✗' : '✓'; + writer.writeLine(` ${label} ${event.data.content}`); + break; + } + case 'tool_output': writer.writeChunk(event.data.output); break; + case 'thinking': writer.writeChunk(event.data.thinking); break; + default: break; + } + }; +} diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index b386169..3e0fa6e 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -4,224 +4,52 @@ import { readFileSync } from 'node:fs'; import { join } from 'node:path'; -import { PieceEngine, createDenyAskUserQuestionHandler, type IterationLimitRequest, type UserInputRequest } from '../../../core/piece/index.js'; +import { PieceEngine, createDenyAskUserQuestionHandler } from '../../../core/piece/index.js'; import type { PieceConfig } from '../../../core/models/index.js'; import type { PieceExecutionResult, PieceExecutionOptions } from './types.js'; import { detectRuleIndex } from '../../../shared/utils/ruleIndex.js'; import { interruptAllQueries } from '../../../infra/claude/query-manager.js'; import { callAiJudge } from '../../../agents/ai-judge.js'; -import { enterInputWait, leaveInputWait } from './inputWait.js'; +import { loadPersonaSessions, updatePersonaSession, loadWorktreeSessions, updateWorktreeSession, resolvePieceConfigValues, saveSessionState, type SessionState } from '../../../infra/config/index.js'; +import { isQuietMode } from '../../../shared/context.js'; +import { StreamDisplay } from '../../../shared/ui/index.js'; +import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js'; +import { generateSessionId, createSessionLog, finalizeSessionLog, initNdjsonLog } from '../../../infra/fs/index.js'; +import { createLogger, notifySuccess, notifyError, preventSleep, generateReportDir, isValidReportDirName } from '../../../shared/utils/index.js'; +import { createProviderEventLogger, isProviderEventsEnabled } from '../../../shared/utils/providerEventLogger.js'; +import { getLabel } from '../../../shared/i18n/index.js'; +import { buildRunPaths } from '../../../core/piece/run/run-paths.js'; +import { resolveRuntimeConfig } from '../../../core/runtime/runtime-environment.js'; +import { getGlobalConfigDir } from '../../../infra/config/paths.js'; +import { initAnalyticsWriter } from '../../analytics/index.js'; +import { SessionLogger } from './sessionLogger.js'; +import { AbortHandler } from './abortHandler.js'; +import { AnalyticsEmitter } from './analyticsEmitter.js'; +import { createOutputFns, createPrefixedStreamHandler } from './outputFns.js'; +import { RunMetaManager } from './runMeta.js'; +import { createIterationLimitHandler, createUserInputHandler } from './iterationLimitHandler.js'; export type { PieceExecutionResult, PieceExecutionOptions }; -import { - loadPersonaSessions, - updatePersonaSession, - loadWorktreeSessions, - updateWorktreeSession, - resolvePieceConfigValues, - saveSessionState, - type SessionState, -} from '../../../infra/config/index.js'; -import { isQuietMode } from '../../../shared/context.js'; -import { - header as rawHeader, - info as rawInfo, - warn as rawWarn, - error as rawError, - success as rawSuccess, - status as rawStatus, - blankLine as rawBlankLine, - StreamDisplay, -} from '../../../shared/ui/index.js'; -import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js'; -import { - generateSessionId, - createSessionLog, - finalizeSessionLog, - initNdjsonLog, - appendNdjsonLine, - type NdjsonStepStart, - type NdjsonStepComplete, - type NdjsonPieceComplete, - type NdjsonPieceAbort, - type NdjsonPhaseStart, - type NdjsonPhaseComplete, - type NdjsonInteractiveStart, - type NdjsonInteractiveEnd, -} from '../../../infra/fs/index.js'; -import { - createLogger, - notifySuccess, - notifyError, - preventSleep, - playWarningSound, - isDebugEnabled, - writePromptLog, - generateReportDir, - isValidReportDirName, -} from '../../../shared/utils/index.js'; -import type { PromptLogRecord } from '../../../shared/utils/index.js'; -import { - createProviderEventLogger, - isProviderEventsEnabled, -} from '../../../shared/utils/providerEventLogger.js'; -import { selectOption, promptInput } from '../../../shared/prompt/index.js'; -import { getLabel } from '../../../shared/i18n/index.js'; -import { EXIT_SIGINT } from '../../../shared/exitCodes.js'; -import { ShutdownManager } from './shutdownManager.js'; -import { buildRunPaths } from '../../../core/piece/run/run-paths.js'; -import { resolveRuntimeConfig } from '../../../core/runtime/runtime-environment.js'; -import { writeFileAtomic, ensureDir } from '../../../infra/config/index.js'; -import { getGlobalConfigDir } from '../../../infra/config/paths.js'; -import { - initAnalyticsWriter, - writeAnalyticsEvent, - parseFindingsFromReport, - extractDecisionFromReport, - inferSeverity, - emitFixActionEvents, - emitRebuttalEvents, -} from '../../analytics/index.js'; -import type { MovementResultEvent, ReviewFindingEvent } from '../../analytics/index.js'; - const log = createLogger('piece'); -/** - * Output facade — routes through TaskPrefixWriter when task prefix is active, - * or falls through to the raw module functions for single-task execution. - */ -interface OutputFns { - header: (title: string) => void; - info: (message: string) => void; - warn: (message: string) => void; - error: (message: string) => void; - success: (message: string) => void; - status: (label: string, value: string, color?: 'green' | 'yellow' | 'red') => void; - blankLine: () => void; - logLine: (text: string) => void; -} - -interface RunMeta { - task: string; - piece: string; - runSlug: string; - runRoot: string; - reportDirectory: string; - contextDirectory: string; - logsDirectory: string; - status: 'running' | 'completed' | 'aborted'; - startTime: string; - endTime?: string; - iterations?: number; -} - function assertTaskPrefixPair( taskPrefix: string | undefined, - taskColorIndex: number | undefined + taskColorIndex: number | undefined, ): void { - const hasTaskPrefix = taskPrefix != null; - const hasTaskColorIndex = taskColorIndex != null; - if (hasTaskPrefix !== hasTaskColorIndex) { + if ((taskPrefix != null) !== (taskColorIndex != null)) { throw new Error('taskPrefix and taskColorIndex must be provided together'); } } -function toJudgmentMatchMethod( - matchedRuleMethod: string | undefined, -): string | undefined { - if (!matchedRuleMethod) return undefined; - if (matchedRuleMethod === 'structured_output') return 'structured_output'; - if (matchedRuleMethod === 'ai_judge' || matchedRuleMethod === 'ai_judge_fallback') return 'ai_judge'; - if (matchedRuleMethod === 'phase3_tag' || matchedRuleMethod === 'phase1_tag') return 'tag_fallback'; - return undefined; -} - -function createOutputFns(prefixWriter: TaskPrefixWriter | undefined): OutputFns { - if (!prefixWriter) { - return { - header: rawHeader, - info: rawInfo, - warn: rawWarn, - error: rawError, - success: rawSuccess, - status: rawStatus, - blankLine: rawBlankLine, - logLine: (text: string) => console.log(text), - }; - } - return { - header: (title: string) => prefixWriter.writeLine(`=== ${title} ===`), - info: (message: string) => prefixWriter.writeLine(`[INFO] ${message}`), - warn: (message: string) => prefixWriter.writeLine(`[WARN] ${message}`), - error: (message: string) => prefixWriter.writeLine(`[ERROR] ${message}`), - success: (message: string) => prefixWriter.writeLine(message), - status: (label: string, value: string) => prefixWriter.writeLine(`${label}: ${value}`), - blankLine: () => prefixWriter.writeLine(''), - logLine: (text: string) => prefixWriter.writeLine(text), - }; -} - -/** - * Create a stream handler that routes all stream events through TaskPrefixWriter. - * Text and tool_output are line-buffered; block events are output per-line with prefix. - */ -function createPrefixedStreamHandler( - writer: TaskPrefixWriter, -): (event: Parameters>[0]) => void { - return (event) => { - switch (event.type) { - case 'text': - writer.writeChunk(event.data.text); - break; - case 'tool_use': - writer.writeLine(`[tool] ${event.data.tool}`); - break; - case 'tool_result': { - const label = event.data.isError ? '✗' : '✓'; - writer.writeLine(` ${label} ${event.data.content}`); - break; - } - case 'tool_output': - writer.writeChunk(event.data.output); - break; - case 'thinking': - writer.writeChunk(event.data.thinking); - break; - case 'init': - case 'result': - case 'error': - break; - } - }; -} - -/** - * Truncate string to maximum length - */ function truncate(str: string, maxLength: number): string { - if (str.length <= maxLength) { - return str; - } - return str.slice(0, maxLength) + '...'; + return str.length <= maxLength ? str : str.slice(0, maxLength) + '...'; } -/** - * Format elapsed time in human-readable format - */ function formatElapsedTime(startTime: string, endTime: string): string { - const start = new Date(startTime).getTime(); - const end = new Date(endTime).getTime(); - const elapsedMs = end - start; - const elapsedSec = elapsedMs / 1000; - - if (elapsedSec < 60) { - return `${elapsedSec.toFixed(1)}s`; - } - - const minutes = Math.floor(elapsedSec / 60); - const seconds = Math.floor(elapsedSec % 60); - return `${minutes}m ${seconds}s`; + const elapsedSec = (new Date(endTime).getTime() - new Date(startTime).getTime()) / 1000; + if (elapsedSec < 60) return `${elapsedSec.toFixed(1)}s`; + return `${Math.floor(elapsedSec / 60)}m ${Math.floor(elapsedSec % 60)}s`; } /** @@ -231,122 +59,59 @@ export async function executePiece( pieceConfig: PieceConfig, task: string, cwd: string, - options: PieceExecutionOptions + options: PieceExecutionOptions, ): Promise { - const { - headerPrefix = 'Running Piece:', - interactiveUserInput = false, - } = options; - - // projectCwd is where .takt/ lives (project root, not the clone) + const { headerPrefix = 'Running Piece:', interactiveUserInput = false } = options; const projectCwd = options.projectCwd; assertTaskPrefixPair(options.taskPrefix, options.taskColorIndex); - // When taskPrefix is set (parallel execution), route all output through TaskPrefixWriter const prefixWriter = options.taskPrefix != null - ? new TaskPrefixWriter({ - taskName: options.taskPrefix, - colorIndex: options.taskColorIndex!, - displayLabel: options.taskDisplayLabel, - }) + ? new TaskPrefixWriter({ taskName: options.taskPrefix, colorIndex: options.taskColorIndex!, displayLabel: options.taskDisplayLabel }) : undefined; const out = createOutputFns(prefixWriter); - // Retry reuses saved sessions; normal runs start fresh const isRetry = Boolean(options.startMovement || options.retryNote); log.debug('Session mode', { isRetry, isWorktree: cwd !== projectCwd }); - out.header(`${headerPrefix} ${pieceConfig.name}`); const pieceSessionId = generateSessionId(); const runSlug = options.reportDirName ?? generateReportDir(task); - if (!isValidReportDirName(runSlug)) { - throw new Error(`Invalid reportDirName: ${runSlug}`); - } - const runPaths = buildRunPaths(cwd, runSlug); + if (!isValidReportDirName(runSlug)) throw new Error(`Invalid reportDirName: ${runSlug}`); - const runMeta: RunMeta = { - task, - piece: pieceConfig.name, - runSlug: runPaths.slug, - runRoot: runPaths.runRootRel, - reportDirectory: runPaths.reportsRel, - contextDirectory: runPaths.contextRel, - logsDirectory: runPaths.logsRel, - status: 'running', - startTime: new Date().toISOString(), - }; - ensureDir(runPaths.runRootAbs); - writeFileAtomic(runPaths.metaAbs, JSON.stringify(runMeta, null, 2)); - let isMetaFinalized = false; - const finalizeRunMeta = (status: 'completed' | 'aborted', iterations?: number): void => { - writeFileAtomic(runPaths.metaAbs, JSON.stringify({ - ...runMeta, - status, - endTime: new Date().toISOString(), - ...(iterations != null ? { iterations } : {}), - } satisfies RunMeta, null, 2)); - isMetaFinalized = true; - }; + const runPaths = buildRunPaths(cwd, runSlug); + const runMetaManager = new RunMetaManager(runPaths, task, pieceConfig.name); let sessionLog = createSessionLog(task, projectCwd, pieceConfig.name); + const ndjsonLogPath = initNdjsonLog(pieceSessionId, task, pieceConfig.name, { logsDir: runPaths.logsAbs }); + const sessionLogger = new SessionLogger(ndjsonLogPath); - // Initialize NDJSON log file at run-scoped logs directory - const ndjsonLogPath = initNdjsonLog(pieceSessionId, task, pieceConfig.name, { - logsDir: runPaths.logsAbs, - }); - - // Write interactive mode records if interactive mode was used before this piece if (options.interactiveMetadata) { - const startRecord: NdjsonInteractiveStart = { - type: 'interactive_start', - timestamp: new Date().toISOString(), - }; - appendNdjsonLine(ndjsonLogPath, startRecord); - - const endRecord: NdjsonInteractiveEnd = { - type: 'interactive_end', - confirmed: options.interactiveMetadata.confirmed, - ...(options.interactiveMetadata.task ? { task: options.interactiveMetadata.task } : {}), - timestamp: new Date().toISOString(), - }; - appendNdjsonLine(ndjsonLogPath, endRecord); + sessionLogger.writeInteractiveMetadata(options.interactiveMetadata); } - // Track current display for streaming const displayRef: { current: StreamDisplay | null } = { current: null }; - - // Create stream handler — when prefixWriter is active, use it for line-buffered - // output to prevent mid-line interleaving between concurrent tasks. - // When not in parallel mode, delegate to StreamDisplay as before. const streamHandler = prefixWriter ? createPrefixedStreamHandler(prefixWriter) : (event: Parameters>[0]): void => { - if (!displayRef.current) return; - if (event.type === 'result') return; + if (!displayRef.current || event.type === 'result') return; displayRef.current.createHandler()(event); }; - // Load saved agent sessions only on retry; normal runs start with empty sessions const isWorktree = cwd !== projectCwd; const globalConfig = resolvePieceConfigValues( projectCwd, ['notificationSound', 'notificationSoundEvents', 'provider', 'runtime', 'preventSleep', 'model', 'observability', 'analytics'], ); const shouldNotify = globalConfig.notificationSound !== false; - const notificationSoundEvents = globalConfig.notificationSoundEvents; - const shouldNotifyIterationLimit = shouldNotify && notificationSoundEvents?.iterationLimit !== false; - const shouldNotifyPieceComplete = shouldNotify && notificationSoundEvents?.pieceComplete !== false; - const shouldNotifyPieceAbort = shouldNotify && notificationSoundEvents?.pieceAbort !== false; + const nse = globalConfig.notificationSoundEvents; + const shouldNotifyIterationLimit = shouldNotify && nse?.iterationLimit !== false; + const shouldNotifyPieceComplete = shouldNotify && nse?.pieceComplete !== false; + const shouldNotifyPieceAbort = shouldNotify && nse?.pieceAbort !== false; const currentProvider = globalConfig.provider; + if (!currentProvider) throw new Error('No provider configured. Set "provider" in ~/.takt/config.yaml'); const configuredModel = options.model ?? globalConfig.model; - if (!currentProvider) { - throw new Error('No provider configured. Set "provider" in ~/.takt/config.yaml'); - } - const effectivePieceConfig: PieceConfig = { - ...pieceConfig, - runtime: resolveRuntimeConfig(globalConfig.runtime, pieceConfig.runtime), - }; + + const effectivePieceConfig: PieceConfig = { ...pieceConfig, runtime: resolveRuntimeConfig(globalConfig.runtime, pieceConfig.runtime) }; const providerEventLogger = createProviderEventLogger({ logsDir: runPaths.logsAbs, sessionId: pieceSessionId, @@ -356,112 +121,28 @@ export async function executePiece( enabled: isProviderEventsEnabled(globalConfig), }); - const analyticsEnabled = globalConfig.analytics?.enabled === true; - const eventsDir = globalConfig.analytics?.eventsPath - ?? join(getGlobalConfigDir(), 'analytics', 'events'); - initAnalyticsWriter(analyticsEnabled, eventsDir); + initAnalyticsWriter(globalConfig.analytics?.enabled === true, globalConfig.analytics?.eventsPath ?? join(getGlobalConfigDir(), 'analytics', 'events')); + if (globalConfig.preventSleep) preventSleep(); - // Prevent macOS idle sleep if configured - if (globalConfig.preventSleep) { - preventSleep(); - } + const analyticsEmitter = new AnalyticsEmitter(runSlug, currentProvider, configuredModel ?? '(default)'); const savedSessions = isRetry - ? (isWorktree - ? loadWorktreeSessions(projectCwd, cwd, currentProvider) - : loadPersonaSessions(projectCwd, currentProvider)) + ? (isWorktree ? loadWorktreeSessions(projectCwd, cwd, currentProvider) : loadPersonaSessions(projectCwd, currentProvider)) : {}; - - // Session update handler - persist session IDs when they change - // Clone sessions are stored separately per clone path const sessionUpdateHandler = isWorktree - ? (personaName: string, personaSessionId: string): void => { - updateWorktreeSession(projectCwd, cwd, personaName, personaSessionId, currentProvider); - } - : (persona: string, personaSessionId: string): void => { - updatePersonaSession(projectCwd, persona, personaSessionId, currentProvider); - }; + ? (personaName: string, personaSessionId: string) => updateWorktreeSession(projectCwd, cwd, personaName, personaSessionId, currentProvider) + : (persona: string, personaSessionId: string) => updatePersonaSession(projectCwd, persona, personaSessionId, currentProvider); - const iterationLimitHandler = async ( - request: IterationLimitRequest - ): Promise => { - if (displayRef.current) { - displayRef.current.flush(); - displayRef.current = null; - } - - out.blankLine(); - out.warn( - getLabel('piece.iterationLimit.maxReached', undefined, { - currentIteration: String(request.currentIteration), - maxMovements: String(request.maxMovements), - }) - ); - out.info(getLabel('piece.iterationLimit.currentMovement', undefined, { currentMovement: request.currentMovement })); - - if (shouldNotifyIterationLimit) { - playWarningSound(); - } - - enterInputWait(); - try { - const action = await selectOption(getLabel('piece.iterationLimit.continueQuestion'), [ - { - label: getLabel('piece.iterationLimit.continueLabel'), - value: 'continue', - description: getLabel('piece.iterationLimit.continueDescription'), - }, - { label: getLabel('piece.iterationLimit.stopLabel'), value: 'stop' }, - ]); - - if (action !== 'continue') { - return null; - } - - while (true) { - const input = await promptInput(getLabel('piece.iterationLimit.inputPrompt')); - if (!input) { - return null; - } - - const additionalIterations = Number.parseInt(input, 10); - if (Number.isInteger(additionalIterations) && additionalIterations > 0) { - pieceConfig.maxMovements = request.maxMovements + additionalIterations; - return additionalIterations; - } - - out.warn(getLabel('piece.iterationLimit.invalidInput')); - } - } finally { - leaveInputWait(); - } - }; - - const onUserInput = interactiveUserInput - ? async (request: UserInputRequest): Promise => { - if (displayRef.current) { - displayRef.current.flush(); - displayRef.current = null; - } - out.blankLine(); - out.info(request.prompt.trim()); - const input = await promptInput(getLabel('piece.iterationLimit.userInputPrompt')); - return input && input.trim() ? input.trim() : null; - } - : undefined; + const iterationLimitHandler = createIterationLimitHandler(out, displayRef, shouldNotifyIterationLimit); + const onUserInput = interactiveUserInput ? createUserInputHandler(out, displayRef) : undefined; let abortReason: string | undefined; let lastMovementContent: string | undefined; let lastMovementName: string | undefined; let currentIteration = 0; - let currentMovementProvider = currentProvider; - let currentMovementModel = configuredModel ?? '(default)'; - const phasePrompts = new Map(); const movementIterations = new Map(); let engine: PieceEngine | null = null; - let onAbortSignal: (() => void) | undefined; - let shutdownManager: ShutdownManager | undefined; - let onEpipe: ((err: NodeJS.ErrnoException) => void) | undefined; const runAbortController = new AbortController(); + const abortHandler = new AbortHandler({ externalSignal: options.abortSignal, internalController: runAbortController, getEngine: () => engine }); try { engine = new PieceEngine(effectivePieceConfig, cwd, task, { @@ -490,377 +171,112 @@ export async function executePiece( taskColorIndex: options.taskColorIndex, }); - engine.on('phase:start', (step, phase, phaseName, instruction) => { - log.debug('Phase starting', { step: step.name, phase, phaseName }); - const record: NdjsonPhaseStart = { - type: 'phase_start', - step: step.name, - phase, - phaseName, - timestamp: new Date().toISOString(), - ...(instruction ? { instruction } : {}), - }; - appendNdjsonLine(ndjsonLogPath, record); + abortHandler.install(); - if (isDebugEnabled()) { - phasePrompts.set(`${step.name}:${phase}`, instruction); - } - }); + engine.on('phase:start', (step, phase, phaseName, instruction) => { + log.debug('Phase starting', { step: step.name, phase, phaseName }); + sessionLogger.onPhaseStart(step, phase, phaseName, instruction); + }); engine.on('phase:complete', (step, phase, phaseName, content, phaseStatus, phaseError) => { - log.debug('Phase completed', { step: step.name, phase, phaseName, status: phaseStatus }); - const record: NdjsonPhaseComplete = { - type: 'phase_complete', - step: step.name, - phase, - phaseName, - status: phaseStatus, - content, - timestamp: new Date().toISOString(), - ...(phaseError ? { error: phaseError } : {}), - }; - appendNdjsonLine(ndjsonLogPath, record); - - const promptKey = `${step.name}:${phase}`; - const prompt = phasePrompts.get(promptKey); - phasePrompts.delete(promptKey); - - if (isDebugEnabled()) { - if (prompt) { - const promptRecord: PromptLogRecord = { - movement: step.name, - phase, - iteration: currentIteration, - prompt, - response: content, - timestamp: new Date().toISOString(), - }; - writePromptLog(promptRecord); - } - } - }); + log.debug('Phase completed', { step: step.name, phase, phaseName, status: phaseStatus }); + sessionLogger.setIteration(currentIteration); + sessionLogger.onPhaseComplete(step, phase, phaseName, content, phaseStatus, phaseError); + }); engine.on('movement:start', (step, iteration, instruction, providerInfo) => { - log.debug('Movement starting', { step: step.name, persona: step.personaDisplayName, iteration }); - currentIteration = iteration; - const movementIteration = (movementIterations.get(step.name) ?? 0) + 1; - movementIterations.set(step.name, movementIteration); - prefixWriter?.setMovementContext({ - movementName: step.name, - iteration, - maxMovements: pieceConfig.maxMovements, - movementIteration, + log.debug('Movement starting', { step: step.name, persona: step.personaDisplayName, iteration }); + currentIteration = iteration; + const movementIteration = (movementIterations.get(step.name) ?? 0) + 1; + movementIterations.set(step.name, movementIteration); + prefixWriter?.setMovementContext({ movementName: step.name, iteration, maxMovements: pieceConfig.maxMovements, movementIteration }); + out.info(`[${iteration}/${pieceConfig.maxMovements}] ${step.name} (${step.personaDisplayName})`); + const movementProvider = providerInfo.provider ?? currentProvider; + const movementModel = providerInfo.model ?? (movementProvider === currentProvider ? configuredModel : undefined) ?? '(default)'; + providerEventLogger.setMovement(step.name); + providerEventLogger.setProvider(movementProvider); + out.info(`Provider: ${movementProvider}`); + out.info(`Model: ${movementModel}`); + if (instruction) log.debug('Step instruction', instruction); + analyticsEmitter.updateProviderInfo(iteration, movementProvider, movementModel); + if (!prefixWriter) { + const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name); + displayRef.current = new StreamDisplay(step.personaDisplayName, isQuietMode(), { + iteration, + maxMovements: pieceConfig.maxMovements, + movementIndex: movementIndex >= 0 ? movementIndex : 0, + totalMovements: pieceConfig.movements.length, + }); + } + sessionLogger.onMovementStart(step, iteration, instruction); }); - out.info(`[${iteration}/${pieceConfig.maxMovements}] ${step.name} (${step.personaDisplayName})`); - const movementProvider = providerInfo.provider ?? currentProvider; - const movementModel = providerInfo.model - ?? (movementProvider === currentProvider ? configuredModel : undefined) - ?? '(default)'; - currentMovementProvider = movementProvider; - currentMovementModel = movementModel; - providerEventLogger.setMovement(step.name); - providerEventLogger.setProvider(movementProvider); - out.info(`Provider: ${movementProvider}`); - out.info(`Model: ${movementModel}`); - - // Log prompt content for debugging - if (instruction) { - log.debug('Step instruction', instruction); - } - - // Find movement index for progress display - const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name); - const totalMovements = pieceConfig.movements.length; - - // In parallel mode, StreamDisplay is not used (prefixWriter handles output). - // In single mode, StreamDisplay renders stream events directly. - if (!prefixWriter) { - const quiet = isQuietMode(); - const agentLabel = step.personaDisplayName; - displayRef.current = new StreamDisplay(agentLabel, quiet, { - iteration, - maxMovements: pieceConfig.maxMovements, - movementIndex: movementIndex >= 0 ? movementIndex : 0, - totalMovements, - }); - } - - // Write step_start record to NDJSON log - const record: NdjsonStepStart = { - type: 'step_start', - step: step.name, - persona: step.personaDisplayName, - iteration, - timestamp: new Date().toISOString(), - ...(instruction ? { instruction } : {}), - }; - appendNdjsonLine(ndjsonLogPath, record); - - }); engine.on('movement:complete', (step, response, instruction) => { - log.debug('Movement completed', { - step: step.name, - status: response.status, - matchedRuleIndex: response.matchedRuleIndex, - matchedRuleMethod: response.matchedRuleMethod, - contentLength: response.content.length, - sessionId: response.sessionId, - error: response.error, - }); - - // Capture last movement output for session state - lastMovementContent = response.content; - lastMovementName = step.name; - - if (displayRef.current) { - displayRef.current.flush(); - displayRef.current = null; - } - prefixWriter?.flush(); - out.blankLine(); - - if (response.matchedRuleIndex != null && step.rules) { - const rule = step.rules[response.matchedRuleIndex]; - if (rule) { + log.debug('Movement completed', { step: step.name, status: response.status, matchedRuleIndex: response.matchedRuleIndex, matchedRuleMethod: response.matchedRuleMethod, contentLength: response.content.length, sessionId: response.sessionId, error: response.error }); + lastMovementContent = response.content; + lastMovementName = step.name; + if (displayRef.current) { displayRef.current.flush(); displayRef.current = null; } + prefixWriter?.flush(); + out.blankLine(); + if (response.matchedRuleIndex != null && step.rules) { + const rule = step.rules[response.matchedRuleIndex]; const methodLabel = response.matchedRuleMethod ? ` (${response.matchedRuleMethod})` : ''; - out.status('Status', `${rule.condition}${methodLabel}`); + out.status('Status', rule ? `${rule.condition}${methodLabel}` : response.status); } else { out.status('Status', response.status); } - } else { - out.status('Status', response.status); - } - - if (response.error) { - out.error(`Error: ${response.error}`); - } - if (response.sessionId) { - out.status('Session', response.sessionId); - } - - // Write step_complete record to NDJSON log - const matchMethod = toJudgmentMatchMethod(response.matchedRuleMethod); - const record: NdjsonStepComplete = { - type: 'step_complete', - step: step.name, - persona: response.persona, - status: response.status, - content: response.content, - instruction, - ...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}), - ...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}), - ...(matchMethod ? { matchMethod } : {}), - ...(response.error ? { error: response.error } : {}), - timestamp: response.timestamp.toISOString(), - }; - appendNdjsonLine(ndjsonLogPath, record); - - const decisionTag = (response.matchedRuleIndex != null && step.rules) - ? (step.rules[response.matchedRuleIndex]?.condition ?? response.status) - : response.status; - const movementResultEvent: MovementResultEvent = { - type: 'movement_result', - movement: step.name, - provider: currentMovementProvider, - model: currentMovementModel, - decisionTag, - iteration: currentIteration, - runId: runSlug, - timestamp: response.timestamp.toISOString(), - }; - writeAnalyticsEvent(movementResultEvent); - - if (step.edit === true && step.name.includes('fix')) { - emitFixActionEvents(response.content, currentIteration, runSlug, response.timestamp); - } - - if (step.name.includes('no_fix')) { - emitRebuttalEvents(response.content, currentIteration, runSlug, response.timestamp); - } - - // Update in-memory log for pointer metadata (immutable) - sessionLog = { ...sessionLog, iterations: sessionLog.iterations + 1 }; - }); + if (response.error) out.error(`Error: ${response.error}`); + if (response.sessionId) out.status('Session', response.sessionId); + sessionLogger.onMovementComplete(step, response, instruction); + analyticsEmitter.onMovementComplete(step, response); + sessionLog = { ...sessionLog, iterations: sessionLog.iterations + 1 }; + }); engine.on('movement:report', (step, filePath, fileName) => { - const content = readFileSync(filePath, 'utf-8'); - out.logLine(`\n📄 Report: ${fileName}\n`); - out.logLine(content); - - if (step.edit === false) { - const decision = extractDecisionFromReport(content); - if (decision) { - const findings = parseFindingsFromReport(content); - for (const finding of findings) { - const event: ReviewFindingEvent = { - type: 'review_finding', - findingId: finding.findingId, - status: finding.status, - ruleId: finding.ruleId, - severity: inferSeverity(finding.findingId), - decision, - file: finding.file, - line: finding.line, - iteration: currentIteration, - runId: runSlug, - timestamp: new Date().toISOString(), - }; - writeAnalyticsEvent(event); - } - } - } - }); + out.logLine(`\n📄 Report: ${fileName}\n`); + out.logLine(readFileSync(filePath, 'utf-8')); + analyticsEmitter.onMovementReport(step, filePath); + }); engine.on('piece:complete', (state) => { - log.info('Piece completed successfully', { iterations: state.iteration }); - sessionLog = finalizeSessionLog(sessionLog, 'completed'); - - // Write piece_complete record to NDJSON log - const record: NdjsonPieceComplete = { - type: 'piece_complete', - iterations: state.iteration, - endTime: new Date().toISOString(), - }; - appendNdjsonLine(ndjsonLogPath, record); - finalizeRunMeta('completed', state.iteration); - - // Save session state for next interactive mode - try { - const sessionState: SessionState = { - status: 'success', - taskResult: truncate(lastMovementContent ?? '', 1000), - timestamp: new Date().toISOString(), - pieceName: pieceConfig.name, - taskContent: truncate(task, 200), - lastMovement: lastMovementName, - }; - saveSessionState(projectCwd, sessionState); - } catch (error) { - log.error('Failed to save session state', { error }); - } - - const elapsed = sessionLog.endTime - ? formatElapsedTime(sessionLog.startTime, sessionLog.endTime) - : ''; - const elapsedDisplay = elapsed ? `, ${elapsed}` : ''; - - out.success(`Piece completed (${state.iteration} iterations${elapsedDisplay})`); - out.info(`Session log: ${ndjsonLogPath}`); - if (shouldNotifyPieceComplete) { - notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) })); - } - }); + log.info('Piece completed successfully', { iterations: state.iteration }); + sessionLog = finalizeSessionLog(sessionLog, 'completed'); + sessionLogger.onPieceComplete(state); + runMetaManager.finalize('completed', state.iteration); + try { + saveSessionState(projectCwd, { status: 'success', taskResult: truncate(lastMovementContent ?? '', 1000), timestamp: new Date().toISOString(), pieceName: pieceConfig.name, taskContent: truncate(task, 200), lastMovement: lastMovementName } satisfies SessionState); + } catch (error) { log.error('Failed to save session state', { error }); } + const elapsed = sessionLog.endTime ? formatElapsedTime(sessionLog.startTime, sessionLog.endTime) : ''; + out.success(`Piece completed (${state.iteration} iterations${elapsed ? `, ${elapsed}` : ''})`); + out.info(`Session log: ${ndjsonLogPath}`); + if (shouldNotifyPieceComplete) notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) })); + }); engine.on('piece:abort', (state, reason) => { - interruptAllQueries(); - log.error('Piece aborted', { reason, iterations: state.iteration }); - if (displayRef.current) { - displayRef.current.flush(); - displayRef.current = null; - } - prefixWriter?.flush(); - abortReason = reason; - sessionLog = finalizeSessionLog(sessionLog, 'aborted'); - - // Write piece_abort record to NDJSON log - const record: NdjsonPieceAbort = { - type: 'piece_abort', - iterations: state.iteration, - reason, - endTime: new Date().toISOString(), - }; - appendNdjsonLine(ndjsonLogPath, record); - finalizeRunMeta('aborted', state.iteration); - - // Save session state for next interactive mode - try { - const sessionState: SessionState = { - status: reason === 'user_interrupted' ? 'user_stopped' : 'error', - errorMessage: reason, - timestamp: new Date().toISOString(), - pieceName: pieceConfig.name, - taskContent: truncate(task, 200), - lastMovement: lastMovementName, - }; - saveSessionState(projectCwd, sessionState); - } catch (error) { - log.error('Failed to save session state', { error }); - } - - const elapsed = sessionLog.endTime - ? formatElapsedTime(sessionLog.startTime, sessionLog.endTime) - : ''; - const elapsedDisplay = elapsed ? ` (${elapsed})` : ''; - - out.error(`Piece aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`); - out.info(`Session log: ${ndjsonLogPath}`); - if (shouldNotifyPieceAbort) { - notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason })); - } - }); - - // Suppress EPIPE errors from SDK child process stdin after interrupt. - // When interruptAllQueries() kills the child process, the SDK may still - // try to write to the dead process's stdin pipe, causing an unhandled - // EPIPE error on the Socket. This handler catches it gracefully. - onEpipe = (err: NodeJS.ErrnoException) => { - if (err.code === 'EPIPE') return; - throw err; - }; - - const abortEngine = () => { - if (!engine || !onEpipe) { - throw new Error('Abort handler invoked before PieceEngine initialization'); - } - if (!runAbortController.signal.aborted) { - runAbortController.abort(); - } - process.on('uncaughtException', onEpipe); interruptAllQueries(); - engine.abort(); - }; - - // SIGINT handling: when abortSignal is provided (parallel mode), delegate to caller - const useExternalAbort = Boolean(options.abortSignal); - if (useExternalAbort) { - onAbortSignal = abortEngine; - if (options.abortSignal!.aborted) { - abortEngine(); - } else { - options.abortSignal!.addEventListener('abort', onAbortSignal, { once: true }); - } - } else { - shutdownManager = new ShutdownManager({ - callbacks: { - onGraceful: abortEngine, - onForceKill: () => process.exit(EXIT_SIGINT), - }, - }); - shutdownManager.install(); - } + log.error('Piece aborted', { reason, iterations: state.iteration }); + if (displayRef.current) { displayRef.current.flush(); displayRef.current = null; } + prefixWriter?.flush(); + abortReason = reason; + sessionLog = finalizeSessionLog(sessionLog, 'aborted'); + sessionLogger.onPieceAbort(state, reason); + runMetaManager.finalize('aborted', state.iteration); + try { + saveSessionState(projectCwd, { status: reason === 'user_interrupted' ? 'user_stopped' : 'error', errorMessage: reason, timestamp: new Date().toISOString(), pieceName: pieceConfig.name, taskContent: truncate(task, 200), lastMovement: lastMovementName } satisfies SessionState); + } catch (error) { log.error('Failed to save session state', { error }); } + const elapsed = sessionLog.endTime ? formatElapsedTime(sessionLog.startTime, sessionLog.endTime) : ''; + out.error(`Piece aborted after ${state.iteration} iterations${elapsed ? ` (${elapsed})` : ''}: ${reason}`); + out.info(`Session log: ${ndjsonLogPath}`); + if (shouldNotifyPieceAbort) notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason })); + }); const finalState = await engine.run(); - - return { - success: finalState.status === 'completed', - reason: abortReason, - lastMovement: lastMovementName, - lastMessage: lastMovementContent, - }; + return { success: finalState.status === 'completed', reason: abortReason, lastMovement: lastMovementName, lastMessage: lastMovementContent }; } catch (error) { - if (!isMetaFinalized) { - finalizeRunMeta('aborted'); - } + if (!runMetaManager.isFinalized) runMetaManager.finalize('aborted'); throw error; } finally { prefixWriter?.flush(); - shutdownManager?.cleanup(); - if (onAbortSignal && options.abortSignal) { - options.abortSignal.removeEventListener('abort', onAbortSignal); - } - if (onEpipe) { - process.removeListener('uncaughtException', onEpipe); - } + abortHandler.cleanup(); } } diff --git a/src/features/tasks/execute/runMeta.ts b/src/features/tasks/execute/runMeta.ts new file mode 100644 index 0000000..a53a073 --- /dev/null +++ b/src/features/tasks/execute/runMeta.ts @@ -0,0 +1,60 @@ +/** + * RunMeta — 実行メタデータの管理モジュール + * + * ランのメタデータ(task, piece, status, 開始・終了時刻など)を + * .takt/runs/{slug}/meta.json へ書き出す責務を担う。 + */ + +import { writeFileAtomic, ensureDir } from '../../../infra/config/index.js'; +import type { RunPaths } from '../../../core/piece/run/run-paths.js'; + +interface RunMeta { + task: string; + piece: string; + runSlug: string; + runRoot: string; + reportDirectory: string; + contextDirectory: string; + logsDirectory: string; + status: 'running' | 'completed' | 'aborted'; + startTime: string; + endTime?: string; + iterations?: number; +} + +export class RunMetaManager { + private readonly runMeta: RunMeta; + private readonly metaAbs: string; + private finalized = false; + + constructor(runPaths: RunPaths, task: string, pieceName: string) { + this.metaAbs = runPaths.metaAbs; + this.runMeta = { + task, + piece: pieceName, + runSlug: runPaths.slug, + runRoot: runPaths.runRootRel, + reportDirectory: runPaths.reportsRel, + contextDirectory: runPaths.contextRel, + logsDirectory: runPaths.logsRel, + status: 'running', + startTime: new Date().toISOString(), + }; + ensureDir(runPaths.runRootAbs); + writeFileAtomic(this.metaAbs, JSON.stringify(this.runMeta, null, 2)); + } + + finalize(status: 'completed' | 'aborted', iterations?: number): void { + writeFileAtomic(this.metaAbs, JSON.stringify({ + ...this.runMeta, + status, + endTime: new Date().toISOString(), + ...(iterations != null ? { iterations } : {}), + } satisfies RunMeta, null, 2)); + this.finalized = true; + } + + get isFinalized(): boolean { + return this.finalized; + } +} diff --git a/src/features/tasks/execute/sessionLogger.ts b/src/features/tasks/execute/sessionLogger.ts new file mode 100644 index 0000000..3a8666d --- /dev/null +++ b/src/features/tasks/execute/sessionLogger.ts @@ -0,0 +1,178 @@ +/** + * Session logger — NDJSON ログ書き出し専用モジュール + * + * PieceEngine のイベントを受け取り、NDJSON セッションログへ追記する責務を担う。 + * analytics や UI 出力は担当しない。 + */ + +import { + appendNdjsonLine, + type NdjsonStepStart, + type NdjsonStepComplete, + type NdjsonPieceComplete, + type NdjsonPieceAbort, + type NdjsonPhaseStart, + type NdjsonPhaseComplete, + type NdjsonInteractiveStart, + type NdjsonInteractiveEnd, +} from '../../../infra/fs/index.js'; +import type { InteractiveMetadata } from './types.js'; +import { isDebugEnabled, writePromptLog } from '../../../shared/utils/index.js'; +import type { PromptLogRecord } from '../../../shared/utils/index.js'; +import type { PieceMovement, AgentResponse, PieceState } from '../../../core/models/index.js'; +import type { PhaseName } from '../../../core/piece/index.js'; + +function toJudgmentMatchMethod( + matchedRuleMethod: string | undefined, +): string | undefined { + if (!matchedRuleMethod) return undefined; + if (matchedRuleMethod === 'structured_output') return 'structured_output'; + if (matchedRuleMethod === 'ai_judge' || matchedRuleMethod === 'ai_judge_fallback') return 'ai_judge'; + if (matchedRuleMethod === 'phase3_tag' || matchedRuleMethod === 'phase1_tag') return 'tag_fallback'; + return undefined; +} + +export class SessionLogger { + private readonly ndjsonLogPath: string; + /** phase 開始時のプロンプトを一時保持(デバッグ用) */ + private readonly phasePrompts = new Map(); + /** 現在のピース全体のイテレーション数 */ + private currentIteration = 0; + + constructor(ndjsonLogPath: string) { + this.ndjsonLogPath = ndjsonLogPath; + } + + /** インタラクティブモードのメタデータ(interactive_start / interactive_end)を NDJSON へ記録する */ + writeInteractiveMetadata(meta: InteractiveMetadata): void { + const startRecord: NdjsonInteractiveStart = { type: 'interactive_start', timestamp: new Date().toISOString() }; + appendNdjsonLine(this.ndjsonLogPath, startRecord); + const endRecord: NdjsonInteractiveEnd = { + type: 'interactive_end', + confirmed: meta.confirmed, + ...(meta.task ? { task: meta.task } : {}), + timestamp: new Date().toISOString(), + }; + appendNdjsonLine(this.ndjsonLogPath, endRecord); + } + + /** 現在のイテレーション番号を更新する(movement:start で呼ぶ) */ + setIteration(iteration: number): void { + this.currentIteration = iteration; + } + + onPhaseStart( + step: PieceMovement, + phase: 1 | 2 | 3, + phaseName: PhaseName, + instruction: string, + ): void { + const record: NdjsonPhaseStart = { + type: 'phase_start', + step: step.name, + phase, + phaseName, + timestamp: new Date().toISOString(), + ...(instruction ? { instruction } : {}), + }; + appendNdjsonLine(this.ndjsonLogPath, record); + + if (isDebugEnabled()) { + this.phasePrompts.set(`${step.name}:${phase}`, instruction); + } + } + + onPhaseComplete( + step: PieceMovement, + phase: 1 | 2 | 3, + phaseName: PhaseName, + content: string, + phaseStatus: string, + phaseError: string | undefined, + ): void { + const record: NdjsonPhaseComplete = { + type: 'phase_complete', + step: step.name, + phase, + phaseName, + status: phaseStatus, + content, + timestamp: new Date().toISOString(), + ...(phaseError ? { error: phaseError } : {}), + }; + appendNdjsonLine(this.ndjsonLogPath, record); + + const promptKey = `${step.name}:${phase}`; + const prompt = this.phasePrompts.get(promptKey); + this.phasePrompts.delete(promptKey); + + if (isDebugEnabled() && prompt) { + const promptRecord: PromptLogRecord = { + movement: step.name, + phase, + iteration: this.currentIteration, + prompt, + response: content, + timestamp: new Date().toISOString(), + }; + writePromptLog(promptRecord); + } + } + + onMovementStart( + step: PieceMovement, + iteration: number, + instruction: string | undefined, + ): void { + const record: NdjsonStepStart = { + type: 'step_start', + step: step.name, + persona: step.personaDisplayName, + iteration, + timestamp: new Date().toISOString(), + ...(instruction ? { instruction } : {}), + }; + appendNdjsonLine(this.ndjsonLogPath, record); + } + + onMovementComplete( + step: PieceMovement, + response: AgentResponse, + instruction: string, + ): void { + const matchMethod = toJudgmentMatchMethod(response.matchedRuleMethod); + const record: NdjsonStepComplete = { + type: 'step_complete', + step: step.name, + persona: response.persona, + status: response.status, + content: response.content, + instruction, + ...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}), + ...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}), + ...(matchMethod ? { matchMethod } : {}), + ...(response.error ? { error: response.error } : {}), + timestamp: response.timestamp.toISOString(), + }; + appendNdjsonLine(this.ndjsonLogPath, record); + } + + onPieceComplete(state: PieceState): void { + const record: NdjsonPieceComplete = { + type: 'piece_complete', + iterations: state.iteration, + endTime: new Date().toISOString(), + }; + appendNdjsonLine(this.ndjsonLogPath, record); + } + + onPieceAbort(state: PieceState, reason: string): void { + const record: NdjsonPieceAbort = { + type: 'piece_abort', + iterations: state.iteration, + reason, + endTime: new Date().toISOString(), + }; + appendNdjsonLine(this.ndjsonLogPath, record); + } +} diff --git a/src/index.ts b/src/index.ts index ea3f3e6..e8ba0ae 100644 --- a/src/index.ts +++ b/src/index.ts @@ -41,12 +41,6 @@ export { export { PieceEngine, isOutputContractItem, - executeAgent, - generateReport, - executePart, - judgeStatus, - evaluateCondition, - decomposeTask, } from './core/piece/index.js'; export type { PieceEvents, @@ -56,5 +50,15 @@ export type { IterationLimitCallback, PieceEngineOptions, ProviderType, - JudgeStatusResult, } from './core/piece/index.js'; + +// Agent usecases +export { + executeAgent, + generateReport, + executePart, + judgeStatus, + evaluateCondition, + decomposeTask, +} from './agents/agent-usecases.js'; +export type { JudgeStatusResult } from './agents/agent-usecases.js'; diff --git a/src/infra/config/constants.ts b/src/infra/config/constants.ts new file mode 100644 index 0000000..363e562 --- /dev/null +++ b/src/infra/config/constants.ts @@ -0,0 +1,10 @@ +/** + * Shared infrastructure-level constants. + * + * Defined here (infra/config) rather than features/repertoire so that + * infra/config/paths.ts can reference the directory name without creating + * an upward infra → features dependency. + */ + +/** Directory name for the repertoire packages dir (~/.takt/repertoire). */ +export const REPERTOIRE_DIR_NAME = 'repertoire'; diff --git a/src/infra/config/loaders/pieceParser.ts b/src/infra/config/loaders/pieceParser.ts index 4dd58da..25c7f33 100644 --- a/src/infra/config/loaders/pieceParser.ts +++ b/src/infra/config/loaders/pieceParser.ts @@ -226,9 +226,9 @@ function normalizeArpeggio( const merge: ArpeggioMergeMovementConfig = raw.merge ? { strategy: raw.merge.strategy, - inlineJs: raw.merge.inline_js, - filePath: raw.merge.file ? resolve(pieceDir, raw.merge.file) : undefined, separator: raw.merge.separator, + inlineJs: raw.merge.inline_js, + file: raw.merge.file ? resolve(pieceDir, raw.merge.file) : undefined, } : { strategy: 'concat' }; diff --git a/src/infra/config/paths.ts b/src/infra/config/paths.ts index d1cc86b..a5e36da 100644 --- a/src/infra/config/paths.ts +++ b/src/infra/config/paths.ts @@ -12,7 +12,7 @@ import type { Language } from '../../core/models/index.js'; import { getLanguageResourcesDir } from '../resources/index.js'; import type { FacetKind } from '../../faceted-prompting/index.js'; -import { REPERTOIRE_DIR_NAME } from '../../features/repertoire/constants.js'; +import { REPERTOIRE_DIR_NAME } from './constants.js'; /** Facet types used in layer resolution */ export type { FacetKind as FacetType } from '../../faceted-prompting/index.js'; diff --git a/src/infra/config/project/sessionStore.ts b/src/infra/config/project/sessionStore.ts index 3ab91ba..486bccd 100644 --- a/src/infra/config/project/sessionStore.ts +++ b/src/infra/config/project/sessionStore.ts @@ -88,6 +88,70 @@ import type { PersonaSessionData } from '../types.js'; export type { PersonaSessionData }; +/** + * Read session data from a file path. + * Returns empty record if file doesn't exist, is malformed, or provider has changed. + */ +function readSessionData(sessionPath: string, currentProvider?: string): Record { + if (!existsSync(sessionPath)) return {}; + try { + const content = readFileSync(sessionPath, 'utf-8'); + const data = JSON.parse(content) as PersonaSessionData; + // If provider has changed or is unknown (legacy data), sessions are incompatible — discard them + if (currentProvider && data.provider !== currentProvider) { + return {}; + } + return data.personaSessions || {}; + } catch { + return {}; + } +} + +/** + * Update a single persona session atomically (read-modify-write). + * @param sessionPath - Path to the session JSON file + * @param ensureSessionDir - Function that ensures the session directory exists + * @param persona - Persona (key) to update + * @param sessionId - New session ID + * @param provider - Current provider (used to detect provider change) + */ +function updateSessionData( + sessionPath: string, + ensureSessionDir: () => void, + persona: string, + sessionId: string, + provider?: string, +): void { + ensureSessionDir(); + + let sessions: Record = {}; + let existingProvider: string | undefined; + if (existsSync(sessionPath)) { + try { + const content = readFileSync(sessionPath, 'utf-8'); + const data = JSON.parse(content) as PersonaSessionData; + existingProvider = data.provider; + // If provider changed, discard old sessions + if (provider && existingProvider && existingProvider !== provider) { + sessions = {}; + } else { + sessions = data.personaSessions || {}; + } + } catch { + sessions = {}; + } + } + + sessions[persona] = sessionId; + + const data: PersonaSessionData = { + personaSessions: sessions, + updatedAt: new Date().toISOString(), + provider: provider ?? existingProvider, + }; + writeFileAtomic(sessionPath, JSON.stringify(data, null, 2)); +} + /** Get path for storing persona sessions */ export function getPersonaSessionsPath(projectDir: string): string { return join(getProjectConfigDir(projectDir), 'persona_sessions.json'); @@ -95,21 +159,7 @@ export function getPersonaSessionsPath(projectDir: string): string { /** Load saved persona sessions. Returns empty if provider has changed. */ export function loadPersonaSessions(projectDir: string, currentProvider?: string): Record { - const path = getPersonaSessionsPath(projectDir); - if (existsSync(path)) { - try { - const content = readFileSync(path, 'utf-8'); - const data = JSON.parse(content) as PersonaSessionData; - // If provider has changed or is unknown (legacy data), sessions are incompatible — discard them - if (currentProvider && data.provider !== currentProvider) { - return {}; - } - return data.personaSessions || {}; - } catch { - return {}; - } - } - return {}; + return readSessionData(getPersonaSessionsPath(projectDir), currentProvider); } /** Save persona sessions (atomic write) */ @@ -138,35 +188,13 @@ export function updatePersonaSession( sessionId: string, provider?: string ): void { - const path = getPersonaSessionsPath(projectDir); - ensureDir(getProjectConfigDir(projectDir)); - - let sessions: Record = {}; - let existingProvider: string | undefined; - if (existsSync(path)) { - try { - const content = readFileSync(path, 'utf-8'); - const data = JSON.parse(content) as PersonaSessionData; - existingProvider = data.provider; - // If provider changed, discard old sessions - if (provider && existingProvider && existingProvider !== provider) { - sessions = {}; - } else { - sessions = data.personaSessions || {}; - } - } catch { - sessions = {}; - } - } - - sessions[persona] = sessionId; - - const data: PersonaSessionData = { - personaSessions: sessions, - updatedAt: new Date().toISOString(), - provider: provider ?? existingProvider, - }; - writeFileAtomic(path, JSON.stringify(data, null, 2)); + updateSessionData( + getPersonaSessionsPath(projectDir), + () => ensureDir(getProjectConfigDir(projectDir)), + persona, + sessionId, + provider, + ); } /** Clear all saved persona sessions */ @@ -209,20 +237,7 @@ export function loadWorktreeSessions( worktreePath: string, currentProvider?: string ): Record { - const sessionPath = getWorktreeSessionPath(projectDir, worktreePath); - if (existsSync(sessionPath)) { - try { - const content = readFileSync(sessionPath, 'utf-8'); - const data = JSON.parse(content) as PersonaSessionData; - if (currentProvider && data.provider !== currentProvider) { - return {}; - } - return data.personaSessions || {}; - } catch { - return {}; - } - } - return {}; + return readSessionData(getWorktreeSessionPath(projectDir, worktreePath), currentProvider); } /** Update a single persona session for a worktree (atomic) */ @@ -233,36 +248,13 @@ export function updateWorktreeSession( sessionId: string, provider?: string ): void { - const dir = getWorktreeSessionsDir(projectDir); - ensureDir(dir); - - const sessionPath = getWorktreeSessionPath(projectDir, worktreePath); - let sessions: Record = {}; - let existingProvider: string | undefined; - - if (existsSync(sessionPath)) { - try { - const content = readFileSync(sessionPath, 'utf-8'); - const data = JSON.parse(content) as PersonaSessionData; - existingProvider = data.provider; - if (provider && existingProvider && existingProvider !== provider) { - sessions = {}; - } else { - sessions = data.personaSessions || {}; - } - } catch { - sessions = {}; - } - } - - sessions[personaName] = sessionId; - - const data: PersonaSessionData = { - personaSessions: sessions, - updatedAt: new Date().toISOString(), - provider: provider ?? existingProvider, - }; - writeFileAtomic(sessionPath, JSON.stringify(data, null, 2)); + updateSessionData( + getWorktreeSessionPath(projectDir, worktreePath), + () => ensureDir(getWorktreeSessionsDir(projectDir)), + personaName, + sessionId, + provider, + ); } /** diff --git a/src/infra/fs/session.ts b/src/infra/fs/session.ts index d8b8f4a..47151bc 100644 --- a/src/infra/fs/session.ts +++ b/src/infra/fs/session.ts @@ -185,17 +185,9 @@ export class SessionManager { }; } - /** Load session log from file (supports both .json and .jsonl formats) */ + /** Load session log from a .jsonl file */ loadSessionLog(filepath: string): SessionLog | null { - if (filepath.endsWith('.jsonl')) { - return this.loadNdjsonLog(filepath); - } - - if (!existsSync(filepath)) { - return null; - } - const content = readFileSync(filepath, 'utf-8'); - return JSON.parse(content) as SessionLog; + return this.loadNdjsonLog(filepath); } } diff --git a/src/core/piece/schema-loader.ts b/src/infra/resources/schema-loader.ts similarity index 97% rename from src/core/piece/schema-loader.ts rename to src/infra/resources/schema-loader.ts index eebf6b9..3f13116 100644 --- a/src/core/piece/schema-loader.ts +++ b/src/infra/resources/schema-loader.ts @@ -1,6 +1,6 @@ import { readFileSync } from 'node:fs'; import { join } from 'node:path'; -import { getResourcesDir } from '../../infra/resources/index.js'; +import { getResourcesDir } from './index.js'; type JsonSchema = Record; diff --git a/src/infra/task/clone.ts b/src/infra/task/clone.ts index a46c900..89f3207 100644 --- a/src/infra/task/clone.ts +++ b/src/infra/task/clone.ts @@ -62,9 +62,7 @@ export class CloneManager { /** * Resolve the base directory for clones from global config. * Returns the configured worktree_dir (resolved to absolute), or - * the default 'takt-worktrees' (plural). Automatically migrates - * legacy 'takt-worktree' (singular) to 'takt-worktrees' if only - * the legacy directory exists. + * the default 'takt-worktrees' (plural). */ private static resolveCloneBaseDir(projectDir: string): string { const worktreeDir = resolveConfigValue(projectDir, 'worktreeDir'); @@ -73,13 +71,7 @@ export class CloneManager { ? worktreeDir : path.resolve(projectDir, worktreeDir); } - const newDir = path.join(projectDir, '..', 'takt-worktrees'); - const legacyDir = path.join(projectDir, '..', 'takt-worktree'); - // Auto-migrate: rename legacy singular to plural - if (fs.existsSync(legacyDir) && !fs.existsSync(newDir)) { - fs.renameSync(legacyDir, newDir); - } - return newDir; + return path.join(projectDir, '..', 'takt-worktrees'); } /** Resolve the clone path based on options and global config */ @@ -373,9 +365,20 @@ export class CloneManager { try { const raw = fs.readFileSync(CloneManager.getCloneMetaPath(projectDir, branch), 'utf-8'); const meta = JSON.parse(raw) as { clonePath: string }; - if (fs.existsSync(meta.clonePath)) { - this.removeClone(meta.clonePath); - log.info('Orphaned clone cleaned up', { branch, clonePath: meta.clonePath }); + // Validate clonePath is within the expected clone base directory to prevent path traversal. + const cloneBaseDir = path.resolve(CloneManager.resolveCloneBaseDir(projectDir)); + const resolvedClonePath = path.resolve(meta.clonePath); + if (!resolvedClonePath.startsWith(cloneBaseDir + path.sep)) { + log.error('Refusing to remove clone outside of clone base directory', { + branch, + clonePath: meta.clonePath, + cloneBaseDir, + }); + return; + } + if (fs.existsSync(resolvedClonePath)) { + this.removeClone(resolvedClonePath); + log.info('Orphaned clone cleaned up', { branch, clonePath: resolvedClonePath }); } } catch { // No metadata or parse error — nothing to clean up diff --git a/src/shared/types/provider.ts b/src/shared/types/provider.ts new file mode 100644 index 0000000..793da49 --- /dev/null +++ b/src/shared/types/provider.ts @@ -0,0 +1,64 @@ +/** + * Shared provider-protocol types used across layers. + * + * Defined here (shared/) rather than core/piece/ so that modules in the + * shared/ layer (StreamDisplay, providerEventLogger, etc.) can import them + * without creating an upward shared → core dependency. + * + * core/piece/types.ts re-exports these for backward compatibility. + */ + +export type ProviderType = 'claude' | 'codex' | 'opencode' | 'cursor' | 'copilot' | 'mock'; + +export interface StreamInitEventData { + model: string; + sessionId: string; +} + +export interface StreamToolUseEventData { + tool: string; + input: Record; + id: string; +} + +export interface StreamToolResultEventData { + content: string; + isError: boolean; +} + +export interface StreamToolOutputEventData { + tool: string; + output: string; +} + +export interface StreamTextEventData { + text: string; +} + +export interface StreamThinkingEventData { + thinking: string; +} + +export interface StreamResultEventData { + result: string; + sessionId: string; + success: boolean; + error?: string; +} + +export interface StreamErrorEventData { + message: string; + raw?: string; +} + +export type StreamEvent = + | { type: 'init'; data: StreamInitEventData } + | { type: 'tool_use'; data: StreamToolUseEventData } + | { type: 'tool_result'; data: StreamToolResultEventData } + | { type: 'tool_output'; data: StreamToolOutputEventData } + | { type: 'text'; data: StreamTextEventData } + | { type: 'thinking'; data: StreamThinkingEventData } + | { type: 'result'; data: StreamResultEventData } + | { type: 'error'; data: StreamErrorEventData }; + +export type StreamCallback = (event: StreamEvent) => void; diff --git a/src/shared/ui/StreamDisplay.ts b/src/shared/ui/StreamDisplay.ts index 702cefa..cf4426e 100644 --- a/src/shared/ui/StreamDisplay.ts +++ b/src/shared/ui/StreamDisplay.ts @@ -6,11 +6,7 @@ */ import chalk from 'chalk'; -// NOTE: type-only import from core — acceptable because StreamDisplay is -// a UI renderer tightly coupled to the piece event protocol. -// Moving StreamEvent/StreamCallback to shared would require relocating all -// dependent event-data types, which is out of scope for this refactoring. -import type { StreamEvent, StreamCallback } from '../../core/piece/index.js'; +import type { StreamEvent, StreamCallback } from '../types/provider.js'; import { truncate } from './LogManager.js'; import { stripAnsi } from '../utils/text.js'; diff --git a/src/shared/utils/providerEventLogger.ts b/src/shared/utils/providerEventLogger.ts index 0789e90..6c51953 100644 --- a/src/shared/utils/providerEventLogger.ts +++ b/src/shared/utils/providerEventLogger.ts @@ -1,6 +1,6 @@ import { appendFileSync } from 'node:fs'; import { join } from 'node:path'; -import type { ProviderType, StreamCallback, StreamEvent } from '../../core/piece/index.js'; +import type { ProviderType, StreamCallback, StreamEvent } from '../types/provider.js'; export interface ProviderEventLoggerConfig { logsDir: string;