github-issue-159-takt-run-noro (#166)

* caffeinate に -d フラグを追加し、ディスプレイスリープ中の App Nap によるプロセス凍結を防止

* takt 対話モードの save_task を takt add と同じ worktree 設定フローに統一

takt 対話モードで Save Task を選択した際に worktree/branch/auto_pr の
設定プロンプトがスキップされ、takt run で clone なしに実行されて成果物が
消失するバグを修正。promptWorktreeSettings() を共通関数として抽出し、
saveTaskFromInteractive() と addTask() の両方から使用するようにした。

* Release v0.9.0

* takt: github-issue-159-takt-run-noro
This commit is contained in:
nrs 2026-02-09 00:24:12 +09:00 committed by GitHub
parent f7d540b069
commit 4b14a58982
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 467 additions and 57 deletions

View File

@ -212,13 +212,25 @@ describe('runAllTasks concurrency', () => {
.mockReturnValueOnce([task1, task2, task3]) .mockReturnValueOnce([task1, task2, task3])
.mockReturnValueOnce([]); .mockReturnValueOnce([]);
// In parallel mode, task start messages go through TaskPrefixWriter → process.stdout.write
const stdoutChunks: string[] = [];
const writeSpy = vi.spyOn(process.stdout, 'write').mockImplementation((chunk: unknown) => {
stdoutChunks.push(String(chunk));
return true;
});
// When // When
await runAllTasks('/project'); await runAllTasks('/project');
writeSpy.mockRestore();
// Then: Task names displayed // Then: Task names displayed with prefix in stdout
expect(mockInfo).toHaveBeenCalledWith('=== Task: task-1 ==='); const allOutput = stdoutChunks.join('');
expect(mockInfo).toHaveBeenCalledWith('=== Task: task-2 ==='); expect(allOutput).toContain('[task-1]');
expect(mockInfo).toHaveBeenCalledWith('=== Task: task-3 ==='); expect(allOutput).toContain('=== Task: task-1 ===');
expect(allOutput).toContain('[task-2]');
expect(allOutput).toContain('=== Task: task-2 ===');
expect(allOutput).toContain('[task-3]');
expect(allOutput).toContain('=== Task: task-3 ===');
expect(mockStatus).toHaveBeenCalledWith('Total', '3'); expect(mockStatus).toHaveBeenCalledWith('Total', '3');
}); });

View File

@ -0,0 +1,184 @@
/**
* Tests for TaskPrefixWriter
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { TaskPrefixWriter } from '../shared/ui/TaskPrefixWriter.js';
describe('TaskPrefixWriter', () => {
let output: string[];
let writeFn: (text: string) => void;
beforeEach(() => {
output = [];
writeFn = (text: string) => output.push(text);
});
describe('constructor', () => {
it('should cycle colors for different colorIndex values', () => {
const writer0 = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
const writer4 = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 4, writeFn });
writer0.writeLine('hello');
writer4.writeLine('hello');
// Both index 0 and 4 should use cyan (\x1b[36m)
expect(output[0]).toContain('\x1b[36m');
expect(output[1]).toContain('\x1b[36m');
});
it('should assign correct colors in order', () => {
const writers = [0, 1, 2, 3].map(
(i) => new TaskPrefixWriter({ taskName: `t${i}`, colorIndex: i, writeFn }),
);
writers.forEach((w) => w.writeLine('x'));
expect(output[0]).toContain('\x1b[36m'); // cyan
expect(output[1]).toContain('\x1b[33m'); // yellow
expect(output[2]).toContain('\x1b[35m'); // magenta
expect(output[3]).toContain('\x1b[32m'); // green
});
});
describe('writeLine', () => {
it('should output single line with prefix', () => {
const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn });
writer.writeLine('Hello World');
expect(output).toHaveLength(1);
expect(output[0]).toContain('[my-task]');
expect(output[0]).toContain('Hello World');
expect(output[0]).toMatch(/\n$/);
});
it('should output empty line as bare newline', () => {
const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn });
writer.writeLine('');
expect(output).toHaveLength(1);
expect(output[0]).toBe('\n');
});
it('should split multi-line text and prefix each non-empty line', () => {
const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn });
writer.writeLine('Line 1\nLine 2\n\nLine 4');
expect(output).toHaveLength(4);
expect(output[0]).toContain('Line 1');
expect(output[1]).toContain('Line 2');
expect(output[2]).toBe('\n'); // empty line
expect(output[3]).toContain('Line 4');
});
it('should strip ANSI codes from input text', () => {
const writer = new TaskPrefixWriter({ taskName: 'my-task', colorIndex: 0, writeFn });
writer.writeLine('\x1b[31mRed Text\x1b[0m');
expect(output).toHaveLength(1);
expect(output[0]).toContain('Red Text');
expect(output[0]).not.toContain('\x1b[31m');
});
});
describe('writeChunk (line buffering)', () => {
it('should buffer partial line and output on newline', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('Hello');
expect(output).toHaveLength(0);
writer.writeChunk(' World\n');
expect(output).toHaveLength(1);
expect(output[0]).toContain('[task-a]');
expect(output[0]).toContain('Hello World');
});
it('should handle multiple lines in single chunk', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('Line 1\nLine 2\n');
expect(output).toHaveLength(2);
expect(output[0]).toContain('Line 1');
expect(output[1]).toContain('Line 2');
});
it('should output empty line without prefix', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('Hello\n\nWorld\n');
expect(output).toHaveLength(3);
expect(output[0]).toContain('Hello');
expect(output[1]).toBe('\n');
expect(output[2]).toContain('World');
});
it('should keep trailing partial in buffer', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('Complete\nPartial');
expect(output).toHaveLength(1);
expect(output[0]).toContain('Complete');
writer.flush();
expect(output).toHaveLength(2);
expect(output[1]).toContain('Partial');
});
it('should strip ANSI codes from streamed chunks', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('\x1b[31mHello');
writer.writeChunk(' World\x1b[0m\n');
expect(output).toHaveLength(1);
expect(output[0]).toContain('Hello World');
expect(output[0]).not.toContain('\x1b[31m');
});
});
describe('flush', () => {
it('should output remaining buffered content with prefix', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('partial content');
expect(output).toHaveLength(0);
writer.flush();
expect(output).toHaveLength(1);
expect(output[0]).toContain('[task-a]');
expect(output[0]).toContain('partial content');
expect(output[0]).toMatch(/\n$/);
});
it('should not output anything when buffer is empty', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('complete line\n');
output.length = 0;
writer.flush();
expect(output).toHaveLength(0);
});
it('should clear buffer after flush', () => {
const writer = new TaskPrefixWriter({ taskName: 'task-a', colorIndex: 0, writeFn });
writer.writeChunk('content');
writer.flush();
output.length = 0;
writer.flush();
expect(output).toHaveLength(0);
});
});
});

View File

@ -102,17 +102,26 @@ describe('runWithWorkerPool', () => {
expect(result).toEqual({ success: 2, fail: 1 }); expect(result).toEqual({ success: 2, fail: 1 });
}); });
it('should display task name for each task', async () => { it('should display task name for each task via prefix writer in parallel mode', async () => {
// Given // Given
const tasks = [createTask('alpha'), createTask('beta')]; const tasks = [createTask('alpha'), createTask('beta')];
const runner = createMockTaskRunner([]); const runner = createMockTaskRunner([]);
const stdoutChunks: string[] = [];
const writeSpy = vi.spyOn(process.stdout, 'write').mockImplementation((chunk: unknown) => {
stdoutChunks.push(String(chunk));
return true;
});
// When // When
await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default', undefined, TEST_POLL_INTERVAL_MS); await runWithWorkerPool(runner as never, tasks, 2, '/cwd', 'default', undefined, TEST_POLL_INTERVAL_MS);
// Then // Then: Task names appear in prefixed stdout output
expect(mockInfo).toHaveBeenCalledWith('=== Task: alpha ==='); writeSpy.mockRestore();
expect(mockInfo).toHaveBeenCalledWith('=== Task: beta ==='); const allOutput = stdoutChunks.join('');
expect(allOutput).toContain('[alpha]');
expect(allOutput).toContain('=== Task: alpha ===');
expect(allOutput).toContain('[beta]');
expect(allOutput).toContain('=== Task: beta ===');
}); });
it('should pass taskPrefix for parallel execution (concurrency > 1)', async () => { it('should pass taskPrefix for parallel execution (concurrency > 1)', async () => {
@ -129,6 +138,7 @@ describe('runWithWorkerPool', () => {
expect(parallelOpts).toEqual({ expect(parallelOpts).toEqual({
abortSignal: expect.any(AbortSignal), abortSignal: expect.any(AbortSignal),
taskPrefix: 'my-task', taskPrefix: 'my-task',
taskColorIndex: 0,
}); });
}); });
@ -146,6 +156,7 @@ describe('runWithWorkerPool', () => {
expect(parallelOpts).toEqual({ expect(parallelOpts).toEqual({
abortSignal: undefined, abortSignal: undefined,
taskPrefix: undefined, taskPrefix: undefined,
taskColorIndex: undefined,
}); });
}); });

View File

@ -13,6 +13,7 @@
import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js'; import type { TaskRunner, TaskInfo } from '../../../infra/task/index.js';
import { info, blankLine } from '../../../shared/ui/index.js'; import { info, blankLine } from '../../../shared/ui/index.js';
import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js';
import { createLogger } from '../../../shared/utils/index.js'; import { createLogger } from '../../../shared/utils/index.js';
import { executeAndCompleteTask } from './taskExecution.js'; import { executeAndCompleteTask } from './taskExecution.js';
import { installSigIntHandler } from './sigintHandler.js'; import { installSigIntHandler } from './sigintHandler.js';
@ -102,6 +103,7 @@ export async function runWithWorkerPool(
const queue = [...initialTasks]; const queue = [...initialTasks];
const active = new Map<Promise<boolean>, TaskInfo>(); const active = new Map<Promise<boolean>, TaskInfo>();
const colorCounter = { value: 0 };
try { try {
while (queue.length > 0 || active.size > 0) { while (queue.length > 0 || active.size > 0) {
@ -109,7 +111,7 @@ export async function runWithWorkerPool(
break; break;
} }
fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController); fillSlots(queue, active, concurrency, taskRunner, cwd, pieceName, options, abortController, colorCounter);
if (active.size === 0) { if (active.size === 0) {
break; break;
@ -171,17 +173,25 @@ function fillSlots(
pieceName: string, pieceName: string,
options: TaskExecutionOptions | undefined, options: TaskExecutionOptions | undefined,
abortController: AbortController, abortController: AbortController,
colorCounter: { value: number },
): void { ): void {
while (active.size < concurrency && queue.length > 0) { while (active.size < concurrency && queue.length > 0) {
const task = queue.shift()!; const task = queue.shift()!;
const isParallel = concurrency > 1; const isParallel = concurrency > 1;
const colorIndex = colorCounter.value++;
if (isParallel) {
const writer = new TaskPrefixWriter({ taskName: task.name, colorIndex });
writer.writeLine(`=== Task: ${task.name} ===`);
} else {
blankLine(); blankLine();
info(`=== Task: ${task.name} ===`); info(`=== Task: ${task.name} ===`);
}
const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, { const promise = executeAndCompleteTask(task, taskRunner, cwd, pieceName, options, {
abortSignal: isParallel ? abortController.signal : undefined, abortSignal: isParallel ? abortController.signal : undefined,
taskPrefix: isParallel ? task.name : undefined, taskPrefix: isParallel ? task.name : undefined,
taskColorIndex: isParallel ? colorIndex : undefined,
}); });
active.set(promise, task); active.set(promise, task);
} }

View File

@ -21,15 +21,16 @@ import {
} from '../../../infra/config/index.js'; } from '../../../infra/config/index.js';
import { isQuietMode } from '../../../shared/context.js'; import { isQuietMode } from '../../../shared/context.js';
import { import {
header, header as rawHeader,
info, info as rawInfo,
warn, warn as rawWarn,
error, error as rawError,
success, success as rawSuccess,
status, status as rawStatus,
blankLine, blankLine as rawBlankLine,
StreamDisplay, StreamDisplay,
} from '../../../shared/ui/index.js'; } from '../../../shared/ui/index.js';
import { TaskPrefixWriter } from '../../../shared/ui/TaskPrefixWriter.js';
import { import {
generateSessionId, generateSessionId,
createSessionLog, createSessionLog,
@ -62,6 +63,80 @@ import { installSigIntHandler } from './sigintHandler.js';
const log = createLogger('piece'); const log = createLogger('piece');
/**
* Output facade routes through TaskPrefixWriter when task prefix is active,
* or falls through to the raw module functions for single-task execution.
*/
interface OutputFns {
header: (title: string) => void;
info: (message: string) => void;
warn: (message: string) => void;
error: (message: string) => void;
success: (message: string) => void;
status: (label: string, value: string, color?: 'green' | 'yellow' | 'red') => void;
blankLine: () => void;
logLine: (text: string) => void;
}
function createOutputFns(prefixWriter: TaskPrefixWriter | undefined): OutputFns {
if (!prefixWriter) {
return {
header: rawHeader,
info: rawInfo,
warn: rawWarn,
error: rawError,
success: rawSuccess,
status: rawStatus,
blankLine: rawBlankLine,
logLine: (text: string) => console.log(text),
};
}
return {
header: (title: string) => prefixWriter.writeLine(`=== ${title} ===`),
info: (message: string) => prefixWriter.writeLine(`[INFO] ${message}`),
warn: (message: string) => prefixWriter.writeLine(`[WARN] ${message}`),
error: (message: string) => prefixWriter.writeLine(`[ERROR] ${message}`),
success: (message: string) => prefixWriter.writeLine(message),
status: (label: string, value: string) => prefixWriter.writeLine(`${label}: ${value}`),
blankLine: () => prefixWriter.writeLine(''),
logLine: (text: string) => prefixWriter.writeLine(text),
};
}
/**
* Create a stream handler that routes all stream events through TaskPrefixWriter.
* Text and tool_output are line-buffered; block events are output per-line with prefix.
*/
function createPrefixedStreamHandler(
writer: TaskPrefixWriter,
): (event: Parameters<ReturnType<StreamDisplay['createHandler']>>[0]) => void {
return (event) => {
switch (event.type) {
case 'text':
writer.writeChunk(event.data.text);
break;
case 'tool_use':
writer.writeLine(`[tool] ${event.data.tool}`);
break;
case 'tool_result': {
const label = event.data.isError ? '✗' : '✓';
writer.writeLine(` ${label} ${event.data.content}`);
break;
}
case 'tool_output':
writer.writeChunk(event.data.output);
break;
case 'thinking':
writer.writeChunk(event.data.thinking);
break;
case 'init':
case 'result':
case 'error':
break;
}
};
}
/** /**
* Truncate string to maximum length * Truncate string to maximum length
*/ */
@ -107,10 +182,16 @@ export async function executePiece(
// projectCwd is where .takt/ lives (project root, not the clone) // projectCwd is where .takt/ lives (project root, not the clone)
const projectCwd = options.projectCwd; const projectCwd = options.projectCwd;
// When taskPrefix is set (parallel execution), route all output through TaskPrefixWriter
const prefixWriter = options.taskPrefix
? new TaskPrefixWriter({ taskName: options.taskPrefix, colorIndex: options.taskColorIndex ?? 0 })
: undefined;
const out = createOutputFns(prefixWriter);
// Always continue from previous sessions (use /clear to reset) // Always continue from previous sessions (use /clear to reset)
log.debug('Continuing session (use /clear to reset)'); log.debug('Continuing session (use /clear to reset)');
header(`${headerPrefix} ${pieceConfig.name}`); out.header(`${headerPrefix} ${pieceConfig.name}`);
const pieceSessionId = generateSessionId(); const pieceSessionId = generateSessionId();
let sessionLog = createSessionLog(task, projectCwd, pieceConfig.name); let sessionLog = createSessionLog(task, projectCwd, pieceConfig.name);
@ -139,10 +220,12 @@ export async function executePiece(
// Track current display for streaming // Track current display for streaming
const displayRef: { current: StreamDisplay | null } = { current: null }; const displayRef: { current: StreamDisplay | null } = { current: null };
// Create stream handler that delegates to UI display // Create stream handler — when prefixWriter is active, use it for line-buffered
const streamHandler = ( // output to prevent mid-line interleaving between concurrent tasks.
event: Parameters<ReturnType<StreamDisplay['createHandler']>>[0] // When not in parallel mode, delegate to StreamDisplay as before.
): void => { const streamHandler = prefixWriter
? createPrefixedStreamHandler(prefixWriter)
: (event: Parameters<ReturnType<StreamDisplay['createHandler']>>[0]): void => {
if (!displayRef.current) return; if (!displayRef.current) return;
if (event.type === 'result') return; if (event.type === 'result') return;
displayRef.current.createHandler()(event); displayRef.current.createHandler()(event);
@ -180,14 +263,14 @@ export async function executePiece(
displayRef.current = null; displayRef.current = null;
} }
blankLine(); out.blankLine();
warn( out.warn(
getLabel('piece.iterationLimit.maxReached', undefined, { getLabel('piece.iterationLimit.maxReached', undefined, {
currentIteration: String(request.currentIteration), currentIteration: String(request.currentIteration),
maxIterations: String(request.maxIterations), maxIterations: String(request.maxIterations),
}) })
); );
info(getLabel('piece.iterationLimit.currentMovement', undefined, { currentMovement: request.currentMovement })); out.info(getLabel('piece.iterationLimit.currentMovement', undefined, { currentMovement: request.currentMovement }));
if (shouldNotify) { if (shouldNotify) {
playWarningSound(); playWarningSound();
@ -218,7 +301,7 @@ export async function executePiece(
return additionalIterations; return additionalIterations;
} }
warn(getLabel('piece.iterationLimit.invalidInput')); out.warn(getLabel('piece.iterationLimit.invalidInput'));
} }
}; };
@ -228,8 +311,8 @@ export async function executePiece(
displayRef.current.flush(); displayRef.current.flush();
displayRef.current = null; displayRef.current = null;
} }
blankLine(); out.blankLine();
info(request.prompt.trim()); out.info(request.prompt.trim());
const input = await promptInput(getLabel('piece.iterationLimit.userInputPrompt')); const input = await promptInput(getLabel('piece.iterationLimit.userInputPrompt'));
return input && input.trim() ? input.trim() : null; return input && input.trim() ? input.trim() : null;
} }
@ -311,7 +394,7 @@ export async function executePiece(
engine.on('movement:start', (step, iteration, instruction) => { engine.on('movement:start', (step, iteration, instruction) => {
log.debug('Movement starting', { step: step.name, persona: step.personaDisplayName, iteration }); log.debug('Movement starting', { step: step.name, persona: step.personaDisplayName, iteration });
currentIteration = iteration; currentIteration = iteration;
info(`[${iteration}/${pieceConfig.maxIterations}] ${step.name} (${step.personaDisplayName})`); out.info(`[${iteration}/${pieceConfig.maxIterations}] ${step.name} (${step.personaDisplayName})`);
// Log prompt content for debugging // Log prompt content for debugging
if (instruction) { if (instruction) {
@ -322,15 +405,18 @@ export async function executePiece(
const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name); const movementIndex = pieceConfig.movements.findIndex((m) => m.name === step.name);
const totalMovements = pieceConfig.movements.length; const totalMovements = pieceConfig.movements.length;
// In parallel mode, StreamDisplay is not used (prefixWriter handles output).
// In single mode, StreamDisplay renders stream events directly.
if (!prefixWriter) {
const quiet = isQuietMode(); const quiet = isQuietMode();
const prefix = options.taskPrefix; const agentLabel = step.personaDisplayName;
const agentLabel = prefix ? `${prefix}:${step.personaDisplayName}` : step.personaDisplayName;
displayRef.current = new StreamDisplay(agentLabel, quiet, { displayRef.current = new StreamDisplay(agentLabel, quiet, {
iteration, iteration,
maxIterations: pieceConfig.maxIterations, maxIterations: pieceConfig.maxIterations,
movementIndex: movementIndex >= 0 ? movementIndex : 0, movementIndex: movementIndex >= 0 ? movementIndex : 0,
totalMovements, totalMovements,
}); });
}
// Write step_start record to NDJSON log // Write step_start record to NDJSON log
const record: NdjsonStepStart = { const record: NdjsonStepStart = {
@ -364,25 +450,26 @@ export async function executePiece(
displayRef.current.flush(); displayRef.current.flush();
displayRef.current = null; displayRef.current = null;
} }
blankLine(); prefixWriter?.flush();
out.blankLine();
if (response.matchedRuleIndex != null && step.rules) { if (response.matchedRuleIndex != null && step.rules) {
const rule = step.rules[response.matchedRuleIndex]; const rule = step.rules[response.matchedRuleIndex];
if (rule) { if (rule) {
const methodLabel = response.matchedRuleMethod ? ` (${response.matchedRuleMethod})` : ''; const methodLabel = response.matchedRuleMethod ? ` (${response.matchedRuleMethod})` : '';
status('Status', `${rule.condition}${methodLabel}`); out.status('Status', `${rule.condition}${methodLabel}`);
} else { } else {
status('Status', response.status); out.status('Status', response.status);
} }
} else { } else {
status('Status', response.status); out.status('Status', response.status);
} }
if (response.error) { if (response.error) {
error(`Error: ${response.error}`); out.error(`Error: ${response.error}`);
} }
if (response.sessionId) { if (response.sessionId) {
status('Session', response.sessionId); out.status('Session', response.sessionId);
} }
// Write step_complete record to NDJSON log // Write step_complete record to NDJSON log
@ -408,8 +495,8 @@ export async function executePiece(
engine.on('movement:report', (_step, filePath, fileName) => { engine.on('movement:report', (_step, filePath, fileName) => {
const content = readFileSync(filePath, 'utf-8'); const content = readFileSync(filePath, 'utf-8');
console.log(`\n📄 Report: ${fileName}\n`); out.logLine(`\n📄 Report: ${fileName}\n`);
console.log(content); out.logLine(content);
}); });
engine.on('piece:complete', (state) => { engine.on('piece:complete', (state) => {
@ -445,8 +532,8 @@ export async function executePiece(
: ''; : '';
const elapsedDisplay = elapsed ? `, ${elapsed}` : ''; const elapsedDisplay = elapsed ? `, ${elapsed}` : '';
success(`Piece completed (${state.iteration} iterations${elapsedDisplay})`); out.success(`Piece completed (${state.iteration} iterations${elapsedDisplay})`);
info(`Session log: ${ndjsonLogPath}`); out.info(`Session log: ${ndjsonLogPath}`);
if (shouldNotify) { if (shouldNotify) {
notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) })); notifySuccess('TAKT', getLabel('piece.notifyComplete', undefined, { iteration: String(state.iteration) }));
} }
@ -459,6 +546,7 @@ export async function executePiece(
displayRef.current.flush(); displayRef.current.flush();
displayRef.current = null; displayRef.current = null;
} }
prefixWriter?.flush();
abortReason = reason; abortReason = reason;
sessionLog = finalizeSessionLog(sessionLog, 'aborted'); sessionLog = finalizeSessionLog(sessionLog, 'aborted');
@ -492,8 +580,8 @@ export async function executePiece(
: ''; : '';
const elapsedDisplay = elapsed ? ` (${elapsed})` : ''; const elapsedDisplay = elapsed ? ` (${elapsed})` : '';
error(`Piece aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`); out.error(`Piece aborted after ${state.iteration} iterations${elapsedDisplay}: ${reason}`);
info(`Session log: ${ndjsonLogPath}`); out.info(`Session log: ${ndjsonLogPath}`);
if (shouldNotify) { if (shouldNotify) {
notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason })); notifyError('TAKT', getLabel('piece.notifyAbort', undefined, { reason }));
} }
@ -536,6 +624,7 @@ export async function executePiece(
reason: abortReason, reason: abortReason,
}; };
} finally { } finally {
prefixWriter?.flush();
sigintCleanup?.(); sigintCleanup?.();
if (onAbortSignal && options.abortSignal) { if (onAbortSignal && options.abortSignal) {
options.abortSignal.removeEventListener('abort', onAbortSignal); options.abortSignal.removeEventListener('abort', onAbortSignal);

View File

@ -52,7 +52,7 @@ function resolveTaskIssue(issueNumber: number | undefined): ReturnType<typeof fe
* Execute a single task with piece. * Execute a single task with piece.
*/ */
export async function executeTask(options: ExecuteTaskOptions): Promise<boolean> { export async function executeTask(options: ExecuteTaskOptions): Promise<boolean> {
const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, taskPrefix } = options; const { task, cwd, pieceIdentifier, projectCwd, agentOverrides, interactiveUserInput, interactiveMetadata, startMovement, retryNote, abortSignal, taskPrefix, taskColorIndex } = options;
const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd); const pieceConfig = loadPieceByIdentifier(pieceIdentifier, projectCwd);
if (!pieceConfig) { if (!pieceConfig) {
@ -83,6 +83,7 @@ export async function executeTask(options: ExecuteTaskOptions): Promise<boolean>
retryNote, retryNote,
abortSignal, abortSignal,
taskPrefix, taskPrefix,
taskColorIndex,
}); });
return result.success; return result.success;
} }
@ -101,7 +102,7 @@ export async function executeAndCompleteTask(
cwd: string, cwd: string,
pieceName: string, pieceName: string,
options?: TaskExecutionOptions, options?: TaskExecutionOptions,
parallelOptions?: { abortSignal?: AbortSignal; taskPrefix?: string }, parallelOptions?: { abortSignal?: AbortSignal; taskPrefix?: string; taskColorIndex?: number },
): Promise<boolean> { ): Promise<boolean> {
const startedAt = new Date().toISOString(); const startedAt = new Date().toISOString();
const executionLog: string[] = []; const executionLog: string[] = [];
@ -120,6 +121,7 @@ export async function executeAndCompleteTask(
retryNote, retryNote,
abortSignal: parallelOptions?.abortSignal, abortSignal: parallelOptions?.abortSignal,
taskPrefix: parallelOptions?.taskPrefix, taskPrefix: parallelOptions?.taskPrefix,
taskColorIndex: parallelOptions?.taskColorIndex,
}); });
const completedAt = new Date().toISOString(); const completedAt = new Date().toISOString();

View File

@ -42,6 +42,8 @@ export interface PieceExecutionOptions {
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
/** Task name prefix for parallel execution output (e.g. "[task-name] output...") */ /** Task name prefix for parallel execution output (e.g. "[task-name] output...") */
taskPrefix?: string; taskPrefix?: string;
/** Color index for task prefix (cycled mod 4 across concurrent tasks) */
taskColorIndex?: number;
} }
export interface TaskExecutionOptions { export interface TaskExecutionOptions {
@ -72,6 +74,8 @@ export interface ExecuteTaskOptions {
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
/** Task name prefix for parallel execution output (e.g. "[task-name] output...") */ /** Task name prefix for parallel execution output (e.g. "[task-name] output...") */
taskPrefix?: string; taskPrefix?: string;
/** Color index for task prefix (cycled mod 4 across concurrent tasks) */
taskColorIndex?: number;
} }
export interface PipelineExecutionOptions { export interface PipelineExecutionOptions {

View File

@ -0,0 +1,96 @@
/**
* Line-buffered, prefixed writer for task-level parallel execution.
*
* When multiple tasks run concurrently (takt run --concurrency N), each task's
* output must be identifiable and line-aligned to prevent mid-line interleaving.
* This class wraps process.stdout.write with line buffering and a colored
* `[taskName]` prefix on every non-empty line.
*
* Design mirrors ParallelLogger (movement-level) but targets task-level output:
* - Regular log lines (info, header, status) get the prefix
* - Stream output gets line-buffered then prefixed
* - Empty lines are passed through without prefix
*/
import { stripAnsi } from '../utils/text.js';
/** ANSI color codes for task prefixes (cycled by task index) */
const TASK_COLORS = ['\x1b[36m', '\x1b[33m', '\x1b[35m', '\x1b[32m'] as const;
const RESET = '\x1b[0m';
export interface TaskPrefixWriterOptions {
/** Task name used in the prefix */
taskName: string;
/** Color index for the prefix (cycled mod 4) */
colorIndex: number;
/** Override process.stdout.write for testing */
writeFn?: (text: string) => void;
}
/**
* Prefixed line writer for a single parallel task.
*
* All output goes through `writeLine` (complete lines) or `writeChunk`
* (buffered partial lines). The prefix `[taskName]` is prepended to every
* non-empty output line.
*/
export class TaskPrefixWriter {
private readonly prefix: string;
private readonly writeFn: (text: string) => void;
private lineBuffer = '';
constructor(options: TaskPrefixWriterOptions) {
const color = TASK_COLORS[options.colorIndex % TASK_COLORS.length];
this.prefix = `${color}[${options.taskName}]${RESET} `;
this.writeFn = options.writeFn ?? ((text: string) => process.stdout.write(text));
}
/**
* Write a complete line with prefix.
* Multi-line text is split and each non-empty line gets the prefix.
*/
writeLine(text: string): void {
const cleaned = stripAnsi(text);
const lines = cleaned.split('\n');
for (const line of lines) {
if (line === '') {
this.writeFn('\n');
} else {
this.writeFn(`${this.prefix}${line}\n`);
}
}
}
/**
* Write a chunk of streaming text with line buffering.
* Partial lines are buffered until a newline arrives, then output with prefix.
*/
writeChunk(text: string): void {
const cleaned = stripAnsi(text);
const combined = this.lineBuffer + cleaned;
const parts = combined.split('\n');
const remainder = parts.pop() ?? '';
this.lineBuffer = remainder;
for (const line of parts) {
if (line === '') {
this.writeFn('\n');
} else {
this.writeFn(`${this.prefix}${line}\n`);
}
}
}
/**
* Flush any remaining buffered content.
*/
flush(): void {
if (this.lineBuffer !== '') {
this.writeFn(`${this.prefix}${this.lineBuffer}\n`);
this.lineBuffer = '';
}
}
}

View File

@ -29,3 +29,5 @@ export {
export { Spinner } from './Spinner.js'; export { Spinner } from './Spinner.js';
export { StreamDisplay, type ProgressInfo } from './StreamDisplay.js'; export { StreamDisplay, type ProgressInfo } from './StreamDisplay.js';
export { TaskPrefixWriter } from './TaskPrefixWriter.js';