From 4b14a5898255b18765f017011318cf711fff8d6d Mon Sep 17 00:00:00 2001 From: nrs <38722970+nrslib@users.noreply.github.com> Date: Mon, 9 Feb 2026 00:24:12 +0900 Subject: [PATCH] github-issue-159-takt-run-noro (#166) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * caffeinate に -d フラグを追加し、ディスプレイスリープ中の App Nap によるプロセス凍結を防止 * takt 対話モードの save_task を takt add と同じ worktree 設定フローに統一 takt 対話モードで Save Task を選択した際に worktree/branch/auto_pr の 設定プロンプトがスキップされ、takt run で clone なしに実行されて成果物が 消失するバグを修正。promptWorktreeSettings() を共通関数として抽出し、 saveTaskFromInteractive() と addTask() の両方から使用するようにした。 * Release v0.9.0 * takt: github-issue-159-takt-run-noro --- src/__tests__/runAllTasks-concurrency.test.ts | 20 +- src/__tests__/task-prefix-writer.test.ts | 184 ++++++++++++++++++ src/__tests__/workerPool.test.ts | 19 +- .../tasks/execute/parallelExecution.ts | 16 +- src/features/tasks/execute/pieceExecution.ts | 177 ++++++++++++----- src/features/tasks/execute/taskExecution.ts | 6 +- src/features/tasks/execute/types.ts | 4 + src/shared/ui/TaskPrefixWriter.ts | 96 +++++++++ src/shared/ui/index.ts | 2 + 9 files changed, 467 insertions(+), 57 deletions(-) create mode 100644 src/__tests__/task-prefix-writer.test.ts create mode 100644 src/shared/ui/TaskPrefixWriter.ts diff --git a/src/__tests__/runAllTasks-concurrency.test.ts b/src/__tests__/runAllTasks-concurrency.test.ts index 02bdd5a..249209c 100644 --- a/src/__tests__/runAllTasks-concurrency.test.ts +++ b/src/__tests__/runAllTasks-concurrency.test.ts @@ -212,13 +212,25 @@ describe('runAllTasks concurrency', () => { .mockReturnValueOnce([task1, task2, task3]) .mockReturnValueOnce([]); + // In parallel mode, task start messages go through TaskPrefixWriter → process.stdout.write + const stdoutChunks: string[] = []; + const writeSpy = vi.spyOn(process.stdout, 'write').mockImplementation((chunk: unknown) => { + stdoutChunks.push(String(chunk)); + return true; + }); + // When await runAllTasks('/project'); + writeSpy.mockRestore(); - // Then: Task names displayed - expect(mockInfo).toHaveBeenCalledWith('=== Task: task-1 ==='); - expect(mockInfo).toHaveBeenCalledWith('=== Task: task-2 ==='); - expect(mockInfo).toHaveBeenCalledWith('=== Task: task-3 ==='); + // Then: Task names displayed with prefix in stdout + const allOutput = stdoutChunks.join(''); + expect(allOutput).toContain('[task-1]'); + expect(allOutput).toContain('=== Task: task-1 ==='); + expect(allOutput).toContain('[task-2]'); + expect(allOutput).toContain('=== Task: task-2 ==='); + expect(allOutput).toContain('[task-3]'); + expect(allOutput).toContain('=== Task: task-3 ==='); expect(mockStatus).toHaveBeenCalledWith('Total', '3'); }); diff --git a/src/__tests__/task-prefix-writer.test.ts b/src/__tests__/task-prefix-writer.test.ts new file mode 100644 index 0000000..5bc8f0f --- /dev/null +++ b/src/__tests__/task-prefix-writer.test.ts @@ -0,0 +1,184 @@ +/** + * Tests for TaskPrefixWriter + */ + +import { describe, it, expect, beforeEach } from 'vitest'; +import { TaskPrefixWriter } from '../shared/ui/TaskPrefixWriter.js'; + +describe('TaskPrefixWriter', () => { + let output: string[]; + let writeFn: (text: string) => void; + + beforeEach(() => { + output = []; + writeFn = (text: string) => output.push(text); + }); + + describe('constructor', () => { + it('should cycle colors for different colorIndex values', () => { + const writer0 = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + const writer4 = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 4, writeFn }); + + writer0.writeLine('hello'); + writer4.writeLine('hello'); + + // Both index 0 and 4 should use cyan (\x1b[36m) + expect(output[0]).toContain('\x1b[36m'); + expect(output[1]).toContain('\x1b[36m'); + }); + + it('should assign correct colors in order', () => { + const writers = [0, 1, 2, 3].map( + (i) => new TaskPrefixWriter({ taskName: `t${i}`, colorIndex: i, writeFn }), + ); + + writers.forEach((w) => w.writeLine('x')); + + expect(output[0]).toContain('\x1b[36m'); // cyan + expect(output[1]).toContain('\x1b[33m'); // yellow + expect(output[2]).toContain('\x1b[35m'); // magenta + expect(output[3]).toContain('\x1b[32m'); // green + }); + }); + + describe('writeLine', () => { + it('should output single line with prefix', () => { + const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn }); + + writer.writeLine('Hello World'); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('[my-task]'); + expect(output[0]).toContain('Hello World'); + expect(output[0]).toMatch(/\n$/); + }); + + it('should output empty line as bare newline', () => { + const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn }); + + writer.writeLine(''); + + expect(output).toHaveLength(1); + expect(output[0]).toBe('\n'); + }); + + it('should split multi-line text and prefix each non-empty line', () => { + const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn }); + + writer.writeLine('Line 1\nLine 2\n\nLine 4'); + + expect(output).toHaveLength(4); + expect(output[0]).toContain('Line 1'); + expect(output[1]).toContain('Line 2'); + expect(output[2]).toBe('\n'); // empty line + expect(output[3]).toContain('Line 4'); + }); + + it('should strip ANSI codes from input text', () => { + const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn }); + + writer.writeLine('\x1b[31mRed Text\x1b[0m'); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('Red Text'); + expect(output[0]).not.toContain('\x1b[31m'); + }); + }); + + describe('writeChunk (line buffering)', () => { + it('should buffer partial line and output on newline', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('Hello'); + expect(output).toHaveLength(0); + + writer.writeChunk(' World\n'); + expect(output).toHaveLength(1); + expect(output[0]).toContain('[task-a]'); + expect(output[0]).toContain('Hello World'); + }); + + it('should handle multiple lines in single chunk', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('Line 1\nLine 2\n'); + + expect(output).toHaveLength(2); + expect(output[0]).toContain('Line 1'); + expect(output[1]).toContain('Line 2'); + }); + + it('should output empty line without prefix', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('Hello\n\nWorld\n'); + + expect(output).toHaveLength(3); + expect(output[0]).toContain('Hello'); + expect(output[1]).toBe('\n'); + expect(output[2]).toContain('World'); + }); + + it('should keep trailing partial in buffer', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('Complete\nPartial'); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('Complete'); + + writer.flush(); + expect(output).toHaveLength(2); + expect(output[1]).toContain('Partial'); + }); + + it('should strip ANSI codes from streamed chunks', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('\x1b[31mHello'); + writer.writeChunk(' World\x1b[0m\n'); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('Hello World'); + expect(output[0]).not.toContain('\x1b[31m'); + }); + }); + + describe('flush', () => { + it('should output remaining buffered content with prefix', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('partial content'); + expect(output).toHaveLength(0); + + writer.flush(); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('[task-a]'); + expect(output[0]).toContain('partial content'); + expect(output[0]).toMatch(/\n$/); + }); + + it('should not output anything when buffer is empty', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('complete line\n'); + output.length = 0; + + writer.flush(); + expect(output).toHaveLength(0); + }); + + it('should clear buffer after flush', () => { + const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn }); + + writer.writeChunk('content'); + writer.flush(); + output.length = 0; + + writer.flush(); + expect(output).toHaveLength(0); + }); + }); + +}); diff --git a/src/__tests__/workerPool.test.ts b/src/__tests__/workerPool.test.ts index 546eaa5..a72351e 100644 --- a/src/__tests__/workerPool.test.ts +++ b/src/__tests__/workerPool.test.ts @@ -102,17 +102,26 @@ describe('runWithWorkerPool', () => { expect(result).toEqual({ success: 2, fail: 1 }); }); - it('should display task name for each task', async () => { + it('should display task name for each task via prefix writer in parallel mode', async () => { // Given const tasks = [createTask('alpha'), createTask('beta')]; const runner = createMockTaskRunner([]); + const stdoutChunks: string[] = []; + const writeSpy = vi.spyOn(process.stdout, 'write').mockImplementation((chunk: unknown) => { + stdoutChunks.push(String(chunk)); + return true; + }); // When await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default', undefined, TEST_POLL_INTERVAL_MS); - // Then - expect(mockInfo).toHaveBeenCalledWith('=== Task: alpha ==='); - expect(mockInfo).toHaveBeenCalledWith('=== Task: beta ==='); + // Then: Task names appear in prefixed stdout output + writeSpy.mockRestore(); + const allOutput = stdoutChunks.join(''); + expect(allOutput).toContain('[alpha]'); + expect(allOutput).toContain('=== Task: alpha ==='); + expect(allOutput).toContain('[beta]'); + expect(allOutput).toContain('=== Task: beta ==='); }); it('should pass taskPrefix for parallel execution (concurrency > 1)', async () => { @@ -129,6 +138,7 @@ describe('runWithWorkerPool', () => { expect(parallelOpts).toEqual({ abortSignal: expect.any(AbortSignal), taskPrefix: 'my-task', + taskColorIndex: 0, }); }); @@ -146,6 +156,7 @@ describe('runWithWorkerPool', () => { expect(parallelOpts).toEqual({ abortSignal: undefined, taskPrefix: undefined, + taskColorIndex: undefined, }); }); diff --git a/src/features/tasks/execute/parallelExecution.ts b/src/features/tasks/execute/parallelExecution.ts index 12e4ad7..67a3c4c 100644 --- a/src/features/tasks/execute/parallelExecution.ts +++ b/src/features/tasks/execute/parallelExecution.ts @@ -13,6 +13,7 @@ import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js'; import { info, blankLine } from '../../../shared/ui/index.js'; +import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js'; import { createLogger } from '../../../shared/utils/index.js'; import { executeAndCompleteTask } from './taskExecution.js'; import { installSigIntHandler } from './sigintHandler.js'; @@ -102,6 +103,7 @@ export async function runWithWorkerPool( const queue = [...initialTasks]; const active = new Map, TaskInfo>(); + const colorCounter = { value: 0 }; try { while (queue.length > 0 || active.size > 0) { @@ -109,7 +111,7 @@ export async function runWithWorkerPool( break; } - fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController); + fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter); if (active.size === 0) { break; @@ -171,17 +173,25 @@ function fillSlots( pieceName: string, options: TaskExecutionOptions | undefined, abortController: AbortController, + colorCounter: { value: number }, ): void { while (active.size < concurrency && queue.length > 0) { const task = queue.shift()!; const isParallel = concurrency > 1; + const colorIndex = colorCounter.value++; - blankLine(); - info(`=== Task: ${task.name} ===`); + if (isParallel) { + const writer = new TaskPrefixWriter({ taskName: task.name, colorIndex }); + writer.writeLine(`=== Task: ${task.name} ===`); + } else { + blankLine(); + info(`=== Task: ${task.name} ===`); + } const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, { abortSignal: isParallel ? abortController.signal : undefined, taskPrefix: isParallel ? task.name : undefined, + taskColorIndex: isParallel ? colorIndex : undefined, }); active.set(promise, task); } diff --git a/src/features/tasks/execute/pieceExecution.ts b/src/features/tasks/execute/pieceExecution.ts index f6f5156..481d7fa 100644 --- a/src/features/tasks/execute/pieceExecution.ts +++ b/src/features/tasks/execute/pieceExecution.ts @@ -21,15 +21,16 @@ import { } from '../../../infra/config/index.js'; import { isQuietMode } from '../../../shared/context.js'; import { - header, - info, - warn, - error, - success, - status, - blankLine, + header as rawHeader, + info as rawInfo, + warn as rawWarn, + error as rawError, + success as rawSuccess, + status as rawStatus, + blankLine as rawBlankLine, StreamDisplay, } from '../../../shared/ui/index.js'; +import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js'; import { generateSessionId, createSessionLog, @@ -62,6 +63,80 @@ import { installSigIntHandler } from './sigintHandler.js'; const log = createLogger('piece'); +/** + * Output facade — routes through TaskPrefixWriter when task prefix is active, + * or falls through to the raw module functions for single-task execution. + */ +interface OutputFns { + header: (title: string) => void; + info: (message: string) => void; + warn: (message: string) => void; + error: (message: string) => void; + success: (message: string) => void; + status: (label: string, value: string, color?: 'green' | 'yellow' | 'red') => void; + blankLine: () => void; + logLine: (text: string) => void; +} + +function createOutputFns(prefixWriter: TaskPrefixWriter | undefined): OutputFns { + if (!prefixWriter) { + return { + header: rawHeader, + info: rawInfo, + warn: rawWarn, + error: rawError, + success: rawSuccess, + status: rawStatus, + blankLine: rawBlankLine, + logLine: (text: string) => console.log(text), + }; + } + return { + header: (title: string) => prefixWriter.writeLine(`=== ${title} ===`), + info: (message: string) => prefixWriter.writeLine(`[INFO] ${message}`), + warn: (message: string) => prefixWriter.writeLine(`[WARN] ${message}`), + error: (message: string) => prefixWriter.writeLine(`[ERROR] ${message}`), + success: (message: string) => prefixWriter.writeLine(message), + status: (label: string, value: string) => prefixWriter.writeLine(`${label}: ${value}`), + blankLine: () => prefixWriter.writeLine(''), + logLine: (text: string) => prefixWriter.writeLine(text), + }; +} + +/** + * Create a stream handler that routes all stream events through TaskPrefixWriter. + * Text and tool_output are line-buffered; block events are output per-line with prefix. + */ +function createPrefixedStreamHandler( + writer: TaskPrefixWriter, +): (event: Parameters>[0]) => void { + return (event) => { + switch (event.type) { + case 'text': + writer.writeChunk(event.data.text); + break; + case 'tool_use': + writer.writeLine(`[tool] ${event.data.tool}`); + break; + case 'tool_result': { + const label = event.data.isError ? '✗' : '✓'; + writer.writeLine(` ${label} ${event.data.content}`); + break; + } + case 'tool_output': + writer.writeChunk(event.data.output); + break; + case 'thinking': + writer.writeChunk(event.data.thinking); + break; + case 'init': + case 'result': + case 'error': + break; + } + }; +} + /** * Truncate string to maximum length */ @@ -107,10 +182,16 @@ export async function executePiece( // projectCwd is where .takt/ lives (project root, not the clone) const projectCwd = options.projectCwd; + // When taskPrefix is set (parallel execution), route all output through TaskPrefixWriter + const prefixWriter = options.taskPrefix + ? new TaskPrefixWriter({ taskName: options.taskPrefix, colorIndex: options.taskColorIndex ?? 0 }) + : undefined; + const out = createOutputFns(prefixWriter); + // Always continue from previous sessions (use /clear to reset) log.debug('Continuing session (use /clear to reset)'); - header(`${headerPrefix} ${pieceConfig.name}`); + out.header(`${headerPrefix} ${pieceConfig.name}`); const pieceSessionId = generateSessionId(); let sessionLog = createSessionLog(task, projectCwd, pieceConfig.name); @@ -139,14 +220,16 @@ export async function executePiece( // Track current display for streaming const displayRef: { current: StreamDisplay | null } = { current: null }; - // Create stream handler that delegates to UI display - const streamHandler = ( - event: Parameters>[0] - ): void => { - if (!displayRef.current) return; - if (event.type === 'result') return; - displayRef.current.createHandler()(event); - }; + // Create stream handler — when prefixWriter is active, use it for line-buffered + // output to prevent mid-line interleaving between concurrent tasks. + // When not in parallel mode, delegate to StreamDisplay as before. + const streamHandler = prefixWriter + ? createPrefixedStreamHandler(prefixWriter) + : (event: Parameters>[0]): void => { + if (!displayRef.current) return; + if (event.type === 'result') return; + displayRef.current.createHandler()(event); + }; // Load saved agent sessions for continuity (from project root or clone-specific storage) const isWorktree = cwd !== projectCwd; @@ -180,14 +263,14 @@ export async function executePiece( displayRef.current = null; } - blankLine(); - warn( + out.blankLine(); + out.warn( getLabel('piece.iterationLimit.maxReached', undefined, { currentIteration: String(request.currentIteration), maxIterations: String(request.maxIterations), }) ); - info(getLabel('piece.iterationLimit.currentMovement', undefined, { currentMovement: request.currentMovement })); + out.info(getLabel('piece.iterationLimit.currentMovement', undefined, { currentMovement: request.currentMovement })); if (shouldNotify) { playWarningSound(); @@ -218,7 +301,7 @@ export async function executePiece( return additionalIterations; } - warn(getLabel('piece.iterationLimit.invalidInput')); + out.warn(getLabel('piece.iterationLimit.invalidInput')); } }; @@ -228,8 +311,8 @@ export async function executePiece( displayRef.current.flush(); displayRef.current = null; } - blankLine(); - info(request.prompt.trim()); + out.blankLine(); + out.info(request.prompt.trim()); const input = await promptInput(getLabel('piece.iterationLimit.userInputPrompt')); return input && input.trim() ? input.trim() : null; } @@ -311,7 +394,7 @@ export async function executePiece( engine.on('movement:start', (step, iteration, instruction) => { log.debug('Movement starting', { step: step.name, persona: step.personaDisplayName, iteration }); currentIteration = iteration; - info(`[${iteration}/${pieceConfig.maxIterations}] ${step.name} (${step.personaDisplayName})`); + out.info(`[${iteration}/${pieceConfig.maxIterations}] ${step.name} (${step.personaDisplayName})`); // Log prompt content for debugging if (instruction) { @@ -322,15 +405,18 @@ export async function executePiece( const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name); const totalMovements = pieceConfig.movements.length; - const quiet = isQuietMode(); - const prefix = options.taskPrefix; - const agentLabel = prefix ? `${prefix}:${step.personaDisplayName}` : step.personaDisplayName; - displayRef.current = new StreamDisplay(agentLabel, quiet, { - iteration, - maxIterations: pieceConfig.maxIterations, - movementIndex: movementIndex >= 0 ? movementIndex : 0, - totalMovements, - }); + // In parallel mode, StreamDisplay is not used (prefixWriter handles output). + // In single mode, StreamDisplay renders stream events directly. + if (!prefixWriter) { + const quiet = isQuietMode(); + const agentLabel = step.personaDisplayName; + displayRef.current = new StreamDisplay(agentLabel, quiet, { + iteration, + maxIterations: pieceConfig.maxIterations, + movementIndex: movementIndex >= 0 ? movementIndex : 0, + totalMovements, + }); + } // Write step_start record to NDJSON log const record: NdjsonStepStart = { @@ -364,25 +450,26 @@ export async function executePiece( displayRef.current.flush(); displayRef.current = null; } - blankLine(); + prefixWriter?.flush(); + out.blankLine(); if (response.matchedRuleIndex != null && step.rules) { const rule = step.rules[response.matchedRuleIndex]; if (rule) { const methodLabel = response.matchedRuleMethod ? ` (${response.matchedRuleMethod})` : ''; - status('Status', `${rule.condition}${methodLabel}`); + out.status('Status', `${rule.condition}${methodLabel}`); } else { - status('Status', response.status); + out.status('Status', response.status); } } else { - status('Status', response.status); + out.status('Status', response.status); } if (response.error) { - error(`Error: ${response.error}`); + out.error(`Error: ${response.error}`); } if (response.sessionId) { - status('Session', response.sessionId); + out.status('Session', response.sessionId); } // Write step_complete record to NDJSON log @@ -408,8 +495,8 @@ export async function executePiece( engine.on('movement:report', (_step, filePath, fileName) => { const content = readFileSync(filePath, 'utf-8'); - console.log(`\n📄 Report: ${fileName}\n`); - console.log(content); + out.logLine(`\n📄 Report: ${fileName}\n`); + out.logLine(content); }); engine.on('piece:complete', (state) => { @@ -445,8 +532,8 @@ export async function executePiece( : ''; const elapsedDisplay = elapsed ? `, ${elapsed}` : ''; - success(`Piece completed (${state.iteration} iterations${elapsedDisplay})`); - info(`Session log: ${ndjsonLogPath}`); + out.success(`Piece completed (${state.iteration} iterations${elapsedDisplay})`); + out.info(`Session log: ${ndjsonLogPath}`); if (shouldNotify) { notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) })); } @@ -459,6 +546,7 @@ export async function executePiece( displayRef.current.flush(); displayRef.current = null; } + prefixWriter?.flush(); abortReason = reason; sessionLog = finalizeSessionLog(sessionLog, 'aborted'); @@ -492,8 +580,8 @@ export async function executePiece( : ''; const elapsedDisplay = elapsed ? ` (${elapsed})` : ''; - error(`Piece aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`); - info(`Session log: ${ndjsonLogPath}`); + out.error(`Piece aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`); + out.info(`Session log: ${ndjsonLogPath}`); if (shouldNotify) { notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason })); } @@ -536,6 +624,7 @@ export async function executePiece( reason: abortReason, }; } finally { + prefixWriter?.flush(); sigintCleanup?.(); if (onAbortSignal && options.abortSignal) { options.abortSignal.removeEventListener('abort', onAbortSignal); diff --git a/src/features/tasks/execute/taskExecution.ts b/src/features/tasks/execute/taskExecution.ts index 66796bd..f9dbb6e 100644 --- a/src/features/tasks/execute/taskExecution.ts +++ b/src/features/tasks/execute/taskExecution.ts @@ -52,7 +52,7 @@ function resolveTaskIssue(issueNumber: number | undefined): ReturnType { - const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, taskPrefix } = options; + const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, taskPrefix, taskColorIndex } = options; const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd); if (!pieceConfig) { @@ -83,6 +83,7 @@ export async function executeTask(options: ExecuteTaskOptions): Promise retryNote, abortSignal, taskPrefix, + taskColorIndex, }); return result.success; } @@ -101,7 +102,7 @@ export async function executeAndCompleteTask( cwd: string, pieceName: string, options?: TaskExecutionOptions, - parallelOptions?: { abortSignal?: AbortSignal; taskPrefix?: string }, + parallelOptions?: { abortSignal?: AbortSignal; taskPrefix?: string; taskColorIndex?: number }, ): Promise { const startedAt = new Date().toISOString(); const executionLog: string[] = []; @@ -120,6 +121,7 @@ export async function executeAndCompleteTask( retryNote, abortSignal: parallelOptions?.abortSignal, taskPrefix: parallelOptions?.taskPrefix, + taskColorIndex: parallelOptions?.taskColorIndex, }); const completedAt = new Date().toISOString(); diff --git a/src/features/tasks/execute/types.ts b/src/features/tasks/execute/types.ts index d450607..9081072 100644 --- a/src/features/tasks/execute/types.ts +++ b/src/features/tasks/execute/types.ts @@ -42,6 +42,8 @@ export interface PieceExecutionOptions { abortSignal?: AbortSignal; /** Task name prefix for parallel execution output (e.g. "[task-name] output...") */ taskPrefix?: string; + /** Color index for task prefix (cycled mod 4 across concurrent tasks) */ + taskColorIndex?: number; } export interface TaskExecutionOptions { @@ -72,6 +74,8 @@ export interface ExecuteTaskOptions { abortSignal?: AbortSignal; /** Task name prefix for parallel execution output (e.g. "[task-name] output...") */ taskPrefix?: string; + /** Color index for task prefix (cycled mod 4 across concurrent tasks) */ + taskColorIndex?: number; } export interface PipelineExecutionOptions { diff --git a/src/shared/ui/TaskPrefixWriter.ts b/src/shared/ui/TaskPrefixWriter.ts new file mode 100644 index 0000000..da6fac4 --- /dev/null +++ b/src/shared/ui/TaskPrefixWriter.ts @@ -0,0 +1,96 @@ +/** + * Line-buffered, prefixed writer for task-level parallel execution. + * + * When multiple tasks run concurrently (takt run --concurrency N), each task's + * output must be identifiable and line-aligned to prevent mid-line interleaving. + * This class wraps process.stdout.write with line buffering and a colored + * `[taskName]` prefix on every non-empty line. + * + * Design mirrors ParallelLogger (movement-level) but targets task-level output: + * - Regular log lines (info, header, status) get the prefix + * - Stream output gets line-buffered then prefixed + * - Empty lines are passed through without prefix + */ + +import { stripAnsi } from '../utils/text.js'; + +/** ANSI color codes for task prefixes (cycled by task index) */ +const TASK_COLORS = ['\x1b[36m', '\x1b[33m', '\x1b[35m', '\x1b[32m'] as const; +const RESET = '\x1b[0m'; + +export interface TaskPrefixWriterOptions { + /** Task name used in the prefix */ + taskName: string; + /** Color index for the prefix (cycled mod 4) */ + colorIndex: number; + /** Override process.stdout.write for testing */ + writeFn?: (text: string) => void; +} + +/** + * Prefixed line writer for a single parallel task. + * + * All output goes through `writeLine` (complete lines) or `writeChunk` + * (buffered partial lines). The prefix `[taskName]` is prepended to every + * non-empty output line. + */ +export class TaskPrefixWriter { + private readonly prefix: string; + private readonly writeFn: (text: string) => void; + private lineBuffer = ''; + + constructor(options: TaskPrefixWriterOptions) { + const color = TASK_COLORS[options.colorIndex % TASK_COLORS.length]; + this.prefix = `${color}[${options.taskName}]${RESET} `; + this.writeFn = options.writeFn ?? ((text: string) => process.stdout.write(text)); + } + + /** + * Write a complete line with prefix. + * Multi-line text is split and each non-empty line gets the prefix. + */ + writeLine(text: string): void { + const cleaned = stripAnsi(text); + const lines = cleaned.split('\n'); + + for (const line of lines) { + if (line === '') { + this.writeFn('\n'); + } else { + this.writeFn(`${this.prefix}${line}\n`); + } + } + } + + /** + * Write a chunk of streaming text with line buffering. + * Partial lines are buffered until a newline arrives, then output with prefix. + */ + writeChunk(text: string): void { + const cleaned = stripAnsi(text); + const combined = this.lineBuffer + cleaned; + const parts = combined.split('\n'); + + const remainder = parts.pop() ?? ''; + this.lineBuffer = remainder; + + for (const line of parts) { + if (line === '') { + this.writeFn('\n'); + } else { + this.writeFn(`${this.prefix}${line}\n`); + } + } + } + + /** + * Flush any remaining buffered content. + */ + flush(): void { + if (this.lineBuffer !== '') { + this.writeFn(`${this.prefix}${this.lineBuffer}\n`); + this.lineBuffer = ''; + } + } + +} diff --git a/src/shared/ui/index.ts b/src/shared/ui/index.ts index 22df098..faec026 100644 --- a/src/shared/ui/index.ts +++ b/src/shared/ui/index.ts @@ -29,3 +29,5 @@ export { export { Spinner } from './Spinner.js'; export { StreamDisplay, type ProgressInfo } from './StreamDisplay.js'; + +export { TaskPrefixWriter } from './TaskPrefixWriter.js';