From 88f7b38796a016ebae46a32929eab021c9cd677b Mon Sep 17 00:00:00 2001 From: nrs <38722970+nrslib@users.noreply.github.com> Date: Mon, 9 Feb 2026 09:03:34 +0900 Subject: [PATCH] takt: improve-parallel-output-prefix (#172) --- src/__tests__/engine-parallel.test.ts | 67 +++++++++++++++++++ src/__tests__/parallel-logger.test.ts | 23 +++++++ .../pieceExecution-debug-prompts.test.ts | 49 +++++++++++++- src/__tests__/runAllTasks-concurrency.test.ts | 6 +- src/__tests__/task-prefix-writer.test.ts | 27 ++++++-- src/__tests__/workerPool.test.ts | 2 +- src/core/piece/engine/ParallelRunner.ts | 39 ++++++++--- src/core/piece/engine/PieceEngine.ts | 9 +++ src/core/piece/engine/parallel-logger.ts | 22 ++++++ src/core/piece/types.ts | 4 ++ src/features/tasks/execute/pieceExecution.ts | 27 +++++++- src/shared/ui/TaskPrefixWriter.ts | 32 +++++++-- 12 files changed, 282 insertions(+), 25 deletions(-) diff --git a/src/__tests__/engine-parallel.test.ts b/src/__tests__/engine-parallel.test.ts index d9e9bbb..11b88cf 100644 --- a/src/__tests__/engine-parallel.test.ts +++ b/src/__tests__/engine-parallel.test.ts @@ -160,4 +160,71 @@ describe('PieceEngine Integration: Parallel Movement Aggregation', () => { expect(calledAgents).toContain('../personas/arch-review.md'); expect(calledAgents).toContain('../personas/security-review.md'); }); + + it('should output rich parallel prefix when taskPrefix/taskColorIndex are provided', async () => { + const config = buildDefaultPieceConfig(); + const stdoutSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + const parentOnStream = vi.fn(); + + const responsesByPersona = new Map>([ + ['../personas/plan.md', makeResponse({ persona: 'plan', content: 'Plan done' })], + ['../personas/implement.md', makeResponse({ persona: 'implement', content: 'Impl done' })], + ['../personas/ai_review.md', makeResponse({ persona: 'ai_review', content: 'OK' })], + ['../personas/arch-review.md', makeResponse({ persona: 'arch-review', content: 'Architecture review content' })], + ['../personas/security-review.md', makeResponse({ persona: 'security-review', content: 'Security review content' })], + ['../personas/supervise.md', makeResponse({ persona: 'supervise', content: 'All passed' })], + ]); + + vi.mocked(runAgent).mockImplementation(async (persona, _task, options) => { + const response = responsesByPersona.get(persona ?? ''); + if (!response) { + throw new Error(`Unexpected persona: ${persona}`); + } + + if (persona === '../personas/arch-review.md') { + options.onStream?.({ type: 'text', data: { text: 'arch stream line\n' } }); + } + if (persona === '../personas/security-review.md') { + options.onStream?.({ type: 'text', data: { text: 'security stream line\n' } }); + } + + return response; + }); + + mockDetectMatchedRuleSequence([ + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'phase1_tag' }, + { index: 0, method: 'aggregate' }, + { index: 0, method: 'phase1_tag' }, + ]); + + const engine = new PieceEngine(config, tmpDir, 'test task', { + projectCwd: tmpDir, + onStream: parentOnStream, + taskPrefix: 'override-persona-provider', + taskColorIndex: 0, + }); + + try { + const state = await engine.run(); + expect(state.status).toBe('completed'); + + const output = stdoutSpy.mock.calls.map((call) => String(call[0])).join(''); + expect(output).toContain('[over]'); + expect(output).toContain('[reviewers][arch-review](4/30)(1) arch stream line'); + expect(output).toContain('[reviewers][security-review](4/30)(1) security stream line'); + } finally { + stdoutSpy.mockRestore(); + } + }); + + it('should fail fast when taskPrefix is provided without taskColorIndex', () => { + const config = buildDefaultPieceConfig(); + expect( + () => new PieceEngine(config, tmpDir, 'test task', { projectCwd: tmpDir, taskPrefix: 'override-persona-provider' }) + ).toThrow('taskPrefix and taskColorIndex must be provided together'); + }); }); diff --git a/src/__tests__/parallel-logger.test.ts b/src/__tests__/parallel-logger.test.ts index 36af15f..5a4113c 100644 --- a/src/__tests__/parallel-logger.test.ts +++ b/src/__tests__/parallel-logger.test.ts @@ -67,6 +67,29 @@ describe('ParallelLogger', () => { // No padding needed (0 spaces) expect(prefix).toMatch(/\x1b\[0m $/); }); + + it('should build rich prefix with task and parent movement for parallel task mode', () => { + const logger = new ParallelLogger({ + subMovementNames: ['arch-review'], + writeFn, + progressInfo: { + iteration: 4, + maxIterations: 30, + }, + taskLabel: 'override-persona-provider', + taskColorIndex: 0, + parentMovementName: 'reviewers', + movementIteration: 1, + }); + + const prefix = logger.buildPrefix('arch-review', 0); + expect(prefix).toContain('\x1b[36m'); + expect(prefix).toContain('[over]'); + expect(prefix).toContain('[reviewers]'); + expect(prefix).toContain('[arch-review]'); + expect(prefix).toContain('(4/30)(1)'); + expect(prefix).not.toContain('step 1/1'); + }); }); describe('text event line buffering', () => { diff --git a/src/__tests__/pieceExecution-debug-prompts.test.ts b/src/__tests__/pieceExecution-debug-prompts.test.ts index 7d9ca04..c99bc19 100644 --- a/src/__tests__/pieceExecution-debug-prompts.test.ts +++ b/src/__tests__/pieceExecution-debug-prompts.test.ts @@ -14,10 +14,12 @@ const { mockIsDebugEnabled, mockWritePromptLog, MockPieceEngine } = vi.hoisted(( class MockPieceEngine extends EE { private config: PieceConfig; + private task: string; - constructor(config: PieceConfig, _cwd: string, _task: string, _options: unknown) { + constructor(config: PieceConfig, _cwd: string, task: string, _options: unknown) { super(); this.config = config; + this.task = task; } abort(): void {} @@ -26,6 +28,7 @@ const { mockIsDebugEnabled, mockWritePromptLog, MockPieceEngine } = vi.hoisted(( const step = this.config.movements[0]!; const timestamp = new Date('2026-02-07T00:00:00.000Z'); + const shouldRepeatMovement = this.task === 'repeat-movement-task'; this.emit('movement:start', step, 1, 'movement instruction'); this.emit('phase:start', step, 1, 'execute', 'phase prompt'); this.emit('phase:complete', step, 1, 'execute', 'phase response', 'done'); @@ -40,9 +43,23 @@ const { mockIsDebugEnabled, mockWritePromptLog, MockPieceEngine } = vi.hoisted(( }, 'movement instruction' ); + if (shouldRepeatMovement) { + this.emit('movement:start', step, 2, 'movement instruction repeat'); + this.emit( + 'movement:complete', + step, + { + persona: step.personaDisplayName, + status: 'done', + content: 'movement response repeat', + timestamp, + }, + 'movement instruction repeat' + ); + } this.emit('piece:complete', { status: 'completed', iteration: 1 }); - return { status: 'completed', iteration: 1 }; + return { status: 'completed', iteration: shouldRepeatMovement ? 2 : 1 }; } } @@ -187,4 +204,32 @@ describe('executePiece debug prompts logging', () => { expect(mockWritePromptLog).not.toHaveBeenCalled(); }); + + it('should update movement prefix context on each movement:start event', async () => { + const stdoutSpy = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + + try { + await executePiece(makeConfig(), 'repeat-movement-task', '/tmp/project', { + projectCwd: '/tmp/project', + taskPrefix: 'override-persona-provider', + taskColorIndex: 0, + }); + + const output = stdoutSpy.mock.calls.map((call) => String(call[0])).join(''); + const normalizedOutput = output.replace(/\x1b\[[0-9;]*m/g, ''); + expect(normalizedOutput).toContain('[over][implement](1/5)(1) [INFO] [1/5] implement (coder)'); + expect(normalizedOutput).toContain('[over][implement](2/5)(2) [INFO] [2/5] implement (coder)'); + } finally { + stdoutSpy.mockRestore(); + } + }); + + it('should fail fast when taskPrefix is provided without taskColorIndex', async () => { + await expect( + executePiece(makeConfig(), 'task', '/tmp/project', { + projectCwd: '/tmp/project', + taskPrefix: 'override-persona-provider', + }) + ).rejects.toThrow('taskPrefix and taskColorIndex must be provided together'); + }); }); diff --git a/src/__tests__/runAllTasks-concurrency.test.ts b/src/__tests__/runAllTasks-concurrency.test.ts index 249209c..1bc7ccd 100644 --- a/src/__tests__/runAllTasks-concurrency.test.ts +++ b/src/__tests__/runAllTasks-concurrency.test.ts @@ -225,11 +225,11 @@ describe('runAllTasks concurrency', () => { // Then: Task names displayed with prefix in stdout const allOutput = stdoutChunks.join(''); - expect(allOutput).toContain('[task-1]'); + expect(allOutput).toContain('[task]'); expect(allOutput).toContain('=== Task: task-1 ==='); - expect(allOutput).toContain('[task-2]'); + expect(allOutput).toContain('[task]'); expect(allOutput).toContain('=== Task: task-2 ==='); - expect(allOutput).toContain('[task-3]'); + expect(allOutput).toContain('[task]'); expect(allOutput).toContain('=== Task: task-3 ==='); expect(mockStatus).toHaveBeenCalledWith('Total', '3'); }); diff --git a/src/__tests__/task-prefix-writer.test.ts b/src/__tests__/task-prefix-writer.test.ts index 5bc8f0f..8cc4fdb 100644 --- a/src/__tests__/task-prefix-writer.test.ts +++ b/src/__tests__/task-prefix-writer.test.ts @@ -42,13 +42,13 @@ describe('TaskPrefixWriter', () => { }); describe('writeLine', () => { - it('should output single line with prefix', () => { + it('should output single line with truncated task prefix', () => { const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn }); writer.writeLine('Hello World'); expect(output).toHaveLength(1); - expect(output[0]).toContain('[my-task]'); + expect(output[0]).toContain('[my-t]'); expect(output[0]).toContain('Hello World'); expect(output[0]).toMatch(/\n$/); }); @@ -94,7 +94,7 @@ describe('TaskPrefixWriter', () => { writer.writeChunk(' World\n'); expect(output).toHaveLength(1); - expect(output[0]).toContain('[task-a]'); + expect(output[0]).toContain('[task]'); expect(output[0]).toContain('Hello World'); }); @@ -154,7 +154,7 @@ describe('TaskPrefixWriter', () => { writer.flush(); expect(output).toHaveLength(1); - expect(output[0]).toContain('[task-a]'); + expect(output[0]).toContain('[task]'); expect(output[0]).toContain('partial content'); expect(output[0]).toMatch(/\n$/); }); @@ -181,4 +181,23 @@ describe('TaskPrefixWriter', () => { }); }); + describe('setMovementContext', () => { + it('should include movement context in prefix after context update', () => { + const writer = new TaskPrefixWriter({ taskName: 'override-persona-provider', colorIndex: 0, writeFn }); + + writer.setMovementContext({ + movementName: 'implement', + iteration: 4, + maxIterations: 30, + movementIteration: 2, + }); + writer.writeLine('content'); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('[over]'); + expect(output[0]).toContain('[implement](4/30)(2)'); + expect(output[0]).toContain('content'); + }); + }); + }); diff --git a/src/__tests__/workerPool.test.ts b/src/__tests__/workerPool.test.ts index a72351e..c1b2896 100644 --- a/src/__tests__/workerPool.test.ts +++ b/src/__tests__/workerPool.test.ts @@ -118,7 +118,7 @@ describe('runWithWorkerPool', () => { // Then: Task names appear in prefixed stdout output writeSpy.mockRestore(); const allOutput = stdoutChunks.join(''); - expect(allOutput).toContain('[alpha]'); + expect(allOutput).toContain('[alph]'); expect(allOutput).toContain('=== Task: alpha ==='); expect(allOutput).toContain('[beta]'); expect(allOutput).toContain('=== Task: beta ==='); diff --git a/src/core/piece/engine/ParallelRunner.ts b/src/core/piece/engine/ParallelRunner.ts index 6cdf8f1..a6f9ba5 100644 --- a/src/core/piece/engine/ParallelRunner.ts +++ b/src/core/piece/engine/ParallelRunner.ts @@ -20,6 +20,7 @@ import { buildSessionKey } from '../session-key.js'; import type { OptionsBuilder } from './OptionsBuilder.js'; import type { MovementExecutor } from './MovementExecutor.js'; import type { PieceEngineOptions, PhaseName } from '../types.js'; +import type { ParallelLoggerOptions } from './parallel-logger.js'; const log = createLogger('parallel-runner'); @@ -69,14 +70,7 @@ export class ParallelRunner { // Create parallel logger for prefixed output (only when streaming is enabled) const parallelLogger = this.deps.engineOptions.onStream - ? new ParallelLogger({ - subMovementNames: subMovements.map((s) => s.name), - parentOnStream: this.deps.engineOptions.onStream, - progressInfo: { - iteration: state.iteration, - maxIterations, - }, - }) + ? new ParallelLogger(this.buildParallelLoggerOptions(step.name, movementIteration, subMovements.map((s) => s.name), state.iteration, maxIterations)) : undefined; const ruleCtx = { @@ -202,4 +196,33 @@ export class ParallelRunner { return { response: aggregatedResponse, instruction: aggregatedInstruction }; } + private buildParallelLoggerOptions( + movementName: string, + movementIteration: number, + subMovementNames: string[], + iteration: number, + maxIterations: number, + ): ParallelLoggerOptions { + const options: ParallelLoggerOptions = { + subMovementNames, + parentOnStream: this.deps.engineOptions.onStream, + progressInfo: { + iteration, + maxIterations, + }, + }; + + if (this.deps.engineOptions.taskPrefix != null && this.deps.engineOptions.taskColorIndex != null) { + return { + ...options, + taskLabel: this.deps.engineOptions.taskPrefix, + taskColorIndex: this.deps.engineOptions.taskColorIndex, + parentMovementName: movementName, + movementIteration, + }; + } + + return options; + } + } diff --git a/src/core/piece/engine/PieceEngine.ts b/src/core/piece/engine/PieceEngine.ts index 849294d..d04c984 100644 --- a/src/core/piece/engine/PieceEngine.ts +++ b/src/core/piece/engine/PieceEngine.ts @@ -69,6 +69,7 @@ export class PieceEngine extends EventEmitter { constructor(config: PieceConfig, cwd: string, task: string, options: PieceEngineOptions) { super(); + this.assertTaskPrefixPair(options.taskPrefix, options.taskColorIndex); this.config = config; this.projectCwd = options.projectCwd; this.cwd = cwd; @@ -146,6 +147,14 @@ export class PieceEngine extends EventEmitter { }); } + private assertTaskPrefixPair(taskPrefix: string | undefined, taskColorIndex: number | undefined): void { + const hasTaskPrefix = taskPrefix != null; + const hasTaskColorIndex = taskColorIndex != null; + if (hasTaskPrefix !== hasTaskColorIndex) { + throw new Error('taskPrefix and taskColorIndex must be provided together'); + } + } + /** Ensure report directory exists (in cwd, which is clone dir in worktree mode) */ private ensureReportDirExists(): void { const reportDirPath = join(this.cwd, this.reportDir); diff --git a/src/core/piece/engine/parallel-logger.ts b/src/core/piece/engine/parallel-logger.ts index e3f2fd4..9994ef5 100644 --- a/src/core/piece/engine/parallel-logger.ts +++ b/src/core/piece/engine/parallel-logger.ts @@ -30,6 +30,14 @@ export interface ParallelLoggerOptions { writeFn?: (text: string) => void; /** Progress information for display */ progressInfo?: ParallelProgressInfo; + /** Task label for rich parallel prefix display */ + taskLabel?: string; + /** Task color index for rich parallel prefix display */ + taskColorIndex?: number; + /** Parent movement name for rich parallel prefix display */ + parentMovementName?: string; + /** Parent movement iteration count for rich parallel prefix display */ + movementIteration?: number; } /** @@ -47,6 +55,10 @@ export class ParallelLogger { private readonly writeFn: (text: string) => void; private readonly progressInfo?: ParallelProgressInfo; private readonly totalSubMovements: number; + private readonly taskLabel?: string; + private readonly taskColorIndex?: number; + private readonly parentMovementName?: string; + private readonly movementIteration?: number; constructor(options: ParallelLoggerOptions) { this.maxNameLength = Math.max(...options.subMovementNames.map((n) => n.length)); @@ -54,6 +66,10 @@ export class ParallelLogger { this.writeFn = options.writeFn ?? ((text: string) => process.stdout.write(text)); this.progressInfo = options.progressInfo; this.totalSubMovements = options.subMovementNames.length; + this.taskLabel = options.taskLabel ? options.taskLabel.slice(0, 4) : undefined; + this.taskColorIndex = options.taskColorIndex; + this.parentMovementName = options.parentMovementName; + this.movementIteration = options.movementIteration; for (const name of options.subMovementNames) { this.lineBuffers.set(name, ''); @@ -65,6 +81,12 @@ export class ParallelLogger { * Format: `\x1b[COLORm[name](iteration/max) step index/total\x1b[0m` + padding spaces */ buildPrefix(name: string, index: number): string { + if (this.taskLabel && this.parentMovementName && this.progressInfo && this.movementIteration != null && this.taskColorIndex != null) { + const taskColor = COLORS[this.taskColorIndex % COLORS.length]; + const { iteration, maxIterations } = this.progressInfo; + return `${taskColor}[${this.taskLabel}]${RESET}[${this.parentMovementName}][${name}](${iteration}/${maxIterations})(${this.movementIteration}) `; + } + const color = COLORS[index % COLORS.length]; const padding = ' '.repeat(this.maxNameLength - name.length); diff --git a/src/core/piece/types.ts b/src/core/piece/types.ts index f72b2b2..2fd1dc6 100644 --- a/src/core/piece/types.ts +++ b/src/core/piece/types.ts @@ -189,6 +189,10 @@ export interface PieceEngineOptions { startMovement?: string; /** Retry note explaining why task is being retried */ retryNote?: string; + /** Task name prefix for parallel task execution output */ + taskPrefix?: string; + /** Color index for task prefix (cycled across tasks) */ + taskColorIndex?: number; } /** Loop detection result */ diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index d6ada72..786d67a 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -78,6 +78,17 @@ interface OutputFns { logLine: (text: string) => void; } +function assertTaskPrefixPair( + taskPrefix: string | undefined, + taskColorIndex: number | undefined +): void { + const hasTaskPrefix = taskPrefix != null; + const hasTaskColorIndex = taskColorIndex != null; + if (hasTaskPrefix !== hasTaskColorIndex) { + throw new Error('taskPrefix and taskColorIndex must be provided together'); + } +} + function createOutputFns(prefixWriter: TaskPrefixWriter | undefined): OutputFns { if (!prefixWriter) { return { @@ -181,10 +192,11 @@ export async function executePiece( // projectCwd is where .takt/ lives (project root, not the clone) const projectCwd = options.projectCwd; + assertTaskPrefixPair(options.taskPrefix, options.taskColorIndex); // When taskPrefix is set (parallel execution), route all output through TaskPrefixWriter - const prefixWriter = options.taskPrefix - ? new TaskPrefixWriter({ taskName: options.taskPrefix, colorIndex: options.taskColorIndex ?? 0 }) + const prefixWriter = options.taskPrefix != null + ? new TaskPrefixWriter({ taskName: options.taskPrefix, colorIndex: options.taskColorIndex! }) : undefined; const out = createOutputFns(prefixWriter); @@ -334,6 +346,8 @@ export async function executePiece( callAiJudge, startMovement: options.startMovement, retryNote: options.retryNote, + taskPrefix: options.taskPrefix, + taskColorIndex: options.taskColorIndex, }); let abortReason: string | undefined; @@ -341,6 +355,7 @@ export async function executePiece( let lastMovementName: string | undefined; let currentIteration = 0; const phasePrompts = new Map(); + const movementIterations = new Map(); engine.on('phase:start', (step, phase, phaseName, instruction) => { log.debug('Phase starting', { step: step.name, phase, phaseName }); @@ -395,6 +410,14 @@ export async function executePiece( engine.on('movement:start', (step, iteration, instruction) => { log.debug('Movement starting', { step: step.name, persona: step.personaDisplayName, iteration }); currentIteration = iteration; + const movementIteration = (movementIterations.get(step.name) ?? 0) + 1; + movementIterations.set(step.name, movementIteration); + prefixWriter?.setMovementContext({ + movementName: step.name, + iteration, + maxIterations: pieceConfig.maxIterations, + movementIteration, + }); out.info(`[${iteration}/${pieceConfig.maxIterations}] ${step.name} (${step.personaDisplayName})`); // Log prompt content for debugging diff --git a/src/shared/ui/TaskPrefixWriter.ts b/src/shared/ui/TaskPrefixWriter.ts index da6fac4..188ea88 100644 --- a/src/shared/ui/TaskPrefixWriter.ts +++ b/src/shared/ui/TaskPrefixWriter.ts @@ -27,6 +27,13 @@ export interface TaskPrefixWriterOptions { writeFn?: (text: string) => void; } +export interface MovementPrefixContext { + movementName: string; + iteration: number; + maxIterations: number; + movementIteration: number; +} + /** * Prefixed line writer for a single parallel task. * @@ -35,16 +42,31 @@ export interface TaskPrefixWriterOptions { * non-empty output line. */ export class TaskPrefixWriter { - private readonly prefix: string; + private readonly taskPrefix: string; private readonly writeFn: (text: string) => void; + private movementContext: MovementPrefixContext | undefined; private lineBuffer = ''; constructor(options: TaskPrefixWriterOptions) { const color = TASK_COLORS[options.colorIndex % TASK_COLORS.length]; - this.prefix = `${color}[${options.taskName}]${RESET} `; + const taskLabel = options.taskName.slice(0, 4); + this.taskPrefix = `${color}[${taskLabel}]${RESET}`; this.writeFn = options.writeFn ?? ((text: string) => process.stdout.write(text)); } + setMovementContext(context: MovementPrefixContext): void { + this.movementContext = context; + } + + private buildPrefix(): string { + if (!this.movementContext) { + return `${this.taskPrefix} `; + } + + const { movementName, iteration, maxIterations, movementIteration } = this.movementContext; + return `${this.taskPrefix}[${movementName}](${iteration}/${maxIterations})(${movementIteration}) `; + } + /** * Write a complete line with prefix. * Multi-line text is split and each non-empty line gets the prefix. @@ -57,7 +79,7 @@ export class TaskPrefixWriter { if (line === '') { this.writeFn('\n'); } else { - this.writeFn(`${this.prefix}${line}\n`); + this.writeFn(`${this.buildPrefix()}${line}\n`); } } } @@ -78,7 +100,7 @@ export class TaskPrefixWriter { if (line === '') { this.writeFn('\n'); } else { - this.writeFn(`${this.prefix}${line}\n`); + this.writeFn(`${this.buildPrefix()}${line}\n`); } } } @@ -88,7 +110,7 @@ export class TaskPrefixWriter { */ flush(): void { if (this.lineBuffer !== '') { - this.writeFn(`${this.prefix}${this.lineBuffer}\n`); + this.writeFn(`${this.buildPrefix()}${this.lineBuffer}\n`); this.lineBuffer = ''; } }