feat: ルールマッチ方法の可視化と5段階フォールバック検出を実装

- RuleMatchMethod型を追加し、検出方法(aggregate/phase3_tag/phase1_tag/ai_judge/ai_judge_fallback)を記録
- detectMatchedRuleを5段階フォールバックに拡張(Phase3タグ→Phase1タグ→AI judge→全条件AI judge)
- matchedRuleMethodをセッションログとUI出力の両方に表示
- Phase 3のmaxTurnsを3に増加
- ParallelLoggerによるパラレルステップのプレフィックス付き出力を追加
This commit is contained in:
nrslib 2026-01-30 17:07:18 +09:00
parent b10773d310
commit 9c05b45e1e
7 changed files with 741 additions and 19 deletions

View File

@ -0,0 +1,417 @@
/**
* Tests for parallel-logger module
*/
import { describe, it, expect, beforeEach } from 'vitest';
import { ParallelLogger } from '../workflow/parallel-logger.js';
import type { StreamEvent } from '../claude/types.js';
describe('ParallelLogger', () => {
let output: string[];
let writeFn: (text: string) => void;
beforeEach(() => {
output = [];
writeFn = (text: string) => output.push(text);
});
describe('buildPrefix', () => {
it('should build colored prefix with padding', () => {
const logger = new ParallelLogger({
subStepNames: ['arch-review', 'sec'],
writeFn,
});
// arch-review is longest (11 chars), sec gets padding
const prefix = logger.buildPrefix('sec', 1);
// yellow color for index 1
expect(prefix).toContain('[sec]');
expect(prefix).toContain('\x1b[33m'); // yellow
expect(prefix).toContain('\x1b[0m'); // reset
// 11 - 3 = 8 spaces of padding
expect(prefix).toMatch(/\x1b\[0m {8} $/);
});
it('should cycle colors for index >= 4', () => {
const logger = new ParallelLogger({
subStepNames: ['a', 'b', 'c', 'd', 'e'],
writeFn,
});
const prefix0 = logger.buildPrefix('a', 0);
const prefix4 = logger.buildPrefix('e', 4);
// Both should use cyan (\x1b[36m)
expect(prefix0).toContain('\x1b[36m');
expect(prefix4).toContain('\x1b[36m');
});
it('should assign correct colors in order', () => {
const logger = new ParallelLogger({
subStepNames: ['a', 'b', 'c', 'd'],
writeFn,
});
expect(logger.buildPrefix('a', 0)).toContain('\x1b[36m'); // cyan
expect(logger.buildPrefix('b', 1)).toContain('\x1b[33m'); // yellow
expect(logger.buildPrefix('c', 2)).toContain('\x1b[35m'); // magenta
expect(logger.buildPrefix('d', 3)).toContain('\x1b[32m'); // green
});
it('should have no extra padding for longest name', () => {
const logger = new ParallelLogger({
subStepNames: ['long-name', 'short'],
writeFn,
});
const prefix = logger.buildPrefix('long-name', 0);
// No padding needed (0 spaces)
expect(prefix).toMatch(/\x1b\[0m $/);
});
});
describe('text event line buffering', () => {
it('should buffer partial line and output on newline', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a'],
writeFn,
});
const handler = logger.createStreamHandler('step-a', 0);
// Partial text (no newline)
handler({ type: 'text', data: { text: 'Hello' } } as StreamEvent);
expect(output).toHaveLength(0);
// Complete the line
handler({ type: 'text', data: { text: ' World\n' } } as StreamEvent);
expect(output).toHaveLength(1);
expect(output[0]).toContain('[step-a]');
expect(output[0]).toContain('Hello World');
expect(output[0]).toMatch(/\n$/);
});
it('should handle multiple lines in single text event', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a'],
writeFn,
});
const handler = logger.createStreamHandler('step-a', 0);
handler({ type: 'text', data: { text: 'Line 1\nLine 2\n' } } as StreamEvent);
expect(output).toHaveLength(2);
expect(output[0]).toContain('Line 1');
expect(output[1]).toContain('Line 2');
});
it('should output empty line without prefix', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a'],
writeFn,
});
const handler = logger.createStreamHandler('step-a', 0);
handler({ type: 'text', data: { text: 'Hello\n\nWorld\n' } } as StreamEvent);
expect(output).toHaveLength(3);
expect(output[0]).toContain('Hello');
expect(output[1]).toBe('\n'); // empty line without prefix
expect(output[2]).toContain('World');
});
it('should keep trailing partial in buffer', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a'],
writeFn,
});
const handler = logger.createStreamHandler('step-a', 0);
handler({ type: 'text', data: { text: 'Complete\nPartial' } } as StreamEvent);
expect(output).toHaveLength(1);
expect(output[0]).toContain('Complete');
// Flush remaining
logger.flush();
expect(output).toHaveLength(2);
expect(output[1]).toContain('Partial');
});
});
describe('block events (tool_use, tool_result, tool_output, thinking)', () => {
it('should prefix tool_use events', () => {
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
handler({
type: 'tool_use',
data: { tool: 'Read', input: {}, id: '1' },
} as StreamEvent);
expect(output).toHaveLength(1);
expect(output[0]).toContain('[sub-a]');
expect(output[0]).toContain('[tool] Read');
});
it('should prefix tool_result events', () => {
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
handler({
type: 'tool_result',
data: { content: 'File content here', isError: false },
} as StreamEvent);
expect(output).toHaveLength(1);
expect(output[0]).toContain('File content here');
});
it('should prefix multi-line tool output', () => {
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
handler({
type: 'tool_output',
data: { tool: 'Bash', output: 'line1\nline2' },
} as StreamEvent);
expect(output).toHaveLength(2);
expect(output[0]).toContain('line1');
expect(output[1]).toContain('line2');
});
it('should prefix thinking events', () => {
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
handler({
type: 'thinking',
data: { thinking: 'Considering options...' },
} as StreamEvent);
expect(output).toHaveLength(1);
expect(output[0]).toContain('Considering options...');
});
});
describe('delegated events (init, result, error)', () => {
it('should delegate init event to parent callback', () => {
const parentEvents: StreamEvent[] = [];
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
parentOnStream: (event) => parentEvents.push(event),
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
const initEvent: StreamEvent = {
type: 'init',
data: { model: 'claude-3', sessionId: 'sess-1' },
};
handler(initEvent);
expect(parentEvents).toHaveLength(1);
expect(parentEvents[0]).toBe(initEvent);
expect(output).toHaveLength(0); // Not written to stdout
});
it('should delegate result event to parent callback', () => {
const parentEvents: StreamEvent[] = [];
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
parentOnStream: (event) => parentEvents.push(event),
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
const resultEvent: StreamEvent = {
type: 'result',
data: { result: 'done', sessionId: 'sess-1', success: true },
};
handler(resultEvent);
expect(parentEvents).toHaveLength(1);
expect(parentEvents[0]).toBe(resultEvent);
});
it('should delegate error event to parent callback', () => {
const parentEvents: StreamEvent[] = [];
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
parentOnStream: (event) => parentEvents.push(event),
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
const errorEvent: StreamEvent = {
type: 'error',
data: { message: 'Something went wrong' },
};
handler(errorEvent);
expect(parentEvents).toHaveLength(1);
expect(parentEvents[0]).toBe(errorEvent);
});
it('should not crash when no parent callback for delegated events', () => {
const logger = new ParallelLogger({
subStepNames: ['sub-a'],
writeFn,
});
const handler = logger.createStreamHandler('sub-a', 0);
// Should not throw
handler({ type: 'init', data: { model: 'claude-3', sessionId: 'sess-1' } } as StreamEvent);
handler({ type: 'result', data: { result: 'done', sessionId: 'sess-1', success: true } } as StreamEvent);
handler({ type: 'error', data: { message: 'err' } } as StreamEvent);
expect(output).toHaveLength(0);
});
});
describe('flush', () => {
it('should output remaining buffered content', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a', 'step-b'],
writeFn,
});
const handlerA = logger.createStreamHandler('step-a', 0);
const handlerB = logger.createStreamHandler('step-b', 1);
handlerA({ type: 'text', data: { text: 'partial-a' } } as StreamEvent);
handlerB({ type: 'text', data: { text: 'partial-b' } } as StreamEvent);
expect(output).toHaveLength(0);
logger.flush();
expect(output).toHaveLength(2);
expect(output[0]).toContain('partial-a');
expect(output[1]).toContain('partial-b');
});
it('should not output empty buffers', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a', 'step-b'],
writeFn,
});
const handlerA = logger.createStreamHandler('step-a', 0);
handlerA({ type: 'text', data: { text: 'content\n' } } as StreamEvent);
output.length = 0; // Clear previous output
logger.flush();
expect(output).toHaveLength(0); // Nothing to flush
});
});
describe('printSummary', () => {
it('should print completion summary', () => {
const logger = new ParallelLogger({
subStepNames: ['arch-review', 'security-review'],
writeFn,
});
logger.printSummary('parallel-review', [
{ name: 'arch-review', condition: 'approved' },
{ name: 'security-review', condition: 'rejected' },
]);
const fullOutput = output.join('');
expect(fullOutput).toContain('parallel-review results');
expect(fullOutput).toContain('arch-review:');
expect(fullOutput).toContain('approved');
expect(fullOutput).toContain('security-review:');
expect(fullOutput).toContain('rejected');
// Header and footer contain ─
expect(fullOutput).toContain('─');
});
it('should show (no result) for undefined condition', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a'],
writeFn,
});
logger.printSummary('parallel-step', [
{ name: 'step-a', condition: undefined },
]);
const fullOutput = output.join('');
expect(fullOutput).toContain('(no result)');
});
it('should right-pad sub-step names to align results', () => {
const logger = new ParallelLogger({
subStepNames: ['short', 'very-long-name'],
writeFn,
});
logger.printSummary('test', [
{ name: 'short', condition: 'done' },
{ name: 'very-long-name', condition: 'done' },
]);
// Find the result lines (indented with 2 spaces)
const resultLines = output.filter((l) => l.startsWith(' '));
expect(resultLines).toHaveLength(2);
// Both 'done' values should be at the same column
const doneIndex0 = resultLines[0]!.indexOf('done');
const doneIndex1 = resultLines[1]!.indexOf('done');
expect(doneIndex0).toBe(doneIndex1);
});
it('should flush remaining buffers before printing summary', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a'],
writeFn,
});
const handler = logger.createStreamHandler('step-a', 0);
// Leave partial content in buffer
handler({ type: 'text', data: { text: 'trailing content' } } as StreamEvent);
logger.printSummary('test', [
{ name: 'step-a', condition: 'done' },
]);
// First output should be the flushed buffer
expect(output[0]).toContain('trailing content');
// Then the summary
const fullOutput = output.join('');
expect(fullOutput).toContain('test results');
});
});
describe('interleaved output from multiple sub-steps', () => {
it('should correctly interleave prefixed output', () => {
const logger = new ParallelLogger({
subStepNames: ['step-a', 'step-b'],
writeFn,
});
const handlerA = logger.createStreamHandler('step-a', 0);
const handlerB = logger.createStreamHandler('step-b', 1);
handlerA({ type: 'text', data: { text: 'A output\n' } } as StreamEvent);
handlerB({ type: 'text', data: { text: 'B output\n' } } as StreamEvent);
handlerA({ type: 'text', data: { text: 'A second\n' } } as StreamEvent);
expect(output).toHaveLength(3);
expect(output[0]).toContain('[step-a]');
expect(output[0]).toContain('A output');
expect(output[1]).toContain('[step-b]');
expect(output[1]).toContain('B output');
expect(output[2]).toContain('[step-a]');
expect(output[2]).toContain('A second');
});
});
});

View File

@ -190,6 +190,7 @@ export async function executeWorkflow(
step: step.name,
status: response.status,
matchedRuleIndex: response.matchedRuleIndex,
matchedRuleMethod: response.matchedRuleMethod,
contentLength: response.content.length,
sessionId: response.sessionId,
error: response.error,
@ -203,7 +204,8 @@ export async function executeWorkflow(
if (response.matchedRuleIndex != null && step.rules) {
const rule = step.rules[response.matchedRuleIndex];
if (rule) {
status('Status', rule.condition);
const methodLabel = response.matchedRuleMethod ? ` (${response.matchedRuleMethod})` : '';
status('Status', `${rule.condition}${methodLabel}`);
} else {
status('Status', response.status);
}

View File

@ -2,6 +2,7 @@
export type {
AgentType,
Status,
RuleMatchMethod,
ReportConfig,
ReportObjectConfig,
AgentResponse,

View File

@ -17,6 +17,14 @@ export type Status =
| 'interrupted'
| 'answer';
/** How a rule match was detected */
export type RuleMatchMethod =
| 'aggregate'
| 'phase3_tag'
| 'phase1_tag'
| 'ai_judge'
| 'ai_judge_fallback';
/** Response from an agent execution */
export interface AgentResponse {
agent: string;
@ -28,6 +36,8 @@ export interface AgentResponse {
error?: string;
/** Matched rule index (0-based) when rules-based detection was used */
matchedRuleIndex?: number;
/** How the rule match was detected */
matchedRuleMethod?: RuleMatchMethod;
}
/** Session state for workflow execution */

View File

@ -24,6 +24,10 @@ export interface SessionLog {
timestamp: string;
content: string;
error?: string;
/** Matched rule index (0-based) when rules-based detection was used */
matchedRuleIndex?: number;
/** How the rule match was detected */
matchedRuleMethod?: string;
}>;
}
@ -88,6 +92,8 @@ export function addToSessionLog(
timestamp: response.timestamp.toISOString(),
content: response.content,
...(response.error ? { error: response.error } : {}),
...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}),
...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}),
});
log.iterations++;
}

View File

@ -10,6 +10,7 @@ import type {
WorkflowState,
WorkflowStep,
AgentResponse,
RuleMatchMethod,
} from '../models/types.js';
import { runAgent, type RunAgentOptions } from '../agents/runner.js';
import { COMPLETE_STEP, ABORT_STEP, ERROR_MESSAGES } from './constants.js';
@ -19,6 +20,7 @@ import { detectRuleIndex, callAiJudge } from '../claude/client.js';
import { buildInstruction as buildInstructionFromTemplate, buildReportInstruction as buildReportInstructionFromTemplate, buildStatusJudgmentInstruction as buildStatusJudgmentInstructionFromTemplate, isReportObjectConfig } from './instruction-builder.js';
import { LoopDetector } from './loop-detector.js';
import { handleBlocked } from './blocked-handler.js';
import { ParallelLogger } from './parallel-logger.js';
import {
createInitialState,
addUserInput,
@ -263,8 +265,10 @@ export class WorkflowEngine extends EventEmitter {
* Detect matched rule for a step's response.
* Evaluation order (first match wins):
* 1. Aggregate conditions: all()/any() evaluate sub-step results
* 2. Standard [STEP:N] tag detection (from tagContent, i.e. Phase 3 output)
* 3. ai() condition evaluation via AI judge (from agentContent, i.e. Phase 1 output)
* 2. Tag detection from Phase 3 output
* 3. Tag detection from Phase 1 output (fallback)
* 4. ai() condition evaluation via AI judge
* 5. All-conditions AI judge (final fallback)
*
* Returns undefined for steps without rules.
* Throws if rules exist but no rule matched (Fail Fast).
@ -273,27 +277,41 @@ export class WorkflowEngine extends EventEmitter {
* @param agentContent - Phase 1 output (main execution)
* @param tagContent - Phase 3 output (status judgment); empty string skips tag detection
*/
private async detectMatchedRule(step: WorkflowStep, agentContent: string, tagContent: string): Promise<number | undefined> {
private async detectMatchedRule(step: WorkflowStep, agentContent: string, tagContent: string): Promise<{ index: number; method: RuleMatchMethod } | undefined> {
if (!step.rules || step.rules.length === 0) return undefined;
// 1. Aggregate conditions (all/any) — only meaningful for parallel parent steps
const aggIndex = this.evaluateAggregateConditions(step);
if (aggIndex >= 0) {
return aggIndex;
return { index: aggIndex, method: 'aggregate' };
}
// 2. Standard tag detection (from Phase 3 output)
// 2. Tag detection from Phase 3 output
if (tagContent) {
const ruleIndex = detectRuleIndex(tagContent, step.name);
if (ruleIndex >= 0 && ruleIndex < step.rules.length) {
return ruleIndex;
return { index: ruleIndex, method: 'phase3_tag' };
}
}
// 3. AI judge fallback (from Phase 1 output)
// 3. Tag detection from Phase 1 output (fallback)
if (agentContent) {
const ruleIndex = detectRuleIndex(agentContent, step.name);
if (ruleIndex >= 0 && ruleIndex < step.rules.length) {
return { index: ruleIndex, method: 'phase1_tag' };
}
}
// 4. AI judge for ai() conditions only
const aiRuleIndex = await this.evaluateAiConditions(step, agentContent);
if (aiRuleIndex >= 0) {
return aiRuleIndex;
return { index: aiRuleIndex, method: 'ai_judge' };
}
// 5. AI judge for all conditions (final fallback)
const fallbackIndex = await this.evaluateAllConditionsViaAiJudge(step, agentContent);
if (fallbackIndex >= 0) {
return { index: fallbackIndex, method: 'ai_judge_fallback' };
}
throw new Error(`Status not found for step "${step.name}": no rule matched after all detection phases`);
@ -381,9 +399,10 @@ export class WorkflowEngine extends EventEmitter {
tagContent = await this.runStatusJudgmentPhase(step);
}
const matchedRuleIndex = await this.detectMatchedRule(step, response.content, tagContent);
if (matchedRuleIndex != null) {
response = { ...response, matchedRuleIndex };
const match = await this.detectMatchedRule(step, response.content, tagContent);
if (match) {
log.debug('Rule matched', { step: step.name, ruleIndex: match.index, method: match.method });
response = { ...response, matchedRuleIndex: match.index, matchedRuleMethod: match.method };
}
this.state.stepOutputs.set(step.name, response);
@ -454,7 +473,7 @@ export class WorkflowEngine extends EventEmitter {
const judgmentOptions = this.buildResumeOptions(step, sessionId, {
allowedTools: [],
maxTurns: 1,
maxTurns: 3,
});
const judgmentResponse = await runAgent(step.agent, judgmentInstruction, judgmentOptions);
@ -469,6 +488,9 @@ export class WorkflowEngine extends EventEmitter {
/**
* Run a parallel step: execute all sub-steps concurrently, then aggregate results.
* The aggregated output becomes the parent step's response for rules evaluation.
*
* When onStream is provided, uses ParallelLogger to prefix each sub-step's
* output with `[name]` for readable interleaved display.
*/
private async runParallelStep(step: WorkflowStep): Promise<{ response: AgentResponse; instruction: string }> {
const subSteps = step.parallel!;
@ -479,14 +501,28 @@ export class WorkflowEngine extends EventEmitter {
stepIteration,
});
// Create parallel logger for prefixed output (only when streaming is enabled)
const parallelLogger = this.options.onStream
? new ParallelLogger({
subStepNames: subSteps.map((s) => s.name),
parentOnStream: this.options.onStream,
})
: undefined;
// Run all sub-steps concurrently
const subResults = await Promise.all(
subSteps.map(async (subStep) => {
subSteps.map(async (subStep, index) => {
const subIteration = incrementStepIteration(this.state, subStep.name);
const subInstruction = this.buildInstruction(subStep, subIteration);
// Phase 1: main execution (Write excluded if sub-step has report)
const agentOptions = this.buildAgentOptions(subStep);
// Override onStream with parallel logger's prefixed handler
if (parallelLogger) {
agentOptions.onStream = parallelLogger.createStreamHandler(subStep.name, index);
}
const subResponse = await runAgent(subStep.agent, subInstruction, agentOptions);
this.updateAgentSession(subStep.agent, subResponse.sessionId);
@ -501,9 +537,9 @@ export class WorkflowEngine extends EventEmitter {
subTagContent = await this.runStatusJudgmentPhase(subStep);
}
const matchedRuleIndex = await this.detectMatchedRule(subStep, subResponse.content, subTagContent);
const finalResponse = matchedRuleIndex != null
? { ...subResponse, matchedRuleIndex }
const match = await this.detectMatchedRule(subStep, subResponse.content, subTagContent);
const finalResponse = match
? { ...subResponse, matchedRuleIndex: match.index, matchedRuleMethod: match.method }
: subResponse;
this.state.stepOutputs.set(subStep.name, finalResponse);
@ -513,6 +549,19 @@ export class WorkflowEngine extends EventEmitter {
}),
);
// Print completion summary
if (parallelLogger) {
parallelLogger.printSummary(
step.name,
subResults.map((r) => ({
name: r.subStep.name,
condition: r.response.matchedRuleIndex != null && r.subStep.rules
? r.subStep.rules[r.response.matchedRuleIndex]?.condition
: undefined,
})),
);
}
// Aggregate sub-step outputs into parent step's response
const aggregatedContent = subResults
.map((r) => `## ${r.subStep.name}\n${r.response.content}`)
@ -523,14 +572,14 @@ export class WorkflowEngine extends EventEmitter {
.join('\n\n');
// Parent step uses aggregate conditions, so tagContent is empty
const matchedRuleIndex = await this.detectMatchedRule(step, aggregatedContent, '');
const match = await this.detectMatchedRule(step, aggregatedContent, '');
const aggregatedResponse: AgentResponse = {
agent: step.name,
status: 'done',
content: aggregatedContent,
timestamp: new Date(),
...(matchedRuleIndex != null && { matchedRuleIndex }),
...(match && { matchedRuleIndex: match.index, matchedRuleMethod: match.method }),
};
this.state.stepOutputs.set(step.name, aggregatedResponse);
@ -580,6 +629,37 @@ export class WorkflowEngine extends EventEmitter {
return -1;
}
/**
* Final fallback: evaluate ALL rule conditions via AI judge.
* Unlike evaluateAiConditions (which only handles ai() flagged rules),
* this sends every rule's condition text to the judge.
* Returns the 0-based rule index, or -1 if no match.
*/
private async evaluateAllConditionsViaAiJudge(step: WorkflowStep, agentOutput: string): Promise<number> {
if (!step.rules || step.rules.length === 0) return -1;
const conditions = step.rules.map((rule, i) => ({ index: i, text: rule.condition }));
log.debug('Evaluating all conditions via AI judge (final fallback)', {
step: step.name,
conditionCount: conditions.length,
});
const judgeResult = await callAiJudge(agentOutput, conditions, { cwd: this.cwd });
if (judgeResult >= 0 && judgeResult < conditions.length) {
log.debug('AI judge (fallback) matched condition', {
step: step.name,
ruleIndex: judgeResult,
condition: conditions[judgeResult]!.text,
});
return judgeResult;
}
log.debug('AI judge (fallback) did not match any condition', { step: step.name });
return -1;
}
/**
* Determine next step for a completed step using rules-based routing.
*/

View File

@ -0,0 +1,206 @@
/**
* Parallel step log display
*
* Provides prefixed, color-coded interleaved output for parallel sub-steps.
* Each sub-step's stream output gets a `[name]` prefix with right-padding
* aligned to the longest sub-step name.
*/
import type { StreamCallback, StreamEvent } from '../claude/types.js';
/** ANSI color codes for sub-step prefixes (cycled in order) */
const COLORS = ['\x1b[36m', '\x1b[33m', '\x1b[35m', '\x1b[32m'] as const; // cyan, yellow, magenta, green
const RESET = '\x1b[0m';
export interface ParallelLoggerOptions {
/** Sub-step names (used to calculate prefix width) */
subStepNames: string[];
/** Parent onStream callback to delegate non-prefixed events */
parentOnStream?: StreamCallback;
/** Override process.stdout.write for testing */
writeFn?: (text: string) => void;
}
/**
* Logger for parallel step execution.
*
* Creates per-sub-step StreamCallback wrappers that:
* - Buffer partial lines until newline
* - Prepend colored `[name]` prefix to each complete line
* - Delegate init/result/error events to the parent callback
*/
export class ParallelLogger {
private readonly maxNameLength: number;
private readonly lineBuffers: Map<string, string> = new Map();
private readonly parentOnStream?: StreamCallback;
private readonly writeFn: (text: string) => void;
constructor(options: ParallelLoggerOptions) {
this.maxNameLength = Math.max(...options.subStepNames.map((n) => n.length));
this.parentOnStream = options.parentOnStream;
this.writeFn = options.writeFn ?? ((text: string) => process.stdout.write(text));
for (const name of options.subStepNames) {
this.lineBuffers.set(name, '');
}
}
/**
* Build the colored prefix string for a sub-step.
* Format: `\x1b[COLORm[name]\x1b[0m` + padding spaces
*/
buildPrefix(name: string, index: number): string {
const color = COLORS[index % COLORS.length]!;
const padding = ' '.repeat(this.maxNameLength - name.length);
return `${color}[${name}]${RESET}${padding} `;
}
/**
* Create a StreamCallback wrapper for a specific sub-step.
*
* - `text`: buffered line-by-line with prefix
* - `tool_use`, `tool_result`, `tool_output`, `thinking`: prefixed per-line, no buffering
* - `init`, `result`, `error`: delegated to parent callback (no prefix)
*/
createStreamHandler(subStepName: string, index: number): StreamCallback {
const prefix = this.buildPrefix(subStepName, index);
return (event: StreamEvent) => {
switch (event.type) {
case 'text':
this.handleTextEvent(subStepName, prefix, event.data.text);
break;
case 'tool_use':
case 'tool_result':
case 'tool_output':
case 'thinking':
this.handleBlockEvent(prefix, event);
break;
case 'init':
case 'result':
case 'error':
// Delegate to parent without prefix
this.parentOnStream?.(event);
break;
}
};
}
/**
* Handle text event with line buffering.
* Buffer until newline, then output prefixed complete lines.
* Empty lines get no prefix per spec.
*/
private handleTextEvent(name: string, prefix: string, text: string): void {
const buffer = this.lineBuffers.get(name) ?? '';
const combined = buffer + text;
const parts = combined.split('\n');
// Last part is incomplete (no trailing newline) — keep in buffer
this.lineBuffers.set(name, parts.pop()!);
// Output all complete lines
for (const line of parts) {
if (line === '') {
this.writeFn('\n');
} else {
this.writeFn(`${prefix}${line}\n`);
}
}
}
/**
* Handle block events (tool_use, tool_result, tool_output, thinking).
* Output with prefix, splitting multi-line content.
*/
private handleBlockEvent(prefix: string, event: StreamEvent): void {
let text: string;
switch (event.type) {
case 'tool_use':
text = `[tool] ${event.data.tool}`;
break;
case 'tool_result':
text = event.data.content;
break;
case 'tool_output':
text = event.data.output;
break;
case 'thinking':
text = event.data.thinking;
break;
default:
return;
}
for (const line of text.split('\n')) {
if (line === '') {
this.writeFn('\n');
} else {
this.writeFn(`${prefix}${line}\n`);
}
}
}
/**
* Flush remaining line buffers for all sub-steps.
* Call after all sub-steps complete to output any trailing partial lines.
*/
flush(): void {
// Build prefixes for flush — need index mapping
// Since we don't store index, iterate lineBuffers in insertion order
// (Map preserves insertion order, matching subStepNames order)
let index = 0;
for (const [name, buffer] of this.lineBuffers) {
if (buffer !== '') {
const prefix = this.buildPrefix(name, index);
this.writeFn(`${prefix}${buffer}\n`);
this.lineBuffers.set(name, '');
}
index++;
}
}
/**
* Print completion summary after all sub-steps finish.
*
* Format:
* ```
* parallel-review results
* arch-review: approved
* security-review: rejected
*
* ```
*/
printSummary(
parentStepName: string,
results: Array<{ name: string; condition: string | undefined }>,
): void {
this.flush();
const maxResultNameLength = Math.max(...results.map((r) => r.name.length));
const resultLines = results.map((r) => {
const padding = ' '.repeat(maxResultNameLength - r.name.length);
const condition = r.condition ?? '(no result)';
return ` ${r.name}:${padding} ${condition}`;
});
// Header line: ── name results ──
const headerText = ` ${parentStepName} results `;
const maxLineLength = Math.max(
headerText.length + 4, // 4 for "── " + " ──"
...resultLines.map((l) => l.length),
);
const sideWidth = Math.max(1, Math.floor((maxLineLength - headerText.length) / 2));
const headerLine = `${'─'.repeat(sideWidth)}${headerText}${'─'.repeat(sideWidth)}`;
const footerLine = '─'.repeat(headerLine.length);
this.writeFn(`${headerLine}\n`);
for (const line of resultLines) {
this.writeFn(`${line}\n`);
}
this.writeFn(`${footerLine}\n`);
}
}