From 9c05b45e1e91b7a6fd9c4b6f0b54ac5c1b2384ab Mon Sep 17 00:00:00 2001 From: nrslib <38722970+nrslib@users.noreply.github.com> Date: Fri, 30 Jan 2026 17:07:18 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E3=83=AB=E3=83=BC=E3=83=AB=E3=83=9E?= =?UTF-8?q?=E3=83=83=E3=83=81=E6=96=B9=E6=B3=95=E3=81=AE=E5=8F=AF=E8=A6=96?= =?UTF-8?q?=E5=8C=96=E3=81=A85=E6=AE=B5=E9=9A=8E=E3=83=95=E3=82=A9?= =?UTF-8?q?=E3=83=BC=E3=83=AB=E3=83=90=E3=83=83=E3=82=AF=E6=A4=9C=E5=87=BA?= =?UTF-8?q?=E3=82=92=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - RuleMatchMethod型を追加し、検出方法(aggregate/phase3_tag/phase1_tag/ai_judge/ai_judge_fallback)を記録 - detectMatchedRuleを5段階フォールバックに拡張(Phase3タグ→Phase1タグ→AI judge→全条件AI judge) - matchedRuleMethodをセッションログとUI出力の両方に表示 - Phase 3のmaxTurnsを3に増加 - ParallelLoggerによるパラレルステップのプレフィックス付き出力を追加 --- src/__tests__/parallel-logger.test.ts | 417 ++++++++++++++++++++++++++ src/commands/workflowExecution.ts | 4 +- src/models/index.ts | 1 + src/models/types.ts | 10 + src/utils/session.ts | 6 + src/workflow/engine.ts | 116 +++++-- src/workflow/parallel-logger.ts | 206 +++++++++++++ 7 files changed, 741 insertions(+), 19 deletions(-) create mode 100644 src/__tests__/parallel-logger.test.ts create mode 100644 src/workflow/parallel-logger.ts diff --git a/src/__tests__/parallel-logger.test.ts b/src/__tests__/parallel-logger.test.ts new file mode 100644 index 0000000..893954f --- /dev/null +++ b/src/__tests__/parallel-logger.test.ts @@ -0,0 +1,417 @@ +/** + * Tests for parallel-logger module + */ + +import { describe, it, expect, beforeEach } from 'vitest'; +import { ParallelLogger } from '../workflow/parallel-logger.js'; +import type { StreamEvent } from '../claude/types.js'; + +describe('ParallelLogger', () => { + let output: string[]; + let writeFn: (text: string) => void; + + beforeEach(() => { + output = []; + writeFn = (text: string) => output.push(text); + }); + + describe('buildPrefix', () => { + it('should build colored prefix with padding', () => { + const logger = new ParallelLogger({ + subStepNames: ['arch-review', 'sec'], + writeFn, + }); + + // arch-review is longest (11 chars), sec gets padding + const prefix = logger.buildPrefix('sec', 1); + // yellow color for index 1 + expect(prefix).toContain('[sec]'); + expect(prefix).toContain('\x1b[33m'); // yellow + expect(prefix).toContain('\x1b[0m'); // reset + // 11 - 3 = 8 spaces of padding + expect(prefix).toMatch(/\x1b\[0m {8} $/); + }); + + it('should cycle colors for index >= 4', () => { + const logger = new ParallelLogger({ + subStepNames: ['a', 'b', 'c', 'd', 'e'], + writeFn, + }); + + const prefix0 = logger.buildPrefix('a', 0); + const prefix4 = logger.buildPrefix('e', 4); + // Both should use cyan (\x1b[36m) + expect(prefix0).toContain('\x1b[36m'); + expect(prefix4).toContain('\x1b[36m'); + }); + + it('should assign correct colors in order', () => { + const logger = new ParallelLogger({ + subStepNames: ['a', 'b', 'c', 'd'], + writeFn, + }); + + expect(logger.buildPrefix('a', 0)).toContain('\x1b[36m'); // cyan + expect(logger.buildPrefix('b', 1)).toContain('\x1b[33m'); // yellow + expect(logger.buildPrefix('c', 2)).toContain('\x1b[35m'); // magenta + expect(logger.buildPrefix('d', 3)).toContain('\x1b[32m'); // green + }); + + it('should have no extra padding for longest name', () => { + const logger = new ParallelLogger({ + subStepNames: ['long-name', 'short'], + writeFn, + }); + + const prefix = logger.buildPrefix('long-name', 0); + // No padding needed (0 spaces) + expect(prefix).toMatch(/\x1b\[0m $/); + }); + }); + + describe('text event line buffering', () => { + it('should buffer partial line and output on newline', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a'], + writeFn, + }); + const handler = logger.createStreamHandler('step-a', 0); + + // Partial text (no newline) + handler({ type: 'text', data: { text: 'Hello' } } as StreamEvent); + expect(output).toHaveLength(0); + + // Complete the line + handler({ type: 'text', data: { text: ' World\n' } } as StreamEvent); + expect(output).toHaveLength(1); + expect(output[0]).toContain('[step-a]'); + expect(output[0]).toContain('Hello World'); + expect(output[0]).toMatch(/\n$/); + }); + + it('should handle multiple lines in single text event', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a'], + writeFn, + }); + const handler = logger.createStreamHandler('step-a', 0); + + handler({ type: 'text', data: { text: 'Line 1\nLine 2\n' } } as StreamEvent); + expect(output).toHaveLength(2); + expect(output[0]).toContain('Line 1'); + expect(output[1]).toContain('Line 2'); + }); + + it('should output empty line without prefix', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a'], + writeFn, + }); + const handler = logger.createStreamHandler('step-a', 0); + + handler({ type: 'text', data: { text: 'Hello\n\nWorld\n' } } as StreamEvent); + expect(output).toHaveLength(3); + expect(output[0]).toContain('Hello'); + expect(output[1]).toBe('\n'); // empty line without prefix + expect(output[2]).toContain('World'); + }); + + it('should keep trailing partial in buffer', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a'], + writeFn, + }); + const handler = logger.createStreamHandler('step-a', 0); + + handler({ type: 'text', data: { text: 'Complete\nPartial' } } as StreamEvent); + expect(output).toHaveLength(1); + expect(output[0]).toContain('Complete'); + + // Flush remaining + logger.flush(); + expect(output).toHaveLength(2); + expect(output[1]).toContain('Partial'); + }); + }); + + describe('block events (tool_use, tool_result, tool_output, thinking)', () => { + it('should prefix tool_use events', () => { + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + handler({ + type: 'tool_use', + data: { tool: 'Read', input: {}, id: '1' }, + } as StreamEvent); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('[sub-a]'); + expect(output[0]).toContain('[tool] Read'); + }); + + it('should prefix tool_result events', () => { + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + handler({ + type: 'tool_result', + data: { content: 'File content here', isError: false }, + } as StreamEvent); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('File content here'); + }); + + it('should prefix multi-line tool output', () => { + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + handler({ + type: 'tool_output', + data: { tool: 'Bash', output: 'line1\nline2' }, + } as StreamEvent); + + expect(output).toHaveLength(2); + expect(output[0]).toContain('line1'); + expect(output[1]).toContain('line2'); + }); + + it('should prefix thinking events', () => { + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + handler({ + type: 'thinking', + data: { thinking: 'Considering options...' }, + } as StreamEvent); + + expect(output).toHaveLength(1); + expect(output[0]).toContain('Considering options...'); + }); + }); + + describe('delegated events (init, result, error)', () => { + it('should delegate init event to parent callback', () => { + const parentEvents: StreamEvent[] = []; + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + parentOnStream: (event) => parentEvents.push(event), + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + const initEvent: StreamEvent = { + type: 'init', + data: { model: 'claude-3', sessionId: 'sess-1' }, + }; + handler(initEvent); + + expect(parentEvents).toHaveLength(1); + expect(parentEvents[0]).toBe(initEvent); + expect(output).toHaveLength(0); // Not written to stdout + }); + + it('should delegate result event to parent callback', () => { + const parentEvents: StreamEvent[] = []; + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + parentOnStream: (event) => parentEvents.push(event), + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + const resultEvent: StreamEvent = { + type: 'result', + data: { result: 'done', sessionId: 'sess-1', success: true }, + }; + handler(resultEvent); + + expect(parentEvents).toHaveLength(1); + expect(parentEvents[0]).toBe(resultEvent); + }); + + it('should delegate error event to parent callback', () => { + const parentEvents: StreamEvent[] = []; + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + parentOnStream: (event) => parentEvents.push(event), + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + const errorEvent: StreamEvent = { + type: 'error', + data: { message: 'Something went wrong' }, + }; + handler(errorEvent); + + expect(parentEvents).toHaveLength(1); + expect(parentEvents[0]).toBe(errorEvent); + }); + + it('should not crash when no parent callback for delegated events', () => { + const logger = new ParallelLogger({ + subStepNames: ['sub-a'], + writeFn, + }); + const handler = logger.createStreamHandler('sub-a', 0); + + // Should not throw + handler({ type: 'init', data: { model: 'claude-3', sessionId: 'sess-1' } } as StreamEvent); + handler({ type: 'result', data: { result: 'done', sessionId: 'sess-1', success: true } } as StreamEvent); + handler({ type: 'error', data: { message: 'err' } } as StreamEvent); + + expect(output).toHaveLength(0); + }); + }); + + describe('flush', () => { + it('should output remaining buffered content', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a', 'step-b'], + writeFn, + }); + const handlerA = logger.createStreamHandler('step-a', 0); + const handlerB = logger.createStreamHandler('step-b', 1); + + handlerA({ type: 'text', data: { text: 'partial-a' } } as StreamEvent); + handlerB({ type: 'text', data: { text: 'partial-b' } } as StreamEvent); + + expect(output).toHaveLength(0); + + logger.flush(); + + expect(output).toHaveLength(2); + expect(output[0]).toContain('partial-a'); + expect(output[1]).toContain('partial-b'); + }); + + it('should not output empty buffers', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a', 'step-b'], + writeFn, + }); + const handlerA = logger.createStreamHandler('step-a', 0); + + handlerA({ type: 'text', data: { text: 'content\n' } } as StreamEvent); + output.length = 0; // Clear previous output + + logger.flush(); + expect(output).toHaveLength(0); // Nothing to flush + }); + }); + + describe('printSummary', () => { + it('should print completion summary', () => { + const logger = new ParallelLogger({ + subStepNames: ['arch-review', 'security-review'], + writeFn, + }); + + logger.printSummary('parallel-review', [ + { name: 'arch-review', condition: 'approved' }, + { name: 'security-review', condition: 'rejected' }, + ]); + + const fullOutput = output.join(''); + expect(fullOutput).toContain('parallel-review results'); + expect(fullOutput).toContain('arch-review:'); + expect(fullOutput).toContain('approved'); + expect(fullOutput).toContain('security-review:'); + expect(fullOutput).toContain('rejected'); + // Header and footer contain ─ + expect(fullOutput).toContain('─'); + }); + + it('should show (no result) for undefined condition', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a'], + writeFn, + }); + + logger.printSummary('parallel-step', [ + { name: 'step-a', condition: undefined }, + ]); + + const fullOutput = output.join(''); + expect(fullOutput).toContain('(no result)'); + }); + + it('should right-pad sub-step names to align results', () => { + const logger = new ParallelLogger({ + subStepNames: ['short', 'very-long-name'], + writeFn, + }); + + logger.printSummary('test', [ + { name: 'short', condition: 'done' }, + { name: 'very-long-name', condition: 'done' }, + ]); + + // Find the result lines (indented with 2 spaces) + const resultLines = output.filter((l) => l.startsWith(' ')); + expect(resultLines).toHaveLength(2); + + // Both 'done' values should be at the same column + const doneIndex0 = resultLines[0]!.indexOf('done'); + const doneIndex1 = resultLines[1]!.indexOf('done'); + expect(doneIndex0).toBe(doneIndex1); + }); + + it('should flush remaining buffers before printing summary', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a'], + writeFn, + }); + const handler = logger.createStreamHandler('step-a', 0); + + // Leave partial content in buffer + handler({ type: 'text', data: { text: 'trailing content' } } as StreamEvent); + + logger.printSummary('test', [ + { name: 'step-a', condition: 'done' }, + ]); + + // First output should be the flushed buffer + expect(output[0]).toContain('trailing content'); + // Then the summary + const fullOutput = output.join(''); + expect(fullOutput).toContain('test results'); + }); + }); + + describe('interleaved output from multiple sub-steps', () => { + it('should correctly interleave prefixed output', () => { + const logger = new ParallelLogger({ + subStepNames: ['step-a', 'step-b'], + writeFn, + }); + const handlerA = logger.createStreamHandler('step-a', 0); + const handlerB = logger.createStreamHandler('step-b', 1); + + handlerA({ type: 'text', data: { text: 'A output\n' } } as StreamEvent); + handlerB({ type: 'text', data: { text: 'B output\n' } } as StreamEvent); + handlerA({ type: 'text', data: { text: 'A second\n' } } as StreamEvent); + + expect(output).toHaveLength(3); + expect(output[0]).toContain('[step-a]'); + expect(output[0]).toContain('A output'); + expect(output[1]).toContain('[step-b]'); + expect(output[1]).toContain('B output'); + expect(output[2]).toContain('[step-a]'); + expect(output[2]).toContain('A second'); + }); + }); +}); diff --git a/src/commands/workflowExecution.ts b/src/commands/workflowExecution.ts index 0c7be9f..b727db2 100644 --- a/src/commands/workflowExecution.ts +++ b/src/commands/workflowExecution.ts @@ -190,6 +190,7 @@ export async function executeWorkflow( step: step.name, status: response.status, matchedRuleIndex: response.matchedRuleIndex, + matchedRuleMethod: response.matchedRuleMethod, contentLength: response.content.length, sessionId: response.sessionId, error: response.error, @@ -203,7 +204,8 @@ export async function executeWorkflow( if (response.matchedRuleIndex != null && step.rules) { const rule = step.rules[response.matchedRuleIndex]; if (rule) { - status('Status', rule.condition); + const methodLabel = response.matchedRuleMethod ? ` (${response.matchedRuleMethod})` : ''; + status('Status', `${rule.condition}${methodLabel}`); } else { status('Status', response.status); } diff --git a/src/models/index.ts b/src/models/index.ts index 5b3e865..b43dd20 100644 --- a/src/models/index.ts +++ b/src/models/index.ts @@ -2,6 +2,7 @@ export type { AgentType, Status, + RuleMatchMethod, ReportConfig, ReportObjectConfig, AgentResponse, diff --git a/src/models/types.ts b/src/models/types.ts index 7db34e1..fc0bd83 100644 --- a/src/models/types.ts +++ b/src/models/types.ts @@ -17,6 +17,14 @@ export type Status = | 'interrupted' | 'answer'; +/** How a rule match was detected */ +export type RuleMatchMethod = + | 'aggregate' + | 'phase3_tag' + | 'phase1_tag' + | 'ai_judge' + | 'ai_judge_fallback'; + /** Response from an agent execution */ export interface AgentResponse { agent: string; @@ -28,6 +36,8 @@ export interface AgentResponse { error?: string; /** Matched rule index (0-based) when rules-based detection was used */ matchedRuleIndex?: number; + /** How the rule match was detected */ + matchedRuleMethod?: RuleMatchMethod; } /** Session state for workflow execution */ diff --git a/src/utils/session.ts b/src/utils/session.ts index fa88f27..1ab50da 100644 --- a/src/utils/session.ts +++ b/src/utils/session.ts @@ -24,6 +24,10 @@ export interface SessionLog { timestamp: string; content: string; error?: string; + /** Matched rule index (0-based) when rules-based detection was used */ + matchedRuleIndex?: number; + /** How the rule match was detected */ + matchedRuleMethod?: string; }>; } @@ -88,6 +92,8 @@ export function addToSessionLog( timestamp: response.timestamp.toISOString(), content: response.content, ...(response.error ? { error: response.error } : {}), + ...(response.matchedRuleIndex != null ? { matchedRuleIndex: response.matchedRuleIndex } : {}), + ...(response.matchedRuleMethod ? { matchedRuleMethod: response.matchedRuleMethod } : {}), }); log.iterations++; } diff --git a/src/workflow/engine.ts b/src/workflow/engine.ts index 612e0d4..e06fcd4 100644 --- a/src/workflow/engine.ts +++ b/src/workflow/engine.ts @@ -10,6 +10,7 @@ import type { WorkflowState, WorkflowStep, AgentResponse, + RuleMatchMethod, } from '../models/types.js'; import { runAgent, type RunAgentOptions } from '../agents/runner.js'; import { COMPLETE_STEP, ABORT_STEP, ERROR_MESSAGES } from './constants.js'; @@ -19,6 +20,7 @@ import { detectRuleIndex, callAiJudge } from '../claude/client.js'; import { buildInstruction as buildInstructionFromTemplate, buildReportInstruction as buildReportInstructionFromTemplate, buildStatusJudgmentInstruction as buildStatusJudgmentInstructionFromTemplate, isReportObjectConfig } from './instruction-builder.js'; import { LoopDetector } from './loop-detector.js'; import { handleBlocked } from './blocked-handler.js'; +import { ParallelLogger } from './parallel-logger.js'; import { createInitialState, addUserInput, @@ -263,8 +265,10 @@ export class WorkflowEngine extends EventEmitter { * Detect matched rule for a step's response. * Evaluation order (first match wins): * 1. Aggregate conditions: all()/any() — evaluate sub-step results - * 2. Standard [STEP:N] tag detection (from tagContent, i.e. Phase 3 output) - * 3. ai() condition evaluation via AI judge (from agentContent, i.e. Phase 1 output) + * 2. Tag detection from Phase 3 output + * 3. Tag detection from Phase 1 output (fallback) + * 4. ai() condition evaluation via AI judge + * 5. All-conditions AI judge (final fallback) * * Returns undefined for steps without rules. * Throws if rules exist but no rule matched (Fail Fast). @@ -273,27 +277,41 @@ export class WorkflowEngine extends EventEmitter { * @param agentContent - Phase 1 output (main execution) * @param tagContent - Phase 3 output (status judgment); empty string skips tag detection */ - private async detectMatchedRule(step: WorkflowStep, agentContent: string, tagContent: string): Promise { + private async detectMatchedRule(step: WorkflowStep, agentContent: string, tagContent: string): Promise<{ index: number; method: RuleMatchMethod } | undefined> { if (!step.rules || step.rules.length === 0) return undefined; // 1. Aggregate conditions (all/any) — only meaningful for parallel parent steps const aggIndex = this.evaluateAggregateConditions(step); if (aggIndex >= 0) { - return aggIndex; + return { index: aggIndex, method: 'aggregate' }; } - // 2. Standard tag detection (from Phase 3 output) + // 2. Tag detection from Phase 3 output if (tagContent) { const ruleIndex = detectRuleIndex(tagContent, step.name); if (ruleIndex >= 0 && ruleIndex < step.rules.length) { - return ruleIndex; + return { index: ruleIndex, method: 'phase3_tag' }; } } - // 3. AI judge fallback (from Phase 1 output) + // 3. Tag detection from Phase 1 output (fallback) + if (agentContent) { + const ruleIndex = detectRuleIndex(agentContent, step.name); + if (ruleIndex >= 0 && ruleIndex < step.rules.length) { + return { index: ruleIndex, method: 'phase1_tag' }; + } + } + + // 4. AI judge for ai() conditions only const aiRuleIndex = await this.evaluateAiConditions(step, agentContent); if (aiRuleIndex >= 0) { - return aiRuleIndex; + return { index: aiRuleIndex, method: 'ai_judge' }; + } + + // 5. AI judge for all conditions (final fallback) + const fallbackIndex = await this.evaluateAllConditionsViaAiJudge(step, agentContent); + if (fallbackIndex >= 0) { + return { index: fallbackIndex, method: 'ai_judge_fallback' }; } throw new Error(`Status not found for step "${step.name}": no rule matched after all detection phases`); @@ -381,9 +399,10 @@ export class WorkflowEngine extends EventEmitter { tagContent = await this.runStatusJudgmentPhase(step); } - const matchedRuleIndex = await this.detectMatchedRule(step, response.content, tagContent); - if (matchedRuleIndex != null) { - response = { ...response, matchedRuleIndex }; + const match = await this.detectMatchedRule(step, response.content, tagContent); + if (match) { + log.debug('Rule matched', { step: step.name, ruleIndex: match.index, method: match.method }); + response = { ...response, matchedRuleIndex: match.index, matchedRuleMethod: match.method }; } this.state.stepOutputs.set(step.name, response); @@ -454,7 +473,7 @@ export class WorkflowEngine extends EventEmitter { const judgmentOptions = this.buildResumeOptions(step, sessionId, { allowedTools: [], - maxTurns: 1, + maxTurns: 3, }); const judgmentResponse = await runAgent(step.agent, judgmentInstruction, judgmentOptions); @@ -469,6 +488,9 @@ export class WorkflowEngine extends EventEmitter { /** * Run a parallel step: execute all sub-steps concurrently, then aggregate results. * The aggregated output becomes the parent step's response for rules evaluation. + * + * When onStream is provided, uses ParallelLogger to prefix each sub-step's + * output with `[name]` for readable interleaved display. */ private async runParallelStep(step: WorkflowStep): Promise<{ response: AgentResponse; instruction: string }> { const subSteps = step.parallel!; @@ -479,14 +501,28 @@ export class WorkflowEngine extends EventEmitter { stepIteration, }); + // Create parallel logger for prefixed output (only when streaming is enabled) + const parallelLogger = this.options.onStream + ? new ParallelLogger({ + subStepNames: subSteps.map((s) => s.name), + parentOnStream: this.options.onStream, + }) + : undefined; + // Run all sub-steps concurrently const subResults = await Promise.all( - subSteps.map(async (subStep) => { + subSteps.map(async (subStep, index) => { const subIteration = incrementStepIteration(this.state, subStep.name); const subInstruction = this.buildInstruction(subStep, subIteration); // Phase 1: main execution (Write excluded if sub-step has report) const agentOptions = this.buildAgentOptions(subStep); + + // Override onStream with parallel logger's prefixed handler + if (parallelLogger) { + agentOptions.onStream = parallelLogger.createStreamHandler(subStep.name, index); + } + const subResponse = await runAgent(subStep.agent, subInstruction, agentOptions); this.updateAgentSession(subStep.agent, subResponse.sessionId); @@ -501,9 +537,9 @@ export class WorkflowEngine extends EventEmitter { subTagContent = await this.runStatusJudgmentPhase(subStep); } - const matchedRuleIndex = await this.detectMatchedRule(subStep, subResponse.content, subTagContent); - const finalResponse = matchedRuleIndex != null - ? { ...subResponse, matchedRuleIndex } + const match = await this.detectMatchedRule(subStep, subResponse.content, subTagContent); + const finalResponse = match + ? { ...subResponse, matchedRuleIndex: match.index, matchedRuleMethod: match.method } : subResponse; this.state.stepOutputs.set(subStep.name, finalResponse); @@ -513,6 +549,19 @@ export class WorkflowEngine extends EventEmitter { }), ); + // Print completion summary + if (parallelLogger) { + parallelLogger.printSummary( + step.name, + subResults.map((r) => ({ + name: r.subStep.name, + condition: r.response.matchedRuleIndex != null && r.subStep.rules + ? r.subStep.rules[r.response.matchedRuleIndex]?.condition + : undefined, + })), + ); + } + // Aggregate sub-step outputs into parent step's response const aggregatedContent = subResults .map((r) => `## ${r.subStep.name}\n${r.response.content}`) @@ -523,14 +572,14 @@ export class WorkflowEngine extends EventEmitter { .join('\n\n'); // Parent step uses aggregate conditions, so tagContent is empty - const matchedRuleIndex = await this.detectMatchedRule(step, aggregatedContent, ''); + const match = await this.detectMatchedRule(step, aggregatedContent, ''); const aggregatedResponse: AgentResponse = { agent: step.name, status: 'done', content: aggregatedContent, timestamp: new Date(), - ...(matchedRuleIndex != null && { matchedRuleIndex }), + ...(match && { matchedRuleIndex: match.index, matchedRuleMethod: match.method }), }; this.state.stepOutputs.set(step.name, aggregatedResponse); @@ -580,6 +629,37 @@ export class WorkflowEngine extends EventEmitter { return -1; } + /** + * Final fallback: evaluate ALL rule conditions via AI judge. + * Unlike evaluateAiConditions (which only handles ai() flagged rules), + * this sends every rule's condition text to the judge. + * Returns the 0-based rule index, or -1 if no match. + */ + private async evaluateAllConditionsViaAiJudge(step: WorkflowStep, agentOutput: string): Promise { + if (!step.rules || step.rules.length === 0) return -1; + + const conditions = step.rules.map((rule, i) => ({ index: i, text: rule.condition })); + + log.debug('Evaluating all conditions via AI judge (final fallback)', { + step: step.name, + conditionCount: conditions.length, + }); + + const judgeResult = await callAiJudge(agentOutput, conditions, { cwd: this.cwd }); + + if (judgeResult >= 0 && judgeResult < conditions.length) { + log.debug('AI judge (fallback) matched condition', { + step: step.name, + ruleIndex: judgeResult, + condition: conditions[judgeResult]!.text, + }); + return judgeResult; + } + + log.debug('AI judge (fallback) did not match any condition', { step: step.name }); + return -1; + } + /** * Determine next step for a completed step using rules-based routing. */ diff --git a/src/workflow/parallel-logger.ts b/src/workflow/parallel-logger.ts new file mode 100644 index 0000000..31d14a3 --- /dev/null +++ b/src/workflow/parallel-logger.ts @@ -0,0 +1,206 @@ +/** + * Parallel step log display + * + * Provides prefixed, color-coded interleaved output for parallel sub-steps. + * Each sub-step's stream output gets a `[name]` prefix with right-padding + * aligned to the longest sub-step name. + */ + +import type { StreamCallback, StreamEvent } from '../claude/types.js'; + +/** ANSI color codes for sub-step prefixes (cycled in order) */ +const COLORS = ['\x1b[36m', '\x1b[33m', '\x1b[35m', '\x1b[32m'] as const; // cyan, yellow, magenta, green +const RESET = '\x1b[0m'; + +export interface ParallelLoggerOptions { + /** Sub-step names (used to calculate prefix width) */ + subStepNames: string[]; + /** Parent onStream callback to delegate non-prefixed events */ + parentOnStream?: StreamCallback; + /** Override process.stdout.write for testing */ + writeFn?: (text: string) => void; +} + +/** + * Logger for parallel step execution. + * + * Creates per-sub-step StreamCallback wrappers that: + * - Buffer partial lines until newline + * - Prepend colored `[name]` prefix to each complete line + * - Delegate init/result/error events to the parent callback + */ +export class ParallelLogger { + private readonly maxNameLength: number; + private readonly lineBuffers: Map = new Map(); + private readonly parentOnStream?: StreamCallback; + private readonly writeFn: (text: string) => void; + + constructor(options: ParallelLoggerOptions) { + this.maxNameLength = Math.max(...options.subStepNames.map((n) => n.length)); + this.parentOnStream = options.parentOnStream; + this.writeFn = options.writeFn ?? ((text: string) => process.stdout.write(text)); + + for (const name of options.subStepNames) { + this.lineBuffers.set(name, ''); + } + } + + /** + * Build the colored prefix string for a sub-step. + * Format: `\x1b[COLORm[name]\x1b[0m` + padding spaces + */ + buildPrefix(name: string, index: number): string { + const color = COLORS[index % COLORS.length]!; + const padding = ' '.repeat(this.maxNameLength - name.length); + return `${color}[${name}]${RESET}${padding} `; + } + + /** + * Create a StreamCallback wrapper for a specific sub-step. + * + * - `text`: buffered line-by-line with prefix + * - `tool_use`, `tool_result`, `tool_output`, `thinking`: prefixed per-line, no buffering + * - `init`, `result`, `error`: delegated to parent callback (no prefix) + */ + createStreamHandler(subStepName: string, index: number): StreamCallback { + const prefix = this.buildPrefix(subStepName, index); + + return (event: StreamEvent) => { + switch (event.type) { + case 'text': + this.handleTextEvent(subStepName, prefix, event.data.text); + break; + + case 'tool_use': + case 'tool_result': + case 'tool_output': + case 'thinking': + this.handleBlockEvent(prefix, event); + break; + + case 'init': + case 'result': + case 'error': + // Delegate to parent without prefix + this.parentOnStream?.(event); + break; + } + }; + } + + /** + * Handle text event with line buffering. + * Buffer until newline, then output prefixed complete lines. + * Empty lines get no prefix per spec. + */ + private handleTextEvent(name: string, prefix: string, text: string): void { + const buffer = this.lineBuffers.get(name) ?? ''; + const combined = buffer + text; + const parts = combined.split('\n'); + + // Last part is incomplete (no trailing newline) — keep in buffer + this.lineBuffers.set(name, parts.pop()!); + + // Output all complete lines + for (const line of parts) { + if (line === '') { + this.writeFn('\n'); + } else { + this.writeFn(`${prefix}${line}\n`); + } + } + } + + /** + * Handle block events (tool_use, tool_result, tool_output, thinking). + * Output with prefix, splitting multi-line content. + */ + private handleBlockEvent(prefix: string, event: StreamEvent): void { + let text: string; + switch (event.type) { + case 'tool_use': + text = `[tool] ${event.data.tool}`; + break; + case 'tool_result': + text = event.data.content; + break; + case 'tool_output': + text = event.data.output; + break; + case 'thinking': + text = event.data.thinking; + break; + default: + return; + } + + for (const line of text.split('\n')) { + if (line === '') { + this.writeFn('\n'); + } else { + this.writeFn(`${prefix}${line}\n`); + } + } + } + + /** + * Flush remaining line buffers for all sub-steps. + * Call after all sub-steps complete to output any trailing partial lines. + */ + flush(): void { + // Build prefixes for flush — need index mapping + // Since we don't store index, iterate lineBuffers in insertion order + // (Map preserves insertion order, matching subStepNames order) + let index = 0; + for (const [name, buffer] of this.lineBuffers) { + if (buffer !== '') { + const prefix = this.buildPrefix(name, index); + this.writeFn(`${prefix}${buffer}\n`); + this.lineBuffers.set(name, ''); + } + index++; + } + } + + /** + * Print completion summary after all sub-steps finish. + * + * Format: + * ``` + * ── parallel-review results ── + * arch-review: approved + * security-review: rejected + * ────────────────────────────── + * ``` + */ + printSummary( + parentStepName: string, + results: Array<{ name: string; condition: string | undefined }>, + ): void { + this.flush(); + + const maxResultNameLength = Math.max(...results.map((r) => r.name.length)); + + const resultLines = results.map((r) => { + const padding = ' '.repeat(maxResultNameLength - r.name.length); + const condition = r.condition ?? '(no result)'; + return ` ${r.name}:${padding} ${condition}`; + }); + + // Header line: ── name results ── + const headerText = ` ${parentStepName} results `; + const maxLineLength = Math.max( + headerText.length + 4, // 4 for "── " + " ──" + ...resultLines.map((l) => l.length), + ); + const sideWidth = Math.max(1, Math.floor((maxLineLength - headerText.length) / 2)); + const headerLine = `${'─'.repeat(sideWidth)}${headerText}${'─'.repeat(sideWidth)}`; + const footerLine = '─'.repeat(headerLine.length); + + this.writeFn(`${headerLine}\n`); + for (const line of resultLines) { + this.writeFn(`${line}\n`); + } + this.writeFn(`${footerLine}\n`); + } +}